Compare commits

...

2 commits

Author SHA1 Message Date
32d0d2f02b
feat: Add user resource w/database as storage
Some checks failed
ci/woodpecker/pr/ci Pipeline failed
2025-01-09 00:38:22 +01:00
3ccfb881b5
feat(telemetry): add resource detection and better default
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-08 21:53:32 +01:00
17 changed files with 868 additions and 85 deletions

View file

@ -1,6 +1,6 @@
nix_direnv_manual_reload
use flake
export DATABASE_URL=postgresql://patagia:swordfish@/patagia?host=$XDG_RUNTIME_DIR/patagia-postgres
export DATABASE_URL=postgresql://patagia:swordfish@patagia?host=$XDG_RUNTIME_DIR/patagia-postgres
export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL="http/protobuf"
export OTEL_RESOURCE_ATTRIBUTES=host.name=$HOSTNAME
export OTEL_SERVICE_NAME=$USER.patagia-control

722
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -34,12 +34,6 @@ clap = { version = "4.5.23", features = [
dropshot = "0.15.1"
http = "1.2.0"
once_cell = "1.20.2"
opentelemetry = "0.27.1"
opentelemetry-appender-tracing = { version = "0.27.0", features = ["log", "experimental_metadata_attributes"] }
opentelemetry-otlp = { version = "0.27.0", features = ["grpc-tonic", "gzip-tonic", "zstd-tonic", "tls", "tls-roots", "trace"] }
opentelemetry_sdk = { version = "0.27.1", features = ["metrics", "rt-tokio"] }
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry-stdout = "0.27.0"
progenitor = "0.8.0"
reqwest = { version = "0.12.12", features = ["json", "stream", "rustls-tls"] }
schemars = "0.8.21"
@ -48,16 +42,8 @@ serde = { version = "1.0.217", features = ["derive"] }
slog = "2.7.0"
slog-async = "2.8.0"
tokio = { version = "1.42.0", features = ["full"] }
tonic = "0.12.3"
tracing = "0.1.41"
tracing-core = "0.1.33"
tracing-chrome = "0.7.2"
tracing-opentelemetry = "0.28.0"
tracing-slog = { git = "https://github.com/oxidecomputer/tracing-slog", default-features = false }
tracing-subscriber = { version = "0.3.19", default-features = false, features = [
"std",
"ansi",
"env-filter",
"fmt",
] }
uuid = { version = "1", features = [ "serde", "v4" ] }

View file

@ -1,8 +1,8 @@
[package]
name = "patagia-agent"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[dependencies]
anyhow.workspace = true

View file

@ -1,9 +1,9 @@
[package]
name = "patagia-controller"
description = "Patagia controller server"
version = "0.1.0"
description = "Patagia control plane server"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[dependencies]
anyhow.workspace = true
@ -15,10 +15,12 @@ schemars.workspace = true
serde.workspace = true
slog-async.workspace = true
slog.workspace = true
sqlx = { version = "0.8.3", features = ["postgres", "runtime-tokio", "tls-rustls", "time", "uuid"] }
tokio.workspace = true
trace-request = { path = "../trace-request" }
tracing-slog.workspace = true
tracing.workspace = true
uuid.workspace = true
[package.metadata.cargo-machete]
ignored = ["http"]

View file

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS patagia.public.Users(
id UUID PRIMARY KEY,
name VARCHAR(63) NOT NULL,
time_deleted TIMESTAMP WITH TIME ZONE, -- non-NULL if deleted
time_created TIMESTAMP WITH TIME ZONE NOT NULL,
time_modified TIMESTAMP WITH TIME ZONE NOT NULL
);

View file

@ -4,12 +4,14 @@ use dropshot::ApiDescription;
use std::sync::Arc;
use crate::context::ControllerContext;
use crate::user;
use crate::version;
type ControllerApiDescription = ApiDescription<Arc<ControllerContext>>;
pub fn api() -> Result<ControllerApiDescription> {
let mut api = ControllerApiDescription::new();
api.register(user::get_user_by_id)?;
api.register(version::version)?;
Ok(api)
}

View file

@ -1,8 +1,8 @@
use anyhow::{anyhow, Result};
use clap::Parser;
use dropshot::{ConfigDropshot, ServerBuilder};
use slog::Drain;
use sqlx::postgres::PgPool;
use tracing_slog::TracingSlogDrain;
use std::net::SocketAddr;
@ -18,7 +18,7 @@ struct Cli {
#[arg(
long = "telemetry-otlp-endpoint",
default_value = "http://localhost:4317",
env = "OTEL_EXPORTER_OTLP_ENDPOINT",
env = "OTEL_EXPORTER_OTLP_ENDPOINT"
)]
otlp_endpoint: Option<String>,
@ -30,12 +30,19 @@ struct Cli {
)]
log_stderr: bool,
#[arg(
long = "listen-address",
default_value = "0.0.0.0:9474",
env = "LISTEN_ADDRESS"
)]
listen_address: String,
#[arg(
long = "listen-address",
default_value = "0.0.0.0:9474",
env = "LISTEN_ADDRESS"
long = "database-url",
default_value = "postgresql://localhost/patagia",
env = "DATABASE_URL"
)]
listen_address: String,
database_url: Option<String>,
}
#[tokio::main]
@ -57,7 +64,13 @@ async fn main() -> Result<()> {
slog::Logger::root(async_drain, slog::o!())
};
let ctx = ControllerContext::new();
let database_url = args.database_url.unwrap();
tracing::info!(database_url, listen_address=args.listen_address, "Starting server");
let pg = PgPool::connect(&database_url).await?;
let ctx = ControllerContext::new(pg);
let api = api::api()?;
ServerBuilder::new(api, Arc::new(ctx), logger)
.config(config)

View file

@ -1,13 +1,13 @@
pub struct ControllerContext {}
use sqlx::postgres::PgPool;
pub struct ControllerContext {
pub pg_pool: PgPool,
}
impl ControllerContext {
pub fn new() -> ControllerContext {
ControllerContext {}
}
}
impl Default for ControllerContext {
fn default() -> Self {
Self::new()
pub fn new(pg_pool : PgPool) -> ControllerContext {
ControllerContext {
pg_pool
}
}
}

View file

@ -1,4 +1,5 @@
pub mod api;
pub mod context;
mod user;
mod version;

56
controller/src/user.rs Normal file
View file

@ -0,0 +1,56 @@
use dropshot::{endpoint, HttpError, HttpResponseOk, Path, RequestContext};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use trace_request::trace_request;
use uuid::Uuid;
use std::sync::Arc;
use crate::context::ControllerContext;
/// User
#[derive(Serialize, JsonSchema)]
struct User {
id: Uuid,
name: String,
}
#[allow(dead_code)]
#[derive(Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
struct UsersPathParams {
user_id: Uuid,
}
/// Fetch user info.
#[endpoint {
method = GET,
path = "/users/{userId}",
tags = [ "user" ],
}]
#[trace_request]
pub(crate) async fn get_user_by_id(
rqctx: RequestContext<Arc<ControllerContext>>,
params: Path<UsersPathParams>,
) -> Result<HttpResponseOk<User>, HttpError> {
let id = params.into_inner().user_id;
tracing::debug!(id = id.to_string(), "Getting user by id");
let pg = rqctx.context().pg_pool.to_owned();
let rec = sqlx::query!(r#"SELECT * FROM users WHERE id = $1"#, id)
.fetch_one(&pg)
.await
.map_err(|e| match e {
sqlx::Error::RowNotFound => HttpError::for_not_found(None, format!("User not found by id: {:?}", id)),
err => HttpError::for_internal_error( format!("Error: {}", err))
}
)?;
let user = User {
id: rec.id,
name: rec.name,
};
Ok(HttpResponseOk(user))
}

View file

@ -141,6 +141,8 @@
just
nixfmt-rfc-style
rust-dev-toolchain
sqls
sqlx-cli
watchexec
]
++ commonArgs.buildInputs;

View file

@ -1,17 +1,24 @@
[package]
name = "instrumentation"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[dependencies]
anyhow.workspace = true
http.workspace = true
once_cell.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry_sdk.workspace = true
opentelemetry-semantic-conventions.workspace = true
opentelemetry.workspace = true
tonic.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
opentelemetry-otlp = { version = "0.27.0", features = ["grpc-tonic", "gzip-tonic", "zstd-tonic", "tls", "tls-roots", "trace"] }
opentelemetry_sdk = { version = "0.27.1", features = ["metrics", "rt-tokio"] }
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry-appender-tracing = { version = "0.27.0", features = ["log", "experimental_metadata_attributes"] }
opentelemetry-resource-detectors = { version = "0.6.0" }
opentelemetry = "0.27.1"
tonic = "0.12.3"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3.19", default-features = false, features = [
"std",
"ansi",
"env-filter",
"fmt",
] }

View file

@ -1,5 +1,4 @@
use anyhow::{anyhow, Result};
use once_cell::sync::Lazy;
use opentelemetry::{trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::{
@ -9,25 +8,15 @@ use opentelemetry_sdk::{
trace::{RandomIdGenerator, Sampler, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions as semcov;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
static RESOURCE: Lazy<Resource> = Lazy::new(|| {
Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
env!("CARGO_PKG_NAME"),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
])
});
use std::time::Duration;
// Construct MeterProvider for MetricsLayer
fn init_meter_provider(otel_endpoint: &String) -> Result<SdkMeterProvider> {
fn init_meter_provider(otel_endpoint: &String, resource: Resource) -> Result<SdkMeterProvider> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(otel_endpoint)
@ -38,7 +27,7 @@ fn init_meter_provider(otel_endpoint: &String) -> Result<SdkMeterProvider> {
.map_err(|e| anyhow!("Error creating OTLP metric exporter: {:?}", e))?;
let meter_provider = MeterProviderBuilder::default()
.with_resource(RESOURCE.clone())
.with_resource(resource)
.with_reader(
PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(std::time::Duration::from_secs(10))
@ -52,7 +41,7 @@ fn init_meter_provider(otel_endpoint: &String) -> Result<SdkMeterProvider> {
}
// Construct TracerProvider for OpenTelemetryLayer
fn init_tracer_provider(otel_endpoint: &String) -> Result<TracerProvider> {
fn init_tracer_provider(otel_endpoint: &String, resource: Resource) -> Result<TracerProvider> {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots())
@ -65,7 +54,7 @@ fn init_tracer_provider(otel_endpoint: &String) -> Result<TracerProvider> {
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_resource(RESOURCE.clone())
.with_resource(resource)
.with_id_generator(RandomIdGenerator::default())
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.build();
@ -75,6 +64,25 @@ fn init_tracer_provider(otel_endpoint: &String) -> Result<TracerProvider> {
// Initialize tracing-subscriber and return TracingGuard for opentelemetry-related termination processing
pub fn init_tracing(otel_endpoint: Option<&String>, log_stderr: bool) -> Result<TracingGuard> {
let resource = {
let r = Resource::new([KeyValue::new(
semcov::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
)]);
let detected = Resource::from_detectors(
Duration::from_secs(5),
vec![
Box::new(opentelemetry_sdk::resource::SdkProvidedResourceDetector),
Box::new(opentelemetry_sdk::resource::EnvResourceDetector::new()),
Box::new(opentelemetry_resource_detectors::OsResourceDetector),
Box::new(opentelemetry_resource_detectors::ProcessResourceDetector),
Box::new(opentelemetry_sdk::resource::TelemetryResourceDetector),
],
);
r.merge(&detected)
};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(""));
@ -84,7 +92,7 @@ pub fn init_tracing(otel_endpoint: Option<&String>, log_stderr: bool) -> Result<
};
let meter_provider = match otel_endpoint {
Some(endpoint) => Some(init_meter_provider(endpoint)?),
Some(endpoint) => Some(init_meter_provider(endpoint, resource.clone())?),
None => None,
};
@ -94,7 +102,7 @@ pub fn init_tracing(otel_endpoint: Option<&String>, log_stderr: bool) -> Result<
};
let tracer_provider = match otel_endpoint {
Some(endpoint) => Some(init_tracer_provider(endpoint)?),
Some(endpoint) => Some(init_tracer_provider(endpoint, resource)?),
None => None,
};

View file

@ -5,10 +5,12 @@ default:
@just --choose
# Run controller
[group('controller')]
run-controller $RUST_LOG="debug,h2=info,hyper_util=info,tower=info":
cargo run --package patagia-controller -- --log-stderr
# Run controller local development
[group('controller')]
dev-controller:
watchexec --clear --restart --stop-signal INT --debounce 300ms -- just run-controller
@ -56,6 +58,7 @@ check-nix:
nix flake check
# Run PostgreSQL for development and testing
[group('controller')]
dev-postgres:
mkdir -p "${XDG_RUNTIME_DIR}/patagia-postgres"
podman volume exists patagia-postgres || podman volume create patagia-postgres
@ -71,10 +74,28 @@ dev-postgres:
docker.io/postgres:17
# Clean up PostgreSQL data
[group('controller')]
dev-postgres-clean:
podman rm -f patagia-postgres || true
podman volume rm patagia-postgres || true
# Connect to PostgreSQL with psql
[group('controller')]
dev-postgres-psql:
podman exec -it patagia-postgres psql -U patagia
[group('controller')]
[working-directory: 'controller']
dev-controller-sqlx *ARGS:
sqlx {{ARGS}}
[group('controller')]
[working-directory: 'controller']
dev-controller-db-migrate:
sqlx migrate run
[group('controller')]
[working-directory: 'controller']
dev-controller-db-reset:
sqlx db reset -y

View file

@ -1,8 +1,8 @@
[package]
name = "trace-request"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[lib]
proc-macro = true

View file

@ -1,7 +1,7 @@
[package]
name = "xtask"
version = "0.0.0"
edition = "2021"
version.workspace = true
[[bin]]
name = "xtask"