Merge remote-tracking branch 'origin/rcc/runtime' into dev/rust
This commit is contained in:
@@ -2,9 +2,11 @@ use std::collections::BTreeMap;
|
||||
use std::io;
|
||||
use std::process::Stdio;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
|
||||
|
||||
use serde_json::Value as JsonRpcMessage;
|
||||
|
||||
use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -12,7 +14,7 @@ use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTranspor
|
||||
pub struct McpStdioProcess {
|
||||
child: Child,
|
||||
stdin: ChildStdin,
|
||||
stdout: ChildStdout,
|
||||
stdout: BufReader<ChildStdout>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
@@ -39,7 +41,7 @@ impl McpStdioProcess {
|
||||
Ok(Self {
|
||||
child,
|
||||
stdin,
|
||||
stdout,
|
||||
stdout: BufReader::new(stdout),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -58,6 +60,49 @@ impl McpStdioProcess {
|
||||
Ok(buffer)
|
||||
}
|
||||
|
||||
pub async fn write_jsonrpc_message(&mut self, message: &JsonRpcMessage) -> io::Result<()> {
|
||||
let encoded = encode_jsonrpc_message(message)?;
|
||||
self.write_all(&encoded).await?;
|
||||
self.flush().await
|
||||
}
|
||||
|
||||
pub async fn read_jsonrpc_message(&mut self) -> io::Result<JsonRpcMessage> {
|
||||
let payload = self.read_frame().await?;
|
||||
serde_json::from_slice(&payload)
|
||||
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
|
||||
}
|
||||
|
||||
async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
|
||||
let mut content_length = None;
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
let bytes_read = self.stdout.read_line(&mut line).await?;
|
||||
if bytes_read == 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"MCP stdio stream closed while reading headers",
|
||||
));
|
||||
}
|
||||
if line == "\r\n" {
|
||||
break;
|
||||
}
|
||||
if let Some(value) = line.strip_prefix("Content-Length:") {
|
||||
let parsed = value
|
||||
.trim()
|
||||
.parse::<usize>()
|
||||
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
|
||||
content_length = Some(parsed);
|
||||
}
|
||||
}
|
||||
|
||||
let content_length = content_length.ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
|
||||
})?;
|
||||
let mut payload = vec![0_u8; content_length];
|
||||
self.stdout.read_exact(&mut payload).await?;
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
pub async fn terminate(&mut self) -> io::Result<()> {
|
||||
self.child.kill().await
|
||||
}
|
||||
@@ -88,6 +133,15 @@ fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
|
||||
}
|
||||
}
|
||||
|
||||
fn encode_jsonrpc_message(message: &JsonRpcMessage) -> io::Result<Vec<u8>> {
|
||||
let body = serde_json::to_vec(message)
|
||||
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
|
||||
let header = format!("Content-Length: {}\r\n\r\n", body.len());
|
||||
let mut framed = header.into_bytes();
|
||||
framed.extend(body);
|
||||
Ok(framed)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
@@ -129,6 +183,37 @@ mod tests {
|
||||
script_path
|
||||
}
|
||||
|
||||
fn write_jsonrpc_script() -> PathBuf {
|
||||
let root = temp_dir();
|
||||
fs::create_dir_all(&root).expect("temp dir");
|
||||
let script_path = root.join("jsonrpc-mcp.py");
|
||||
let script = [
|
||||
"#!/usr/bin/env python3",
|
||||
"import json, sys",
|
||||
"header = b''",
|
||||
r"while not header.endswith(b'\r\n\r\n'):",
|
||||
" chunk = sys.stdin.buffer.read(1)",
|
||||
" if not chunk:",
|
||||
" raise SystemExit(1)",
|
||||
" header += chunk",
|
||||
"length = 0",
|
||||
r"for line in header.decode().split('\r\n'):",
|
||||
r" if line.lower().startswith('content-length:'):",
|
||||
r" length = int(line.split(':', 1)[1].strip())",
|
||||
"payload = sys.stdin.buffer.read(length)",
|
||||
"json.loads(payload.decode())",
|
||||
r"response = json.dumps({'jsonrpc': '2.0', 'id': 1, 'result': {'echo': True}}).encode()",
|
||||
r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)",
|
||||
"sys.stdout.buffer.flush()",
|
||||
"",
|
||||
]
|
||||
.join("\n");
|
||||
fs::write(&script_path, script).expect("write script");
|
||||
let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
|
||||
permissions.set_mode(0o755);
|
||||
fs::set_permissions(&script_path, permissions).expect("chmod");
|
||||
script_path
|
||||
}
|
||||
fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
|
||||
let config = ScopedMcpServerConfig {
|
||||
scope: ConfigSource::Local,
|
||||
@@ -185,6 +270,45 @@ mod tests {
|
||||
assert_eq!(error.kind(), ErrorKind::InvalidInput);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trips_jsonrpc_messages_over_stdio_frames() {
|
||||
let runtime = Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("runtime");
|
||||
runtime.block_on(async {
|
||||
let script_path = write_jsonrpc_script();
|
||||
let transport = crate::mcp_client::McpStdioTransport {
|
||||
command: "python3".to_string(),
|
||||
args: vec![script_path.to_string_lossy().into_owned()],
|
||||
env: BTreeMap::new(),
|
||||
};
|
||||
let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
|
||||
process
|
||||
.write_jsonrpc_message(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "initialize"
|
||||
}))
|
||||
.await
|
||||
.expect("write jsonrpc message");
|
||||
|
||||
let response = process
|
||||
.read_jsonrpc_message()
|
||||
.await
|
||||
.expect("read jsonrpc response");
|
||||
assert_eq!(response["jsonrpc"], serde_json::json!("2.0"));
|
||||
assert_eq!(response["id"], serde_json::json!(1));
|
||||
assert_eq!(response["result"]["echo"], serde_json::json!(true));
|
||||
|
||||
let status = process.wait().await.expect("wait for exit");
|
||||
assert!(status.success());
|
||||
|
||||
fs::remove_file(&script_path).expect("cleanup script");
|
||||
fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir");
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn direct_spawn_uses_transport_env() {
|
||||
let runtime = Builder::new_current_thread()
|
||||
|
||||
Reference in New Issue
Block a user