From 61b4def7bc7ef3c3fe811ff72d482e7ca90843a8 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Wed, 1 Apr 2026 06:15:15 +0000 Subject: [PATCH] feat: telemetry progress --- rust/crates/api/src/client.rs | 76 ++++++++++++++++++++- rust/crates/api/src/types.rs | 69 ++++++++++++++++++- rust/crates/api/tests/client_integration.rs | 17 ++++- rust/crates/runtime/src/mcp_stdio.rs | 16 ++++- rust/crates/rusty-claude-cli/src/main.rs | 14 +--- rust/crates/tools/src/lib.rs | 16 +---- 6 files changed, 175 insertions(+), 33 deletions(-) diff --git a/rust/crates/api/src/client.rs b/rust/crates/api/src/client.rs index 6901de6..f8eb97a 100644 --- a/rust/crates/api/src/client.rs +++ b/rust/crates/api/src/client.rs @@ -2,12 +2,12 @@ use std::collections::VecDeque; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use runtime::{ - load_oauth_credentials, save_oauth_credentials, OAuthConfig, OAuthRefreshRequest, - OAuthTokenExchangeRequest, + format_usd, load_oauth_credentials, pricing_for_model, save_oauth_credentials, OAuthConfig, + OAuthRefreshRequest, OAuthTokenExchangeRequest, }; use serde::Deserialize; use serde_json::{Map, Value}; -use telemetry::{AnthropicRequestProfile, ClientIdentity, SessionTracer}; +use telemetry::{AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, SessionTracer}; use crate::error::ApiError; use crate::sse::SseParser; @@ -252,6 +252,7 @@ impl AnthropicClient { if response.request_id.is_none() { response.request_id = request_id; } + self.record_response_usage(&response); Ok(response) } @@ -420,6 +421,75 @@ impl AnthropicClient { } } + fn record_response_usage(&self, response: &MessageResponse) { + let Some(tracer) = &self.session_tracer else { + return; + }; + + let cost = response.usage.estimated_cost_usd(&response.model); + let pricing_source = if pricing_for_model(&response.model).is_some() { + "model-specific" + } else { + "default-sonnet" + }; + + let mut properties = Map::new(); + properties.insert("model".to_string(), Value::String(response.model.clone())); + properties.insert( + "pricing_source".to_string(), + Value::String(pricing_source.to_string()), + ); + properties.insert( + "input_tokens".to_string(), + Value::from(response.usage.input_tokens), + ); + properties.insert( + "output_tokens".to_string(), + Value::from(response.usage.output_tokens), + ); + properties.insert( + "cache_creation_input_tokens".to_string(), + Value::from(response.usage.cache_creation_input_tokens), + ); + properties.insert( + "cache_read_input_tokens".to_string(), + Value::from(response.usage.cache_read_input_tokens), + ); + properties.insert( + "total_tokens".to_string(), + Value::from(response.usage.total_tokens()), + ); + properties.insert( + "estimated_cost_usd".to_string(), + Value::String(format_usd(cost.total_cost_usd())), + ); + properties.insert( + "estimated_input_cost_usd".to_string(), + Value::String(format_usd(cost.input_cost_usd)), + ); + properties.insert( + "estimated_output_cost_usd".to_string(), + Value::String(format_usd(cost.output_cost_usd)), + ); + properties.insert( + "estimated_cache_creation_cost_usd".to_string(), + Value::String(format_usd(cost.cache_creation_cost_usd)), + ); + properties.insert( + "estimated_cache_read_cost_usd".to_string(), + Value::String(format_usd(cost.cache_read_cost_usd)), + ); + if let Some(request_id) = &response.request_id { + properties.insert("request_id".to_string(), Value::String(request_id.clone())); + } + + tracer.record_analytics(AnalyticsEvent { + namespace: "api".to_string(), + action: "message_usage".to_string(), + properties, + }); + } + fn request_attributes(&self, request: &MessageRequest) -> Map { let mut attributes = Map::new(); attributes.insert("model".to_string(), Value::String(request.model.clone())); diff --git a/rust/crates/api/src/types.rs b/rust/crates/api/src/types.rs index 45d5c08..f05fbe5 100644 --- a/rust/crates/api/src/types.rs +++ b/rust/crates/api/src/types.rs @@ -1,3 +1,4 @@ +use runtime::{pricing_for_model, TokenUsage, UsageCostEstimate}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -150,7 +151,29 @@ pub struct Usage { impl Usage { #[must_use] pub const fn total_tokens(&self) -> u32 { - self.input_tokens + self.output_tokens + self.input_tokens + + self.output_tokens + + self.cache_creation_input_tokens + + self.cache_read_input_tokens + } + + #[must_use] + pub const fn token_usage(&self) -> TokenUsage { + TokenUsage { + input_tokens: self.input_tokens, + output_tokens: self.output_tokens, + cache_creation_input_tokens: self.cache_creation_input_tokens, + cache_read_input_tokens: self.cache_read_input_tokens, + } + } + + #[must_use] + pub fn estimated_cost_usd(&self, model: &str) -> UsageCostEstimate { + let usage = self.token_usage(); + pricing_for_model(model).map_or_else( + || usage.estimate_cost_usd(), + |pricing| usage.estimate_cost_usd_with_pricing(pricing), + ) } } @@ -210,3 +233,47 @@ pub enum StreamEvent { ContentBlockStop(ContentBlockStopEvent), MessageStop(MessageStopEvent), } + +#[cfg(test)] +mod tests { + use runtime::format_usd; + + use super::{MessageResponse, Usage}; + + #[test] + fn usage_total_tokens_includes_cache_tokens() { + let usage = Usage { + input_tokens: 10, + cache_creation_input_tokens: 2, + cache_read_input_tokens: 3, + output_tokens: 4, + }; + + assert_eq!(usage.total_tokens(), 19); + assert_eq!(usage.token_usage().total_tokens(), 19); + } + + #[test] + fn message_response_estimates_cost_from_model_usage() { + let response = MessageResponse { + id: "msg_cost".to_string(), + kind: "message".to_string(), + role: "assistant".to_string(), + content: Vec::new(), + model: "claude-sonnet-4-20250514".to_string(), + stop_reason: Some("end_turn".to_string()), + stop_sequence: None, + usage: Usage { + input_tokens: 1_000_000, + cache_creation_input_tokens: 100_000, + cache_read_input_tokens: 200_000, + output_tokens: 500_000, + }, + request_id: None, + }; + + let cost = response.usage.estimated_cost_usd(&response.model); + assert_eq!(format_usd(cost.total_cost_usd()), "$54.6750"); + assert_eq!(response.total_tokens(), 1_800_000); + } +} diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index 4b78d0e..6bb1c0f 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -109,7 +109,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() { "\"model\":\"claude-3-7-sonnet-latest\",", "\"stop_reason\":\"end_turn\",", "\"stop_sequence\":null,", - "\"usage\":{\"input_tokens\":1,\"output_tokens\":1}", + "\"usage\":{\"input_tokens\":1,\"cache_creation_input_tokens\":2,\"cache_read_input_tokens\":3,\"output_tokens\":1}", "}" ), &[("request-id", "req_profile_123")], @@ -155,7 +155,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() { ); let events = sink.events(); - assert_eq!(events.len(), 4); + assert_eq!(events.len(), 6); assert!(matches!( &events[0], TelemetryEvent::HttpRequestStarted { @@ -182,6 +182,19 @@ async fn send_message_applies_request_profile_and_records_telemetry() { &events[3], TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded" )); + assert!(matches!( + &events[4], + TelemetryEvent::Analytics(event) + if event.namespace == "api" + && event.action == "message_usage" + && event.properties.get("request_id") == Some(&json!("req_profile_123")) + && event.properties.get("total_tokens") == Some(&json!(7)) + && event.properties.get("estimated_cost_usd") == Some(&json!("$0.0001")) + )); + assert!(matches!( + &events[5], + TelemetryEvent::SessionTrace(trace) if trace.name == "analytics" + )); } #[tokio::test] diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index 7e67d5d..b72b9dd 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -1144,8 +1144,20 @@ mod tests { } fn cleanup_script(script_path: &Path) { - fs::remove_file(script_path).expect("cleanup script"); - fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); + if let Err(error) = fs::remove_file(script_path) { + assert_eq!( + error.kind(), + std::io::ErrorKind::NotFound, + "cleanup script: {error}" + ); + } + if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) { + assert_eq!( + error.kind(), + std::io::ErrorKind::NotFound, + "cleanup dir: {error}" + ); + } } fn manager_server_config( diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 0a4ea16..41746b3 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -2154,12 +2154,7 @@ impl ApiClient for AnthropicRuntimeClient { } } ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: delta.usage.input_tokens, - output_tokens: delta.usage.output_tokens, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - })); + events.push(AssistantEvent::Usage(delta.usage.token_usage())); } ApiStreamEvent::MessageStop(_) => { saw_stop = true; @@ -2655,12 +2650,7 @@ fn response_to_events( } } - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - cache_creation_input_tokens: response.usage.cache_creation_input_tokens, - cache_read_input_tokens: response.usage.cache_read_input_tokens, - })); + events.push(AssistantEvent::Usage(response.usage.token_usage())); events.push(AssistantEvent::MessageStop); Ok(events) } diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index 8dcd33d..3526d2c 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -13,7 +13,7 @@ use runtime::{ edit_file, execute_bash, glob_search, grep_search, load_system_prompt, read_file, write_file, ApiClient, ApiRequest, AssistantEvent, BashCommandInput, ContentBlock, ConversationMessage, ConversationRuntime, GrepSearchInput, MessageRole, PermissionMode, PermissionPolicy, - RuntimeError, Session, TokenUsage, ToolError, ToolExecutor, + RuntimeError, Session, ToolError, ToolExecutor, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -1723,12 +1723,7 @@ impl ApiClient for AnthropicRuntimeClient { } } ApiStreamEvent::MessageDelta(delta) => { - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: delta.usage.input_tokens, - output_tokens: delta.usage.output_tokens, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - })); + events.push(AssistantEvent::Usage(delta.usage.token_usage())); } ApiStreamEvent::MessageStop(_) => { saw_stop = true; @@ -1874,12 +1869,7 @@ fn response_to_events(response: MessageResponse) -> Vec { } } - events.push(AssistantEvent::Usage(TokenUsage { - input_tokens: response.usage.input_tokens, - output_tokens: response.usage.output_tokens, - cache_creation_input_tokens: response.usage.cache_creation_input_tokens, - cache_read_input_tokens: response.usage.cache_read_input_tokens, - })); + events.push(AssistantEvent::Usage(response.usage.token_usage())); events.push(AssistantEvent::MessageStop); events }