feat: telemetry progress
This commit is contained in:
@@ -2,12 +2,12 @@ use std::collections::VecDeque;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use runtime::{
|
||||
load_oauth_credentials, save_oauth_credentials, OAuthConfig, OAuthRefreshRequest,
|
||||
OAuthTokenExchangeRequest,
|
||||
format_usd, load_oauth_credentials, pricing_for_model, save_oauth_credentials, OAuthConfig,
|
||||
OAuthRefreshRequest, OAuthTokenExchangeRequest,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use serde_json::{Map, Value};
|
||||
use telemetry::{AnthropicRequestProfile, ClientIdentity, SessionTracer};
|
||||
use telemetry::{AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, SessionTracer};
|
||||
|
||||
use crate::error::ApiError;
|
||||
use crate::sse::SseParser;
|
||||
@@ -252,6 +252,7 @@ impl AnthropicClient {
|
||||
if response.request_id.is_none() {
|
||||
response.request_id = request_id;
|
||||
}
|
||||
self.record_response_usage(&response);
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
@@ -420,6 +421,75 @@ impl AnthropicClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn record_response_usage(&self, response: &MessageResponse) {
|
||||
let Some(tracer) = &self.session_tracer else {
|
||||
return;
|
||||
};
|
||||
|
||||
let cost = response.usage.estimated_cost_usd(&response.model);
|
||||
let pricing_source = if pricing_for_model(&response.model).is_some() {
|
||||
"model-specific"
|
||||
} else {
|
||||
"default-sonnet"
|
||||
};
|
||||
|
||||
let mut properties = Map::new();
|
||||
properties.insert("model".to_string(), Value::String(response.model.clone()));
|
||||
properties.insert(
|
||||
"pricing_source".to_string(),
|
||||
Value::String(pricing_source.to_string()),
|
||||
);
|
||||
properties.insert(
|
||||
"input_tokens".to_string(),
|
||||
Value::from(response.usage.input_tokens),
|
||||
);
|
||||
properties.insert(
|
||||
"output_tokens".to_string(),
|
||||
Value::from(response.usage.output_tokens),
|
||||
);
|
||||
properties.insert(
|
||||
"cache_creation_input_tokens".to_string(),
|
||||
Value::from(response.usage.cache_creation_input_tokens),
|
||||
);
|
||||
properties.insert(
|
||||
"cache_read_input_tokens".to_string(),
|
||||
Value::from(response.usage.cache_read_input_tokens),
|
||||
);
|
||||
properties.insert(
|
||||
"total_tokens".to_string(),
|
||||
Value::from(response.usage.total_tokens()),
|
||||
);
|
||||
properties.insert(
|
||||
"estimated_cost_usd".to_string(),
|
||||
Value::String(format_usd(cost.total_cost_usd())),
|
||||
);
|
||||
properties.insert(
|
||||
"estimated_input_cost_usd".to_string(),
|
||||
Value::String(format_usd(cost.input_cost_usd)),
|
||||
);
|
||||
properties.insert(
|
||||
"estimated_output_cost_usd".to_string(),
|
||||
Value::String(format_usd(cost.output_cost_usd)),
|
||||
);
|
||||
properties.insert(
|
||||
"estimated_cache_creation_cost_usd".to_string(),
|
||||
Value::String(format_usd(cost.cache_creation_cost_usd)),
|
||||
);
|
||||
properties.insert(
|
||||
"estimated_cache_read_cost_usd".to_string(),
|
||||
Value::String(format_usd(cost.cache_read_cost_usd)),
|
||||
);
|
||||
if let Some(request_id) = &response.request_id {
|
||||
properties.insert("request_id".to_string(), Value::String(request_id.clone()));
|
||||
}
|
||||
|
||||
tracer.record_analytics(AnalyticsEvent {
|
||||
namespace: "api".to_string(),
|
||||
action: "message_usage".to_string(),
|
||||
properties,
|
||||
});
|
||||
}
|
||||
|
||||
fn request_attributes(&self, request: &MessageRequest) -> Map<String, Value> {
|
||||
let mut attributes = Map::new();
|
||||
attributes.insert("model".to_string(), Value::String(request.model.clone()));
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use runtime::{pricing_for_model, TokenUsage, UsageCostEstimate};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
@@ -150,7 +151,29 @@ pub struct Usage {
|
||||
impl Usage {
|
||||
#[must_use]
|
||||
pub const fn total_tokens(&self) -> u32 {
|
||||
self.input_tokens + self.output_tokens
|
||||
self.input_tokens
|
||||
+ self.output_tokens
|
||||
+ self.cache_creation_input_tokens
|
||||
+ self.cache_read_input_tokens
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn token_usage(&self) -> TokenUsage {
|
||||
TokenUsage {
|
||||
input_tokens: self.input_tokens,
|
||||
output_tokens: self.output_tokens,
|
||||
cache_creation_input_tokens: self.cache_creation_input_tokens,
|
||||
cache_read_input_tokens: self.cache_read_input_tokens,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn estimated_cost_usd(&self, model: &str) -> UsageCostEstimate {
|
||||
let usage = self.token_usage();
|
||||
pricing_for_model(model).map_or_else(
|
||||
|| usage.estimate_cost_usd(),
|
||||
|pricing| usage.estimate_cost_usd_with_pricing(pricing),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,3 +233,47 @@ pub enum StreamEvent {
|
||||
ContentBlockStop(ContentBlockStopEvent),
|
||||
MessageStop(MessageStopEvent),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use runtime::format_usd;
|
||||
|
||||
use super::{MessageResponse, Usage};
|
||||
|
||||
#[test]
|
||||
fn usage_total_tokens_includes_cache_tokens() {
|
||||
let usage = Usage {
|
||||
input_tokens: 10,
|
||||
cache_creation_input_tokens: 2,
|
||||
cache_read_input_tokens: 3,
|
||||
output_tokens: 4,
|
||||
};
|
||||
|
||||
assert_eq!(usage.total_tokens(), 19);
|
||||
assert_eq!(usage.token_usage().total_tokens(), 19);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn message_response_estimates_cost_from_model_usage() {
|
||||
let response = MessageResponse {
|
||||
id: "msg_cost".to_string(),
|
||||
kind: "message".to_string(),
|
||||
role: "assistant".to_string(),
|
||||
content: Vec::new(),
|
||||
model: "claude-sonnet-4-20250514".to_string(),
|
||||
stop_reason: Some("end_turn".to_string()),
|
||||
stop_sequence: None,
|
||||
usage: Usage {
|
||||
input_tokens: 1_000_000,
|
||||
cache_creation_input_tokens: 100_000,
|
||||
cache_read_input_tokens: 200_000,
|
||||
output_tokens: 500_000,
|
||||
},
|
||||
request_id: None,
|
||||
};
|
||||
|
||||
let cost = response.usage.estimated_cost_usd(&response.model);
|
||||
assert_eq!(format_usd(cost.total_cost_usd()), "$54.6750");
|
||||
assert_eq!(response.total_tokens(), 1_800_000);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() {
|
||||
"\"model\":\"claude-3-7-sonnet-latest\",",
|
||||
"\"stop_reason\":\"end_turn\",",
|
||||
"\"stop_sequence\":null,",
|
||||
"\"usage\":{\"input_tokens\":1,\"output_tokens\":1}",
|
||||
"\"usage\":{\"input_tokens\":1,\"cache_creation_input_tokens\":2,\"cache_read_input_tokens\":3,\"output_tokens\":1}",
|
||||
"}"
|
||||
),
|
||||
&[("request-id", "req_profile_123")],
|
||||
@@ -155,7 +155,7 @@ async fn send_message_applies_request_profile_and_records_telemetry() {
|
||||
);
|
||||
|
||||
let events = sink.events();
|
||||
assert_eq!(events.len(), 4);
|
||||
assert_eq!(events.len(), 6);
|
||||
assert!(matches!(
|
||||
&events[0],
|
||||
TelemetryEvent::HttpRequestStarted {
|
||||
@@ -182,6 +182,19 @@ async fn send_message_applies_request_profile_and_records_telemetry() {
|
||||
&events[3],
|
||||
TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded"
|
||||
));
|
||||
assert!(matches!(
|
||||
&events[4],
|
||||
TelemetryEvent::Analytics(event)
|
||||
if event.namespace == "api"
|
||||
&& event.action == "message_usage"
|
||||
&& event.properties.get("request_id") == Some(&json!("req_profile_123"))
|
||||
&& event.properties.get("total_tokens") == Some(&json!(7))
|
||||
&& event.properties.get("estimated_cost_usd") == Some(&json!("$0.0001"))
|
||||
));
|
||||
assert!(matches!(
|
||||
&events[5],
|
||||
TelemetryEvent::SessionTrace(trace) if trace.name == "analytics"
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user