-
Notifications
You must be signed in to change notification settings - Fork 98
/
Copy pathmod.rs
127 lines (105 loc) · 3.78 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use async_trait::async_trait;
use mcp_spec::protocol::JsonRpcMessage;
use std::collections::HashMap;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};
/// Boxed trait object for any `std::error::Error` that is also `Send + Sync`,
/// so the error value can be moved or shared across threads.
pub type BoxError = Box<dyn std::error::Error + Sync + Send>;
/// A generic error type for transport operations.
///
/// The `Display` text of each variant is given by its `#[error(...)]`
/// attribute. Variants whose payload is marked `#[from]` get a `From`
/// conversion, so the wrapped error can be propagated with `?`.
#[derive(Debug, Error)]
pub enum Error {
/// Wraps an underlying `std::io::Error`.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// The transport was never connected, or has already been closed.
#[error("Transport was not connected or is already closed")]
NotConnected,
/// A channel endpoint was closed. `send_message` returns this when either
/// the outgoing message channel or the one-shot reply channel is closed.
#[error("Channel closed")]
ChannelClosed,
/// Wraps a `serde_json::Error` raised while (de)serializing a message.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// The message is neither a request nor a notification; `send_message`
/// returns this for every other `JsonRpcMessage` variant.
#[error("Unsupported message type. JsonRpcMessage can only be Request or Notification.")]
UnsupportedMessage,
/// Failure described by a free-form string.
/// NOTE(review): presumably raised by the stdio transport; not constructed in this file.
#[error("Stdio process error: {0}")]
StdioProcessError(String),
/// Failure described by a free-form string.
/// NOTE(review): presumably raised by the SSE transport; not constructed in this file.
#[error("SSE connection error: {0}")]
SseConnection(String),
/// An HTTP-level failure carrying a numeric status and a message.
#[error("HTTP error: {status} - {message}")]
HttpError { status: u16, message: String },
}
/// A message that can be sent through the transport
///
/// Pairs the outgoing JSON-RPC message with an optional one-shot channel on
/// which the outcome for that message is delivered back to the sender.
#[derive(Debug)]
pub struct TransportMessage {
/// The JSON-RPC message to send
pub message: JsonRpcMessage,
/// Channel to receive the response on (None for notifications)
///
/// `send_message` sets this to `Some` for requests and `None` for
/// notifications. The value sent is either the response message or the
/// transport `Error` that prevented one.
pub response_tx: Option<oneshot::Sender<Result<JsonRpcMessage, Error>>>,
}
/// A generic asynchronous transport trait with channel-based communication
///
/// Implementors own the underlying connection; callers interact with it
/// through the [`TransportHandle`] returned by [`Transport::start`].
#[async_trait]
pub trait Transport {
/// Handle type returned by `start` and used to send messages.
type Handle: TransportHandle;
/// Start the transport and establish the underlying connection.
/// Returns the transport handle for sending messages.
///
/// # Errors
/// Returns an [`Error`] when the connection cannot be established; the
/// exact variant depends on the implementation.
async fn start(&self) -> Result<Self::Handle, Error>;
/// Close the transport and free any resources.
///
/// # Errors
/// Returns an [`Error`] when shutting down fails; the exact variant
/// depends on the implementation.
async fn close(&self) -> Result<(), Error>;
}
/// Handle used to send JSON-RPC messages over a started transport.
///
/// The bounds require handles to be cloneable, usable from multiple threads
/// (`Send + Sync`) and free of borrowed data (`'static`).
#[async_trait]
pub trait TransportHandle: Send + Sync + Clone + 'static {
/// Sends `message` and resolves to the resulting message, or to an
/// [`Error`] if sending failed.
///
/// Implementations that delegate to `send_message` resolve to
/// `JsonRpcMessage::Nil` for notifications, which have no response.
async fn send(&self, message: JsonRpcMessage) -> Result<JsonRpcMessage, Error>;
}
// Helper function that contains the common send implementation
pub async fn send_message(
sender: &mpsc::Sender<TransportMessage>,
message: JsonRpcMessage,
) -> Result<JsonRpcMessage, Error> {
match message {
JsonRpcMessage::Request(request) => {
let (respond_to, response) = oneshot::channel();
let msg = TransportMessage {
message: JsonRpcMessage::Request(request),
response_tx: Some(respond_to),
};
sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
Ok(response.await.map_err(|_| Error::ChannelClosed)??)
}
JsonRpcMessage::Notification(notification) => {
let msg = TransportMessage {
message: JsonRpcMessage::Notification(notification),
response_tx: None,
};
sender.send(msg).await.map_err(|_| Error::ChannelClosed)?;
Ok(JsonRpcMessage::Nil)
}
_ => Err(Error::UnsupportedMessage),
}
}
/// A data structure to store pending requests and their response channels.
///
/// Each entry maps a request id to the one-shot sender on which the outcome
/// for that request (a response message or a transport [`Error`]) is
/// delivered. The map sits behind an async `RwLock` so it can be updated
/// through a shared reference.
///
/// Derives `Debug` so the type can be embedded in other `Debug` types and
/// shown in diagnostics.
#[derive(Debug)]
pub struct PendingRequests {
    requests: RwLock<HashMap<String, oneshot::Sender<Result<JsonRpcMessage, Error>>>>,
}
impl Default for PendingRequests {
fn default() -> Self {
Self::new()
}
}
impl PendingRequests {
    /// Creates an empty table with no pending requests.
    pub fn new() -> Self {
        let requests = RwLock::new(HashMap::new());
        Self { requests }
    }

    /// Registers `sender` as the reply channel for request `id`.
    ///
    /// If an entry with the same id already exists it is replaced, and the
    /// previous sender is dropped.
    pub async fn insert(&self, id: String, sender: oneshot::Sender<Result<JsonRpcMessage, Error>>) {
        let mut pending = self.requests.write().await;
        pending.insert(id, sender);
    }

    /// Delivers `response` to whoever is waiting on request `id`.
    ///
    /// The entry is removed from the table first. If no request with that id
    /// is pending, `response` is dropped. A failed delivery (the receiving
    /// side is already gone) is deliberately ignored.
    pub async fn respond(&self, id: &str, response: Result<JsonRpcMessage, Error>) {
        let mut pending = self.requests.write().await;
        if let Some(reply_tx) = pending.remove(id) {
            let _ = reply_tx.send(response);
        }
    }

    /// Removes every pending request, dropping all stored reply senders
    /// without sending a response on them.
    pub async fn clear(&self) {
        let mut pending = self.requests.write().await;
        pending.clear();
    }
}
// Transport submodules. The main type of each is re-exported below so callers
// can name it directly from this module.
pub mod stdio;
pub use stdio::StdioTransport;
pub mod sse;
pub use sse::SseTransport;