Skip to content

Commit cee3945

Browse files
authored
feat: websocket transport (#70)
## Problem The library needed WebSocket transport support for bidirectional communication with MCP servers. ## Solution Implemented a WebSocket transport module with: - Server connection handling - Bidirectional message passing - Comprehensive test coverage - Dependency upgrades for WebSocket support Close #10 ## Rationale WebSocket provides real-time, bidirectional communication necessary for efficient MCP client-server interaction.
1 parent c7ffa71 commit cee3945

File tree

14 files changed

+737
-21
lines changed

14 files changed

+737
-21
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
All notable changes to this project are documented in this file.
44

5+
## [Unreleased]
6+
7+
### Added
8+
- Implemented WebSocket transport (#10)
9+
- Support for bidirectional WebSocket communication
10+
- Integration with the Gun library for WebSocket handling
11+
- New mix task for testing WebSocket servers interactively
12+
513
## [0.3.12](https://github.com/cloudwalk/hermes-mcp) - 2025-04-24
614

715
### Fixed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Currently, Hermes MCP offers a feature-complete client implementation conforming
2121
### Current Status
2222

2323
- [x] Complete client implementation (MCP 2024-11-05)
24-
- [x] Multiple transport options (STDIO and HTTP/SSE)
24+
- [x] Multiple transport options (STDIO, HTTP/SSE, and WebSocket)
2525
- [x] Built-in connection supervision and automatic recovery
2626
- [x] Comprehensive capability negotiation
2727
- [x] Progress tracking and cancellation support
@@ -86,6 +86,9 @@ Hermes MCP provides interactive tools for testing MCP servers with a user-friend
8686
# Test an SSE server
8787
hermes_cli --transport sse --base-url="http://localhost:8000" --base-path="/mcp"
8888

89+
# Test a WebSocket server
90+
hermes_cli --transport websocket --base-url="http://localhost:8000" --base-path="/mcp" --ws-path="/ws"
91+
8992
# Test a local process via STDIO
9093
hermes_cli --transport stdio --command="mcp" --args="run,path/to/server.py"
9194
```
@@ -96,6 +99,9 @@ hermes_cli --transport stdio --command="mcp" --args="run,path/to/server.py"
9699
# Test an SSE server
97100
mix hermes.sse.interactive --base-url="http://localhost:8000" --base-path="/mcp"
98101

102+
# Test a WebSocket server
103+
mix hermes.websocket.interactive --base-url="http://localhost:8000" --base-path="/mcp" --ws-path="/ws"
104+
99105
# Test a local process via STDIO
100106
mix hermes.stdio.interactive --command="mcp" --args="run,path/to/server.py"
101107
```

ROADMAP.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ This document outlines the development roadmap for Hermes MCP, an Elixir impleme
77
Hermes MCP currently provides a complete client implementation for the MCP 2024-11-05 specification with:
88

99
- Full protocol lifecycle management (initialization, operation, shutdown)
10-
- Multiple transport options (STDIO and HTTP/SSE)
10+
- Multiple transport options (STDIO, HTTP/SSE, and WebSocket)
1111
- Connection supervision and automatic recovery
1212
- Comprehensive capability negotiation
1313
- Progress tracking for long-running operations
@@ -86,7 +86,6 @@ Once both client and server implementations are stable, we plan to provide:
8686

8787
Beyond the core roadmap, we maintain a backlog of features for future consideration:
8888

89-
- **WebSockets Transport**: Alternative transport layer for bidirectional communication
9089
- **Observability**: Advanced telemetry and monitoring integration
9190
- **Rate Limiting and Quota Management**: For server implementations
9291
- **Testing Tools**: Extended tools for protocol testing and validation

lib/hermes/client.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ defmodule Hermes.Client do
3333
@typedoc """
3434
MCP client transport options
3535
36-
- `:layer` - The transport layer to use, either `Hermes.Transport.STDIO` or `Hermes.Transport.SSE` (required)
36+
- `:layer` - The transport layer to use, either `Hermes.Transport.STDIO`, `Hermes.Transport.SSE`, or `Hermes.Transport.WebSocket` (required)
3737
- `:name` - The transport optional custom name
3838
"""
3939
@type transport ::
4040
list(
41-
{:layer, Hermes.Transport.STDIO | Hermes.Transport.SSE}
41+
{:layer, Hermes.Transport.STDIO | Hermes.Transport.SSE | Hermes.Transport.WebSocket}
4242
| {:name, GenServer.server()}
4343
)
4444

lib/hermes/transport/websocket.ex

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
defmodule Hermes.Transport.WebSocket do
2+
@moduledoc """
3+
A transport implementation that uses WebSockets for bidirectional communication
4+
with the MCP server.
5+
6+
> ## Notes {: .info}
7+
>
8+
> For initialization and setup, check our [Installation & Setup](./installation.html) and
9+
> the [Transport options](./transport_options.html) guides for reference.
10+
"""
11+
12+
@behaviour Hermes.Transport.Behaviour
13+
14+
use GenServer
15+
16+
import Peri
17+
18+
alias Hermes.Logging
19+
alias Hermes.Transport.Behaviour, as: Transport
20+
21+
@type t :: GenServer.server()
22+
23+
@typedoc """
24+
The options for the MCP server.
25+
26+
- `:base_url` - The base URL of the MCP server (e.g. http://localhost:8000) (required).
27+
- `:base_path` - The base path of the MCP server (e.g. /mcp).
28+
- `:ws_path` - The path to the WebSocket endpoint (e.g. /mcp/ws) (default `:base_path` + `/ws`).
29+
"""
30+
@type server ::
31+
Enumerable.t(
32+
{:base_url, String.t()}
33+
| {:base_path, String.t()}
34+
| {:ws_path, String.t()}
35+
)
36+
37+
@type params_t :: Enumerable.t(option)
38+
@typedoc """
39+
The options for the WebSocket transport.
40+
41+
- `:name` - The name of the transport process, respecting the `GenServer` "Name Registration" section.
42+
- `:client` - The client to send the messages to, respecting the `GenServer` "Name Registration" section.
43+
- `:server` - The server configuration.
44+
- `:headers` - The headers to send with the HTTP requests.
45+
- `:transport_opts` - The underlying transport options to pass to Gun.
46+
"""
47+
@type option ::
48+
{:name, GenServer.name()}
49+
| {:client, GenServer.server()}
50+
| {:server, server}
51+
| {:headers, map()}
52+
| {:transport_opts, keyword}
53+
| GenServer.option()
54+
55+
defschema(:options_schema, %{
56+
name: {{:custom, &Hermes.genserver_name/1}, {:default, __MODULE__}},
57+
client:
58+
{:required,
59+
{:oneof,
60+
[
61+
{:custom, &Hermes.genserver_name/1},
62+
:pid,
63+
{:tuple, [:atom, :any]}
64+
]}},
65+
server: [
66+
base_url: {:required, {:string, {:transform, &URI.new!/1}}},
67+
base_path: {:string, {:default, "/"}},
68+
ws_path: {:string, {:default, "/ws"}}
69+
],
70+
headers: {:map, {:default, %{}}},
71+
transport_opts: {:any, {:default, []}}
72+
})
73+
74+
@impl Transport
75+
@spec start_link(params_t) :: GenServer.on_start()
76+
def start_link(opts \\ []) do
77+
opts = options_schema!(opts)
78+
GenServer.start_link(__MODULE__, Map.new(opts), name: opts[:name])
79+
end
80+
81+
@impl Transport
82+
def send_message(pid, message) when is_binary(message) do
83+
GenServer.call(pid, {:send, message})
84+
end
85+
86+
@impl Transport
87+
def shutdown(pid) do
88+
GenServer.cast(pid, :close_connection)
89+
end
90+
91+
@impl GenServer
92+
def init(%{} = opts) do
93+
server_url = URI.append_path(opts.server[:base_url], opts.server[:base_path])
94+
ws_url = URI.append_path(server_url, opts.server[:ws_path])
95+
96+
state = Map.merge(opts, %{ws_url: ws_url, gun_pid: nil, stream_ref: nil})
97+
98+
{:ok, state, {:continue, :connect}}
99+
end
100+
101+
@impl GenServer
102+
def handle_continue(:connect, state) do
103+
uri = URI.parse(state.ws_url)
104+
protocol = if uri.scheme == "https", do: :https, else: :http
105+
port = uri.port || if protocol == :https, do: 443, else: 80
106+
107+
gun_opts = %{
108+
protocols: [:http],
109+
http_opts: %{keepalive: :infinity}
110+
}
111+
112+
gun_opts = Map.merge(gun_opts, Map.new(state.transport_opts))
113+
114+
case open_connection(uri.host, port, gun_opts) do
115+
{:ok, gun_pid} ->
116+
handle_connection_established(gun_pid, uri, state)
117+
118+
{:error, reason} ->
119+
Logging.transport_event("gun_open_failed", %{reason: reason}, level: :error)
120+
{:stop, {:gun_open_failed, reason}, state}
121+
end
122+
end
123+
124+
defp open_connection(host, port, gun_opts) do
125+
:gun.open(to_charlist(host), port, gun_opts)
126+
end
127+
128+
defp handle_connection_established(gun_pid, uri, state) do
129+
Logging.transport_event("gun_opened", %{host: uri.host, port: uri.port})
130+
Process.monitor(gun_pid)
131+
132+
case :gun.await_up(gun_pid, 5000) do
133+
{:ok, _protocol} ->
134+
initiate_websocket_upgrade(gun_pid, uri, state)
135+
136+
{:error, reason} ->
137+
Logging.transport_event("gun_await_up_failed", %{reason: reason}, level: :error)
138+
{:stop, {:gun_await_up_failed, reason}, state}
139+
end
140+
end
141+
142+
defp initiate_websocket_upgrade(gun_pid, uri, state) do
143+
headers = state.headers |> Map.to_list() |> Enum.map(fn {k, v} -> {to_charlist(k), to_charlist(v)} end)
144+
path = uri.path || "/"
145+
path = if uri.query, do: "#{path}?#{uri.query}", else: path
146+
stream_ref = :gun.ws_upgrade(gun_pid, to_charlist(path), headers)
147+
Logging.transport_event("ws_upgrade_requested", %{path: path})
148+
149+
{:noreply, %{state | gun_pid: gun_pid, stream_ref: stream_ref}}
150+
end
151+
152+
@impl GenServer
153+
def handle_call({:send, message}, _from, %{gun_pid: pid, stream_ref: stream_ref} = state)
154+
when not is_nil(pid) and not is_nil(stream_ref) do
155+
:ok = :gun.ws_send(pid, stream_ref, {:text, message})
156+
Logging.transport_event("ws_message_sent", String.slice(message, 0, 100))
157+
{:reply, :ok, state}
158+
rescue
159+
e ->
160+
Logging.transport_event("ws_send_failed", %{error: Exception.message(e)}, level: :error)
161+
{:reply, {:error, :send_failed}, state}
162+
end
163+
164+
def handle_call({:send, _message}, _from, state) do
165+
{:reply, {:error, :not_connected}, state}
166+
end
167+
168+
@impl GenServer
169+
def handle_info(
170+
{:gun_ws, pid, stream_ref, {:text, data}},
171+
%{gun_pid: pid, stream_ref: stream_ref, client: client} = state
172+
) do
173+
Logging.transport_event("ws_message_received", String.slice(data, 0, 100))
174+
GenServer.cast(client, {:response, data})
175+
{:noreply, state}
176+
end
177+
178+
def handle_info({:gun_ws, pid, stream_ref, :close}, %{gun_pid: pid, stream_ref: stream_ref} = state) do
179+
Logging.transport_event("ws_closed", "Connection closed by server", level: :warning)
180+
{:stop, :normal, state}
181+
end
182+
183+
def handle_info({:gun_ws, pid, stream_ref, {:close, code, reason}}, %{gun_pid: pid, stream_ref: stream_ref} = state) do
184+
Logging.transport_event("ws_closed", %{code: code, reason: reason}, level: :warning)
185+
{:stop, {:ws_closed, code, reason}, state}
186+
end
187+
188+
def handle_info(
189+
{:gun_upgrade, pid, stream_ref, ["websocket"], _headers},
190+
%{gun_pid: pid, stream_ref: stream_ref, client: client} = state
191+
) do
192+
Logging.transport_event("ws_upgrade_success", "WebSocket connection established")
193+
GenServer.cast(client, :initialize)
194+
{:noreply, state}
195+
end
196+
197+
def handle_info({:gun_response, pid, stream_ref, _, status, headers}, %{gun_pid: pid, stream_ref: stream_ref} = state) do
198+
Logging.transport_event("ws_upgrade_rejected", %{status: status, headers: headers}, level: :error)
199+
{:stop, {:ws_upgrade_rejected, status}, state}
200+
end
201+
202+
def handle_info({:gun_error, pid, stream_ref, reason}, %{gun_pid: pid, stream_ref: stream_ref} = state) do
203+
Logging.transport_event("gun_error", %{reason: reason}, level: :error)
204+
{:stop, {:gun_error, reason}, state}
205+
end
206+
207+
def handle_info({:DOWN, _ref, :process, pid, reason}, %{gun_pid: pid} = state) do
208+
Logging.transport_event("gun_down", %{reason: reason}, level: :error)
209+
{:stop, {:gun_down, reason}, state}
210+
end
211+
212+
def handle_info(msg, state) do
213+
Logging.transport_event("unexpected_message", %{message: msg})
214+
{:noreply, state}
215+
end
216+
217+
@impl GenServer
218+
def handle_cast(:close_connection, %{gun_pid: pid} = state) when not is_nil(pid) do
219+
:ok = :gun.close(pid)
220+
{:stop, :normal, state}
221+
end
222+
223+
def handle_cast(:close_connection, state) do
224+
{:stop, :normal, state}
225+
end
226+
227+
@impl GenServer
228+
def terminate(_reason, %{gun_pid: pid} = _state) when not is_nil(pid) do
229+
:gun.close(pid)
230+
:ok
231+
end
232+
233+
def terminate(_reason, _state), do: :ok
234+
end

0 commit comments

Comments
 (0)