From 2b0be0854f2de48f23ca06dfc4de2421993314a5 Mon Sep 17 00:00:00 2001 From: David Hewitt Date: Thu, 6 Mar 2025 17:57:00 +0000 Subject: [PATCH] set up default logfire endpoint and token handling (#3) --- Cargo.toml | 6 +- src/lib.rs | 291 +++++++++++++++++++++++++++++------------------------ 2 files changed, 164 insertions(+), 133 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 934363e..05d114c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,9 +35,9 @@ ulid = "*" default = ["export-http-protobuf"] serde = ["dep:serde"] # FIXME might need rustls feature on all of these? -export-grpc = ["opentelemetry-otlp/grpc-tonic"] -export-http-protobuf = ["opentelemetry-otlp/http-proto", "opentelemetry-otlp/reqwest-blocking-client"] -export-http-json = ["opentelemetry-otlp/http-json", "opentelemetry-otlp/reqwest-blocking-client"] +export-grpc = ["opentelemetry-otlp/grpc-tonic", "opentelemetry-otlp/tls"] +export-http-protobuf = ["opentelemetry-otlp/http-proto", "opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/reqwest-rustls"] +export-http-json = ["opentelemetry-otlp/http-json", "opentelemetry-otlp/reqwest-blocking-client", "opentelemetry-otlp/reqwest-rustls"] [lints.clippy] dbg_macro = "deny" diff --git a/src/lib.rs b/src/lib.rs index cb7621e..0a3355a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; use std::cell::RefCell; +use std::collections::HashMap; use std::fmt; use std::panic::PanicHookInfo; use std::sync::{Arc, LazyLock}; @@ -11,7 +12,9 @@ use futures_util::future::BoxFuture; use nu_ansi_term::{Color, Style}; use opentelemetry::Value; use opentelemetry::trace::TracerProvider; -use opentelemetry_otlp::{MetricExporter, Protocol, SpanExporter, WithExportConfig}; +use opentelemetry_otlp::{ + MetricExporter, Protocol, SpanExporter, WithExportConfig, WithHttpConfig, +}; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; use opentelemetry_sdk::trace::{BatchConfigBuilder, BatchSpanProcessor, SimpleSpanProcessor}; use opentelemetry_sdk::trace::{SdkTracerProvider, SpanData, Tracer}; @@ -55,6 +58,12 @@ pub enum ConfigureError { #[error("Error configuring the OpenTelemetry tracer: {0}")] RustLogInvalid(#[from] tracing_subscriber::filter::FromEnvError), + #[error("Rust feature required: `{feature_name}` feature must be enabled for {functionality}")] + LogfireFeatureRequired { + feature_name: &'static str, + functionality: String, + }, + #[error(transparent)] Other(#[from] Box), } @@ -130,7 +139,13 @@ impl LogfireConfigBuilder { /// /// See [`ConfigureError`] for possible errors. pub fn finish(&self) -> Result { - let (tracer, subscriber, panic_handler, tracer_provider) = self.build_parts()?; + let LogfireParts { + tracer, + subscriber, + panic_handler, + tracer_provider, + logfire_token, + } = self.build_parts()?; GLOBAL_TRACER .set(tracer.clone()) @@ -149,7 +164,8 @@ impl LogfireConfigBuilder { // setup metrics only if sending to logfire let meter_provider = if self.send_to_logfire { - let metric_reader = PeriodicReader::builder(metrics_exporter()?).build(); + let metric_reader = + PeriodicReader::builder(metric_exporter(logfire_token.as_deref())?).build(); let meter_provider = SdkMeterProvider::builder() .with_reader(metric_reader) @@ -173,6 +189,8 @@ impl LogfireConfigBuilder { } fn build_parts(&self) -> Result { + let logfire_token = get_optional_env("LOGFIRE_TOKEN")?; + let tracer_provider = if let Some(provider) = &self.tracer_provider { provider.clone() } else { @@ -183,7 +201,7 @@ impl LogfireConfigBuilder { tracer_provider_builder.with_span_processor( BatchSpanProcessor::builder(LogfireSpanExporter { write_console: self.console_mode == ConsoleMode::Force, - inner: Some(trace_exporter()?), + inner: Some(span_exporter(logfire_token.as_deref())?), }) .with_batch_config( BatchConfigBuilder::default() @@ -245,7 +263,13 @@ impl LogfireConfigBuilder { }) as _ }); - Ok((tracer, Arc::new(subscriber), panic_handler, tracer_provider)) + Ok(LogfireParts { + tracer, + subscriber: Arc::new(subscriber), + panic_handler, + tracer_provider, + logfire_token, + }) } } @@ -279,12 +303,13 @@ impl ShutdownHandler { type PanicHandler = Box; -type LogfireParts = ( - Tracer, - Arc, - Option, - SdkTracerProvider, -); +struct LogfireParts { + tracer: Tracer, + subscriber: Arc, + panic_handler: Option, + tracer_provider: SdkTracerProvider, + logfire_token: Option, +} /// Install `handler` as part of a chain of panic handlers. fn install_panic_handler(handler: PanicHandler) { @@ -460,24 +485,70 @@ impl LogfireSpanExporter { } } -fn trace_exporter() -> Result { - let mut protocol_env_var = get_optional_env("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")?; - if protocol_env_var.is_none() { - protocol_env_var = get_optional_env("OTEL_EXPORTER_OTLP_PROTOCOL")?; - }; +// current default logfire protocol is to export over HTTP in binary format +const DEFAULT_LOGFIRE_PROTOCOL: Protocol = Protocol::HttpBinary; + +// standard OTLP protocol values in configuration +const OTEL_EXPORTER_OTLP_PROTOCOL_GRPC: &str = "grpc"; +const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf"; +const OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON: &str = "http/json"; + +/// Temporary workaround for lack of https://github.com/open-telemetry/opentelemetry-rust/pull/2758 +fn protocol_from_str(value: &str) -> Result { + match value { + OTEL_EXPORTER_OTLP_PROTOCOL_GRPC => Ok(Protocol::Grpc), + OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(Protocol::HttpBinary), + OTEL_EXPORTER_OTLP_PROTOCOL_HTTP_JSON => Ok(Protocol::HttpJson), + _ => Err(ConfigureError::Other( + format!("unsupported protocol: {value}").into(), + )), + } +} + +/// Get a protocol from the environment (or default value), returning a string describing the source +/// plus the parsed protocol. +fn protocol_from_env(data_env_var: &str) -> Result<(String, Protocol), ConfigureError> { + // try both data-specific env var and general protocol + [data_env_var, "OTEL_EXPORTER_OTLP_PROTOCOL"] + .into_iter() + .find_map(|var_name| match get_optional_env(var_name) { + Ok(Some(value)) => Some(Ok((var_name, value))), + Ok(None) => None, + Err(e) => Some(Err(e)), + }) + .transpose()? + .map_or_else( + || { + Ok(( + "the default logfire export protocol".to_string(), + DEFAULT_LOGFIRE_PROTOCOL, + )) + }, + |(var_name, value)| Ok((format!("`{var_name}={value}`"), protocol_from_str(&value)?)), + ) +} - let protocol = match protocol_env_var.as_deref() { - // FIXME: opentelemetry-rust should export these constant - Some("grpc") => Protocol::Grpc, - // NB default protocol for logfire is presently http protobuf - Some("http/protobuf") | None => Protocol::HttpBinary, - Some("http/json") => Protocol::HttpJson, - Some(protocol) => { - return Err(ConfigureError::Other( - format!("unknown OTEL_EXPORTER_OTLP_PROTOCOL: `{protocol}`",).into(), - )); +macro_rules! feature_required { + ($feature_name:literal, $functionality:expr, $if_enabled:expr) => {{ + #[cfg(feature = $feature_name)] + { + Ok($if_enabled) } - }; + + #[cfg(not(feature = $feature_name))] + { + Err(ConfigureError::LogfireFeatureRequired { + feature_name: $feature_name, + functionality: $functionality, + }) + } + }}; +} + +fn span_exporter(logfire_token: Option<&str>) -> Result { + let (source, protocol) = protocol_from_env("OTEL_EXPORTER_OTLP_TRACES_PROTOCOL")?; + + let builder = SpanExporter::builder(); // FIXME: it would be nice to let `opentelemetry-rust` handle this; ideally we could detect if // OTEL_EXPORTER_OTLP_PROTOCOL or OTEL_EXPORTER_OTLP_TRACES_PROTOCOL is set and let the SDK @@ -487,73 +558,31 @@ fn trace_exporter() -> Result { // https://github.com/open-telemetry/opentelemetry-rust/issues/1983 match protocol { Protocol::Grpc => { - #[cfg(feature = "export-grpc")] - { - Ok(SpanExporter::builder().with_tonic().build()?) - } - - #[cfg(not(feature = "export-grpc"))] - { - Err(ConfigureError::Other( - "OTEL_EXPORTER_OTLP_PROTOCOL=grpc requires the `export-grpc` feature".into(), - )) - } + feature_required!("export-grpc", source, { builder.with_tonic().build()? }) } Protocol::HttpBinary => { - #[cfg(feature = "export-http-protobuf")] - { - Ok(SpanExporter::builder() + feature_required!("export-http-protobuf", source, { + builder .with_http() .with_protocol(Protocol::HttpBinary) - .build()?) - } - - #[cfg(not(feature = "export-http-protobuf"))] - { - Err(ConfigureError::Other( - "OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf requires the `export-http-protobuf` feature" - .into(), - )) - } + .with_logfire_http_defaults(logfire_token, "v1/traces") + .build()? + }) } Protocol::HttpJson => { - #[cfg(feature = "export-http-json")] - { - Ok(SpanExporter::builder() + feature_required!("export-http-json", source, { + builder .with_http() - .with_protocol(Protocol::HttpJson) - .build()?) - } - - #[cfg(not(feature = "export-http-json"))] - { - Err(ConfigureError::Other( - "OTEL_EXPORTER_OTLP_PROTOCOL=http/json requires the `export-http-json` feature" - .into(), - )) - } + .with_protocol(Protocol::HttpBinary) + .with_logfire_http_defaults(logfire_token, "v1/traces") + .build()? + }) } } } -fn metrics_exporter() -> Result { - let mut protocol_env_var = get_optional_env("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL")?; - if protocol_env_var.is_none() { - protocol_env_var = get_optional_env("OTEL_EXPORTER_OTLP_PROTOCOL")?; - }; - - let protocol = match protocol_env_var.as_deref() { - // FIXME: opentelemetry-rust should export these constant - Some("grpc") => Protocol::Grpc, - // NB default protocol for logfire is presently http protobuf - Some("http/protobuf") | None => Protocol::HttpBinary, - Some("http/json") => Protocol::HttpJson, - Some(protocol) => { - return Err(ConfigureError::Other( - format!("unknown OTEL_EXPORTER_OTLP_PROTOCOL: `{protocol}`",).into(), - )); - } - }; +fn metric_exporter(logfire_token: Option<&str>) -> Result { + let (source, protocol) = protocol_from_env("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL")?; let builder = MetricExporter::builder().with_temporality(opentelemetry_sdk::metrics::Temporality::Delta); @@ -566,52 +595,48 @@ fn metrics_exporter() -> Result { // https://github.com/open-telemetry/opentelemetry-rust/issues/1983 match protocol { Protocol::Grpc => { - #[cfg(feature = "export-grpc")] - { - Ok(builder.with_tonic().build()?) - } - - #[cfg(not(feature = "export-grpc"))] - { - Err(ConfigureError::Other( - "OTEL_EXPORTER_OTLP_PROTOCOL=grpc requires the `export-grpc` feature".into(), - )) - } + feature_required!("export-grpc", source, { builder.with_tonic().build()? }) } Protocol::HttpBinary => { - #[cfg(feature = "export-http-protobuf")] - { - Ok(builder + feature_required!("export-http-protobuf", source, { + builder .with_http() .with_protocol(Protocol::HttpBinary) - .build()?) - } - - #[cfg(not(feature = "export-http-protobuf"))] - { - Err(ConfigureError::Other( - "OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf requires the `export-http-protobuf` feature" - .into(), - )) - } + .with_logfire_http_defaults(logfire_token, "v1/metrics") + .build()? + }) } Protocol::HttpJson => { - #[cfg(feature = "export-http-json")] - { - Ok(builder + feature_required!("export-http-json", source, { + builder .with_http() - .with_protocol(Protocol::HttpJson) - .build()?) - } + .with_protocol(Protocol::HttpBinary) + .with_logfire_http_defaults(logfire_token, "v1/metrics") + .build()? + }) + } + } +} - #[cfg(not(feature = "export-http-json"))] - { - Err(ConfigureError::Other( - "OTEL_EXPORTER_OTLP_PROTOCOL=http/json requires the `export-http-json` feature" - .into(), - )) - } +/// Internal helper to build an exporter with logfire default config +trait WithLogfireHttpExportDefaults: WithHttpConfig + WithExportConfig + Sized { + fn with_logfire_http_defaults(self, logfire_token: Option<&str>, endpoint: &str) -> Self; +} + +impl WithLogfireHttpExportDefaults for T +where + T: WithHttpConfig + WithExportConfig + Sized, +{ + fn with_logfire_http_defaults(self, logfire_token: Option<&str>, endpoint: &str) -> Self { + let mut headers = HashMap::new(); + if let Some(logfire_token) = logfire_token { + headers.insert( + "Authorization".to_string(), + format!("Bearer {logfire_token}"), + ); } + self.with_headers(headers) + .with_endpoint(format!("https://logfire-api.pydantic.dev/{endpoint}")) } } @@ -668,7 +693,13 @@ impl Drop for LocalLogfireGuard { #[cfg(test)] #[expect(clippy::needless_pass_by_value)] // might consume in the future, leave it for now fn set_local_logfire(config: LogfireConfigBuilder) -> Result { - let (tracer, subscriber, panic_handler, tracer_provider) = config.build_parts()?; + let LogfireParts { + tracer, + subscriber, + panic_handler, + tracer_provider, + .. + } = config.build_parts()?; let prior = LOCAL_TRACER.with_borrow_mut(|local_logfire| local_logfire.replace(tracer.clone())); @@ -890,7 +921,7 @@ mod tests { "code.lineno", ), value: I64( - 830, + 861, ), }, KeyValue { @@ -1016,7 +1047,7 @@ mod tests { "code.lineno", ), value: I64( - 831, + 862, ), }, KeyValue { @@ -1152,7 +1183,7 @@ mod tests { "code.lineno", ), value: I64( - 831, + 862, ), }, KeyValue { @@ -1294,7 +1325,7 @@ mod tests { "code.lineno", ), value: I64( - 832, + 863, ), }, KeyValue { @@ -1436,7 +1467,7 @@ mod tests { "code.lineno", ), value: I64( - 834, + 865, ), }, KeyValue { @@ -1606,7 +1637,7 @@ mod tests { "code.lineno", ), value: I64( - 835, + 866, ), }, KeyValue { @@ -1685,7 +1716,7 @@ mod tests { ), value: String( Owned( - "src/lib.rs:836:17", + "src/lib.rs:867:17", ), ), }, @@ -1752,7 +1783,7 @@ mod tests { "code.lineno", ), value: I64( - 240, + 258, ), }, KeyValue { @@ -1850,7 +1881,7 @@ mod tests { "code.lineno", ), value: I64( - 830, + 861, ), }, KeyValue {