Compare commits

..

23 commits

Author SHA1 Message Date
8e99ab4555 #9 chore(cargo): update dependencies
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Reviewed-on: #9
2025-01-14 16:58:02 +01:00
eb8926c091
chore(cargo): update dependencies
All checks were successful
ci/woodpecker/pr/ci Pipeline was successful
2025-01-14 16:52:19 +01:00
092a66a3fb #7 feat: Add user resource w/database as storage
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
The User resource itself is just more or less a place holder for now.

But the idea is to get some proper e2e example to build off of with the basics in place:

- [x] sqlx for database queries
- [x] sqlx schema migration
- [x] type safe dropshot api

Reviewed-on: #7
2025-01-14 14:25:52 +01:00
9b7e1fb226
feat: Add user resource w/database as storage
All checks were successful
ci/woodpecker/pr/ci Pipeline was successful
2025-01-14 14:24:35 +01:00
3b04b82998
chore(clippy): cleanup
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-12 14:38:08 +01:00
c460e4b992
chore: typo
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-09 00:41:26 +01:00
3ccfb881b5
feat(telemetry): add resource detection and better default
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-08 21:53:32 +01:00
e297e16cfb
feat(direnv): opentelemetry env vars for development
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2025-01-08 18:55:43 +01:00
7333ced376
feat(controller): use env in clap to allow env var config
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2025-01-08 16:41:58 +01:00
a9a4c513f5
feat(controller): add listen-address flag 2025-01-08 16:40:07 +01:00
abd6748703
chore(just): remove unnecessary dev-postgres-reset recipe
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-08 14:16:03 +01:00
ffe95cb83c
feat(devel): postgresql dev server
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-08 00:11:21 +01:00
d5ab8058f2
chore(cargo): update dependencies
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2025-01-06 00:28:07 +01:00
d4c5a71367
chore(nix): nix update 2025-01-06 00:20:48 +01:00
cf3dd45f48
chore: fmt 2025-01-06 00:20:15 +01:00
817e940167
chore(nix): remove devshell hook banner 2025-01-06 00:13:43 +01:00
39eb69172f
nix: add nixfmt to devshell 2025-01-06 00:06:33 +01:00
49372f0e58
chore(nix): nix update
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2024-12-26 22:47:30 +01:00
ab71064b29
feat(telemetry): mark error field in spans w/errors
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2024-12-26 21:10:11 +01:00
86db3cb3a0
feat(instrumentation): Add logging to stderr config
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2024-12-25 17:54:57 +01:00
706b6787d3
chore: clean up tracing subscriber setup
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2024-12-25 17:15:12 +01:00
b1f701ddf2
feat(telemetry): configure otlp endpoint from flag
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2024-12-24 12:16:03 +01:00
441c38b3d5 #4 Use trace-request for dropshot instrumentation
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
A convenience macro for instrumenting dropshot request handlers.

This is a fork of  https://github.com/oxidecomputer/rfd-api/tree/main/trace-request with some changes for flavor:

- Use span fields instead of emitting events at start+end of request
- Name the span based on the attributed function
- A different opinion on field/tag names

Maybe we can/should publish this as a public crate if more people are interested?

Reviewed-on: #4
2024-12-17 11:12:13 +01:00
29 changed files with 1598 additions and 427 deletions

8
.cargo/audit.toml Normal file
View file

@ -0,0 +1,8 @@
[advisories]
ignore = [
# Advisory about a vulnerability in rsa, which we don't use, but comes via sqlx due
# to a bug in cargo. For context, see:
# https://github.com/launchbadge/sqlx/issues/2911
# and https://github.com/rust-lang/cargo/issues/10801
"RUSTSEC-2023-0071"
]

View file

@ -1,3 +1,8 @@
nix_direnv_manual_reload
use flake
export DATABASE_URL=postgresql://patagia:swordfish@patagia?host=$XDG_RUNTIME_DIR/patagia-postgres
export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL="http/protobuf"
export OTEL_RESOURCE_ATTRIBUTES=host.name=$HOSTNAME
export OTEL_SERVICE_NAME=$USER.patagia-control
export OTEL_TRACES_SAMPLER=always_on
dotenv_if_exists

1108
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -23,39 +23,28 @@ edition = "2021"
name = "patagia-run"
[workspace.dependencies]
anyhow = "1.0.94"
clap = { version = "4.5.23", features = [
anyhow = "1.0.95"
clap = { version = "4.5.26", features = [
"derive",
"deprecated",
"env",
"wrap_help",
"string",
] }
dropshot = "0.15.1"
futures = "0.3"
http = "1.2.0"
once_cell = "1.20.2"
opentelemetry = "0.27.1"
opentelemetry-appender-tracing = { version = "0.27.0", features = ["log", "experimental_metadata_attributes"] }
opentelemetry-otlp = { version = "0.27.0", features = ["grpc-tonic", "trace"] }
opentelemetry_sdk = { version = "0.27.1", features = ["metrics", "rt-tokio"] }
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry-stdout = "0.27.0"
progenitor = "0.8.0"
reqwest = { version = "0.12.9", features = ["json", "stream", "rustls-tls"] }
progenitor = "0.9"
reqwest = { version = "0.12.12", features = ["json", "stream", "rustls-tls"] }
schemars = "0.8.21"
semver = "1.0.24"
serde = { version = "1.0.216", features = ["derive"] }
serde = { version = "1.0.217", features = ["derive"] }
slog = "2.7.0"
slog-async = "2.8.0"
tokio = { version = "1.42.0", features = ["full"] }
tokio = { version = "1.43.0", features = ["full"] }
tracing = "0.1.41"
tracing-core = "0.1.33"
tracing-chrome = "0.7.2"
tracing-opentelemetry = "0.28.0"
tracing-slog = { git = "https://github.com/oxidecomputer/tracing-slog", default-features = false }
tracing-subscriber = { version = "0.3.19", default-features = false, features = [
"std",
"ansi",
"env-filter",
"fmt",
] }
uuid = { version = "1", features = [ "serde", "v4" ] }

View file

@ -1,12 +1,13 @@
[package]
name = "patagia-agent"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[dependencies]
anyhow.workspace = true
clap.workspace = true
futures.workspace = true
instrumentation = { path = "../instrumentation" }
progenitor.workspace = true
reqwest.workspace = true
@ -14,6 +15,7 @@ schemars.workspace = true
serde.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
[package.metadata.cargo-machete]
ignored = ["reqwest", "serde"]

View file

@ -6,12 +6,27 @@ mod patagia_api;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Cli {}
struct Cli {
#[arg(
long = "telemetry-otlp-endpoint",
default_value = "http://localhost:4317",
value_name = "OTEL_EXPORTER_OTLP_ENDPOINT"
)]
otlp_endpoint: Option<String>,
#[arg(
long = "log-stderr",
short = 'v',
default_value = "false",
value_name = "LOG_TO_STDERR"
)]
log_stderr: bool,
}
#[tokio::main]
async fn main() -> Result<()> {
let _args = Cli::parse();
let _tracing = instrumentation::init_tracing_subscriber()?;
let args = Cli::parse();
let _tracing = instrumentation::init_tracing(args.otlp_endpoint.as_ref(), args.log_stderr)?;
tracing::info!("Patagia Agent");

135
api.json
View file

@ -5,6 +5,96 @@
"version": "1.0.0"
},
"paths": {
"/users": {
"get": {
"tags": [
"user"
],
"summary": "List users",
"operationId": "list_users",
"parameters": [
{
"in": "query",
"name": "limit",
"description": "Maximum number of items returned by a single call",
"schema": {
"nullable": true,
"type": "integer",
"format": "uint32",
"minimum": 1
}
},
{
"in": "query",
"name": "page_token",
"description": "Token returned by previous call to retrieve the subsequent page",
"schema": {
"nullable": true,
"type": "string"
}
}
],
"responses": {
"200": {
"description": "successful operation",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UserResultsPage"
}
}
}
},
"4XX": {
"$ref": "#/components/responses/Error"
},
"5XX": {
"$ref": "#/components/responses/Error"
}
},
"x-dropshot-pagination": {
"required": []
}
}
},
"/users/{userId}": {
"get": {
"tags": [
"user"
],
"summary": "Fetch user info.",
"operationId": "get_user_by_id",
"parameters": [
{
"in": "path",
"name": "userId",
"required": true,
"schema": {
"type": "string",
"format": "uuid"
}
}
],
"responses": {
"200": {
"description": "successful operation",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/User"
}
}
}
},
"4XX": {
"$ref": "#/components/responses/Error"
},
"5XX": {
"$ref": "#/components/responses/Error"
}
}
}
},
"/version": {
"get": {
"summary": "Fetch version info.",
@ -51,6 +141,44 @@
"request_id"
]
},
"User": {
"description": "User",
"type": "object",
"properties": {
"id": {
"type": "string",
"format": "uuid"
},
"name": {
"type": "string"
}
},
"required": [
"id",
"name"
]
},
"UserResultsPage": {
"description": "A single page of results",
"type": "object",
"properties": {
"items": {
"description": "list of items on this page of results",
"type": "array",
"items": {
"$ref": "#/components/schemas/User"
}
},
"next_page": {
"nullable": true,
"description": "token used to fetch the next page of results (if any)",
"type": "string"
}
},
"required": [
"items"
]
},
"VersionInfo": {
"description": "Version and build information",
"type": "object",
@ -80,5 +208,10 @@
}
}
}
}
},
"tags": [
{
"name": "user"
}
]
}

View file

@ -0,0 +1,47 @@
{
"db_name": "PostgreSQL",
"query": "SELECT * FROM users WHERE id > coalesce($1, '00000000-0000-0000-0000-000000000000'::UUID) ORDER BY id LIMIT $2",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "time_deleted",
"type_info": "Timestamptz"
},
{
"ordinal": 3,
"name": "time_created",
"type_info": "Timestamptz"
},
{
"ordinal": 4,
"name": "time_modified",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Int8"
]
},
"nullable": [
false,
false,
true,
false,
false
]
},
"hash": "40dee0d539971f95bb3dc2ba4c49d5910bfdb2a6c9b82ddb296854973369594c"
}

View file

@ -0,0 +1,46 @@
{
"db_name": "PostgreSQL",
"query": "SELECT * FROM users WHERE id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "name",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "time_deleted",
"type_info": "Timestamptz"
},
{
"ordinal": 3,
"name": "time_created",
"type_info": "Timestamptz"
},
{
"ordinal": 4,
"name": "time_modified",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
true,
false,
false
]
},
"hash": "843923b9a0257cf80f1dff554e7dc8fdfc05f489328e8376513124dfb42996e3"
}

View file

@ -1,9 +1,9 @@
[package]
name = "patagia-controller"
description = "Patagia controller server"
version = "0.1.0"
description = "Patagia control plane server"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[dependencies]
anyhow.workspace = true
@ -15,10 +15,14 @@ schemars.workspace = true
serde.workspace = true
slog-async.workspace = true
slog.workspace = true
sqlx = { version = "0.8.3", default-features = false, features = [
"macros", "migrate", "postgres", "runtime-tokio", "tls-rustls", "time", "uuid"
] }
tokio.workspace = true
trace-request = { path = "../trace-request" }
tracing-slog.workspace = true
tracing.workspace = true
uuid.workspace = true
[package.metadata.cargo-machete]
ignored = ["http"]

5
controller/build.rs Normal file
View file

@ -0,0 +1,5 @@
// generated by `sqlx migrate build-script`
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
}

View file

@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS patagia.public.Users(
id UUID PRIMARY KEY,
name VARCHAR(63) NOT NULL,
time_deleted TIMESTAMP WITH TIME ZONE, -- non-NULL if deleted
time_created TIMESTAMP WITH TIME ZONE NOT NULL,
time_modified TIMESTAMP WITH TIME ZONE NOT NULL
);

View file

@ -4,14 +4,14 @@ use dropshot::ApiDescription;
use std::sync::Arc;
use crate::context::ControllerContext;
use crate::updates;
use crate::user;
use crate::version;
type ControllerApiDescription = ApiDescription<Arc<ControllerContext>>;
pub fn api() -> Result<ControllerApiDescription> {
let mut api = ControllerApiDescription::new();
user::register_api(&mut api)?;
api.register(version::version)?;
api.register(updates::get_latest)?;
Ok(api)
}

View file

@ -1,8 +1,8 @@
use anyhow::{anyhow, Result};
use clap::Parser;
use dropshot::{ConfigDropshot, ServerBuilder};
use slog::Drain;
use sqlx::postgres::PgPool;
use tracing_slog::TracingSlogDrain;
use std::net::SocketAddr;
@ -14,17 +14,46 @@ use patagia_controller::context::ControllerContext;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Cli {}
struct Cli {
#[arg(
long = "telemetry-otlp-endpoint",
default_value = "http://localhost:4317",
env = "OTEL_EXPORTER_OTLP_ENDPOINT"
)]
otlp_endpoint: Option<String>,
#[arg(
long = "log-stderr",
short = 'v',
default_value = "false",
env = "LOG_TO_STDERR"
)]
log_stderr: bool,
#[arg(
long = "listen-address",
default_value = "0.0.0.0:9474",
env = "LISTEN_ADDRESS"
)]
listen_address: String,
#[arg(
long = "database-url",
default_value = "postgresql://localhost/patagia",
env = "DATABASE_URL"
)]
database_url: Option<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
let _args = Cli::parse();
let _tracing = instrumentation::init_tracing_subscriber()?;
let args = Cli::parse();
let _tracing = instrumentation::init_tracing(args.otlp_endpoint.as_ref(), args.log_stderr)?;
tracing::info!("Patagia Controller");
let config = ConfigDropshot {
bind_address: SocketAddr::from_str("0.0.0.0:9474").unwrap(),
bind_address: SocketAddr::from_str(&args.listen_address).unwrap(),
..Default::default()
};
@ -35,7 +64,19 @@ async fn main() -> Result<()> {
slog::Logger::root(async_drain, slog::o!())
};
let ctx = ControllerContext::new();
let database_url = args.database_url.unwrap();
tracing::info!(
database_url,
listen_address = args.listen_address,
"Starting server"
);
let pg = PgPool::connect(&database_url).await?;
sqlx::migrate!().run(&pg).await?;
let ctx = ControllerContext::new(pg);
let api = api::api()?;
ServerBuilder::new(api, Arc::new(ctx), logger)
.config(config)

View file

@ -1,13 +1,11 @@
pub struct ControllerContext {}
use sqlx::postgres::PgPool;
pub struct ControllerContext {
pub pg_pool: PgPool,
}
impl ControllerContext {
pub fn new() -> ControllerContext {
ControllerContext {}
}
}
impl Default for ControllerContext {
fn default() -> Self {
Self::new()
pub fn new(pg_pool: PgPool) -> ControllerContext {
ControllerContext { pg_pool }
}
}

View file

@ -1,5 +1,5 @@
pub mod api;
pub mod context;
mod updates;
mod user;
mod version;

View file

@ -1,38 +0,0 @@
use dropshot::{endpoint, HttpError, HttpResponseOk, RequestContext};
use schemars::JsonSchema;
use serde::Serialize;
use std::sync::Arc;
use crate::context::ControllerContext;
use trace_request::trace_request;
/// Update information
#[derive(Serialize, JsonSchema)]
struct UpdateInfo {
name: String,
}
/// Fetch update info
#[endpoint {
method = GET,
path = "/updates/latest",
}]
#[trace_request]
pub(crate) async fn get_latest(
rqctx: RequestContext<Arc<ControllerContext>>,
) -> Result<HttpResponseOk<UpdateInfo>, HttpError> {
let upd = UpdateInfo {
name: "Hello".to_string(),
};
tracing::info_span!("Hello, span!");
async move {
tracing::info!("Someone made a request to /updates/latest");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
.instrument(tracing::info_span!("Let's do the thing...."))
.await;
Ok(HttpResponseOk(upd))
}

110
controller/src/user/api.rs Normal file
View file

@ -0,0 +1,110 @@
use dropshot::{
endpoint, EmptyScanParams, HttpError, HttpResponseOk, PaginationParams, Path, Query,
RequestContext, ResultsPage, WhichPage,
};
use dropshot::{ApiDescription, ApiDescriptionRegisterError};
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use trace_request::trace_request;
use uuid::Uuid;
use std::sync::Arc;
use super::User;
use crate::context::ControllerContext;
#[derive(Deserialize, JsonSchema, Serialize)]
#[serde(rename_all = "camelCase")]
struct UsersPathParams {
user_id: Uuid,
}
#[derive(Deserialize, JsonSchema, Serialize)]
#[serde(rename_all = "camelCase")]
struct UserPage {
user_id: Uuid,
}
pub fn register_api(
api: &mut ApiDescription<Arc<ControllerContext>>,
) -> Result<(), ApiDescriptionRegisterError> {
api.register(get_user_by_id)?;
api.register(list_users)
}
/// Fetch user info.
#[endpoint {
method = GET,
path = "/users/{userId}",
tags = [ "user" ],
}]
#[trace_request]
async fn get_user_by_id(
rqctx: RequestContext<Arc<ControllerContext>>,
params: Path<UsersPathParams>,
) -> Result<HttpResponseOk<User>, HttpError> {
let id = params.into_inner().user_id;
tracing::debug!(id = id.to_string(), "Getting user by id");
let pg = rqctx.context().pg_pool.to_owned();
let rec = sqlx::query!(r#"SELECT * FROM users WHERE id = $1"#, id)
.fetch_one(&pg)
.await
.map_err(|e| match e {
sqlx::Error::RowNotFound => {
HttpError::for_not_found(None, format!("User not found by id: {:?}", id))
}
err => HttpError::for_internal_error(format!("Error: {}", err)),
})?;
let user = User {
id: rec.id,
name: rec.name,
};
Ok(HttpResponseOk(user))
}
/// List users
#[endpoint {
method = GET,
path = "/users",
tags = [ "user" ],
}]
#[trace_request]
async fn list_users(
rqctx: RequestContext<Arc<ControllerContext>>,
query: Query<PaginationParams<EmptyScanParams, UserPage>>,
) -> Result<HttpResponseOk<ResultsPage<User>>, HttpError> {
let pag_params = query.into_inner();
let limit = rqctx.page_limit(&pag_params)?.get() as i64;
let pg = rqctx.context().pg_pool.to_owned();
let last_seen = match &pag_params.page {
WhichPage::Next(UserPage { user_id: id }) => Some(id),
_ => None,
};
let users = sqlx::query!(
r#"SELECT * FROM users WHERE id > coalesce($1, '00000000-0000-0000-0000-000000000000'::UUID) ORDER BY id LIMIT $2"#,
last_seen,
limit
)
.fetch_all(&pg)
.await
.map_err(|e| HttpError::for_internal_error(format!("Error: {}", e)))?
.into_iter()
.map(|rec| User {
id: rec.id,
name: rec.name,
})
.collect();
Ok(HttpResponseOk(ResultsPage::new(
users,
&EmptyScanParams {},
|u: &User, _| UserPage { user_id: u.id },
)?))
}

View file

@ -0,0 +1,14 @@
use schemars::JsonSchema;
use serde::Serialize;
use uuid::Uuid;
mod api;
pub use self::api::register_api;
/// User
#[derive(Serialize, JsonSchema)]
struct User {
id: Uuid,
name: String,
}

View file

@ -30,6 +30,8 @@ pub(crate) async fn version(
tracing::info_span!("Hello, span!");
tracing::info!(monotonic_counter.version_calls = 1);
async move {
tracing::info!("Someone made a request to /version");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;

View file

@ -3,11 +3,11 @@
"advisory-db": {
"flake": false,
"locked": {
"lastModified": 1733749954,
"narHash": "sha256-2Ug80Uf/oUujxgh02Iy5vTG0V+Ab9+YUHuRLRY0ayiY=",
"lastModified": 1735928634,
"narHash": "sha256-Qg1vJOuEohAbdRmTTOLrbbGsyK9KRB54r3+aBuOMctM=",
"owner": "rustsec",
"repo": "advisory-db",
"rev": "ec9ce28714bb38d77a2223e7266df705500a7f11",
"rev": "63a2f39924f66ca89cf5761f299a8a244fe02543",
"type": "github"
},
"original": {
@ -18,11 +18,11 @@
},
"crane": {
"locked": {
"lastModified": 1734324364,
"narHash": "sha256-omYTR59TdH0AumP1cfh49fBnWZ52HjfdNfaLzCMZBx0=",
"lastModified": 1736101677,
"narHash": "sha256-iKOPq86AOWCohuzxwFy/MtC8PcSVGnrxBOvxpjpzrAY=",
"owner": "ipetkov",
"repo": "crane",
"rev": "60d7623f1320470bf2fdb92fd2dca1e9a27b98ce",
"rev": "61ba163d85e5adeddc7b3a69bb174034965965b2",
"type": "github"
},
"original": {
@ -66,11 +66,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1734083684,
"narHash": "sha256-5fNndbndxSx5d+C/D0p/VF32xDiJCJzyOqorOYW4JEo=",
"lastModified": 1736061677,
"narHash": "sha256-DjkQPnkAfd7eB522PwnkGhOMuT9QVCZspDpJJYyOj60=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "314e12ba369ccdb9b352a4db26ff419f7c49fa84",
"rev": "cbd8ec4de4469333c82ff40d057350c30e9f7d36",
"type": "github"
},
"original": {
@ -98,11 +98,11 @@
},
"nixpkgs_3": {
"locked": {
"lastModified": 1733097829,
"narHash": "sha256-9hbb1rqGelllb4kVUCZ307G2k3/UhmA8PPGBoyuWaSw=",
"lastModified": 1735554305,
"narHash": "sha256-zExSA1i/b+1NMRhGGLtNfFGXgLtgo+dcuzHzaWA6w3Q=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "2c15aa59df0017ca140d9ba302412298ab4bf22a",
"rev": "0e82ab234249d8eee3e8c91437802b32c74bb3fd",
"type": "github"
},
"original": {
@ -128,11 +128,11 @@
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1734316514,
"narHash": "sha256-0aLx44yMblcOGpfFXKCzp2GhU5JaE6OTvdU+JYrXiUc=",
"lastModified": 1736044260,
"narHash": "sha256-DTAr0mAd8AZwWgRtU9ZZFPz3DwNeoH/Oi/1QMSqc9YQ=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "83ee8ff74d6294a7657320f16814754c4594127b",
"rev": "c8ed24cc104ebbc218d992e208131e9f024b69f0",
"type": "github"
},
"original": {
@ -161,11 +161,11 @@
"nixpkgs": "nixpkgs_3"
},
"locked": {
"lastModified": 1733761991,
"narHash": "sha256-s4DalCDepD22jtKL5Nw6f4LP5UwoMcPzPZgHWjAfqbQ=",
"lastModified": 1736115332,
"narHash": "sha256-FBG9d7e0BTFfxVdw4b5EmNll2Mv7hfRc54hbB4LrKko=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "0ce9d149d99bc383d1f2d85f31f6ebd146e46085",
"rev": "1788ca5acd4b542b923d4757d4cfe4183cc6a92d",
"type": "github"
},
"original": {

View file

@ -49,6 +49,8 @@
root = ./.;
fileset = pkgs.lib.fileset.unions [
./api.json
./controller/.sqlx
./controller/migrations
(craneLib.fileset.commonCargoSources ./.)
];
};
@ -116,6 +118,7 @@
formatter =
(treefmt-nix.lib.evalModule pkgs {
projectRootFile = "flake.nix";
programs = {
nixfmt.enable = true;
nixfmt.package = pkgs.nixfmt-rfc-style;
@ -139,18 +142,15 @@
cargo-watch
hyperfine
just
nixfmt-rfc-style
rust-dev-toolchain
sqls
sqlx-cli
watchexec
]
++ commonArgs.buildInputs;
RUST_BACKTRACE = 1;
RUST_SRC_PATH = pkgs.rustPlatform.rustLibSrc; # Required for rust-analyzer
shellHook = ''
echo
echo "🛠 Welcome to the Patagia development environment 🛠"
echo "Run 'just' to see available commands."
echo
'';
};
}

View file

@ -1,16 +1,24 @@
[package]
name = "instrumentation"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[dependencies]
anyhow.workspace = true
http.workspace = true
once_cell.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry_sdk.workspace = true
opentelemetry-semantic-conventions.workspace = true
opentelemetry.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
opentelemetry-otlp = { version = "0.27.0", features = ["grpc-tonic", "gzip-tonic", "zstd-tonic", "tls", "tls-roots", "trace"] }
opentelemetry_sdk = { version = "0.27.1", features = ["metrics", "rt-tokio"] }
opentelemetry-semantic-conventions = "0.27.0"
opentelemetry-appender-tracing = { version = "0.27.0", features = ["log", "experimental_metadata_attributes"] }
opentelemetry-resource-detectors = { version = "0.6.0" }
opentelemetry = "0.27.1"
tonic = "0.12.3"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3.19", default-features = false, features = [
"std",
"ansi",
"env-filter",
"fmt",
] }

View file

@ -1,102 +0,0 @@
use anyhow::{anyhow, Result};
use once_cell::sync::Lazy;
use opentelemetry::{trace::TracerProvider as _, KeyValue};
use opentelemetry_sdk::{
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
runtime,
trace::{RandomIdGenerator, Sampler, TracerProvider},
Resource,
};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
static RESOURCE: Lazy<Resource> = Lazy::new(|| {
Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
env!("CARGO_PKG_NAME"),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
])
});
// Construct MeterProvider for MetricsLayer
fn init_meter_provider() -> Result<SdkMeterProvider> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.map_err(|e| anyhow!("Error creating OTLP metric exporter: {:?}", e))?;
let meter_provider = MeterProviderBuilder::default()
.with_resource(RESOURCE.clone())
.with_reader(
PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(std::time::Duration::from_secs(10))
.build(),
)
.build();
opentelemetry::global::set_meter_provider(meter_provider.clone());
Ok(meter_provider)
}
// Construct TracerProvider for OpenTelemetryLayer
fn init_tracer_provider() -> Result<TracerProvider> {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()
.map_err(|e| anyhow!("Error creating OTLP span exporter: {:?}", e))?;
let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_resource(RESOURCE.clone())
.with_id_generator(RandomIdGenerator::default())
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.build();
Ok(tracer_provider)
}
// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
pub fn init_tracing_subscriber() -> Result<OtelGuard> {
let meter_provider = init_meter_provider()?;
let tracer_provider = init_tracer_provider()?;
let tracer = tracer_provider.tracer("tracing-otel-subscriber");
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer())
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.init();
Ok(OtelGuard {
meter_provider,
tracer_provider,
})
}
pub struct OtelGuard {
meter_provider: SdkMeterProvider,
tracer_provider: TracerProvider,
}
impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("{err:?}");
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
}
}
}

View file

@ -1,39 +1,33 @@
use anyhow::{anyhow, Result};
use once_cell::sync::Lazy;
use opentelemetry::{trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::{
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
propagation::TraceContextPropagator,
runtime,
trace::{RandomIdGenerator, Sampler, TracerProvider},
Resource,
};
use opentelemetry_semantic_conventions as semcov;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
static RESOURCE: Lazy<Resource> = Lazy::new(|| {
Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
env!("CARGO_PKG_NAME"),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
])
});
use std::time::Duration;
// Construct MeterProvider for MetricsLayer
fn init_meter_provider() -> Result<SdkMeterProvider> {
fn init_meter_provider(otel_endpoint: &String, resource: Resource) -> Result<SdkMeterProvider> {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(otel_endpoint)
.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots())
.with_compression(opentelemetry_otlp::Compression::Gzip)
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.map_err(|e| anyhow!("Error creating OTLP metric exporter: {:?}", e))?;
let meter_provider = MeterProviderBuilder::default()
.with_resource(RESOURCE.clone())
.with_resource(resource)
.with_reader(
PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(std::time::Duration::from_secs(10))
@ -47,9 +41,12 @@ fn init_meter_provider() -> Result<SdkMeterProvider> {
}
// Construct TracerProvider for OpenTelemetryLayer
fn init_tracer_provider() -> Result<TracerProvider> {
fn init_tracer_provider(otel_endpoint: &String, resource: Resource) -> Result<TracerProvider> {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots())
.with_compression(opentelemetry_otlp::Compression::Gzip)
.with_endpoint(otel_endpoint)
.build()
.map_err(|e| anyhow!("Error creating OTLP span exporter: {:?}", e))?;
@ -57,7 +54,7 @@ fn init_tracer_provider() -> Result<TracerProvider> {
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_resource(RESOURCE.clone())
.with_resource(resource)
.with_id_generator(RandomIdGenerator::default())
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.build();
@ -65,38 +62,93 @@ fn init_tracer_provider() -> Result<TracerProvider> {
Ok(tracer_provider)
}
// Initialize tracing-subscriber and return OtelGuard for opentelemetry-related termination processing
pub fn init_tracing_subscriber() -> Result<OtelGuard> {
let meter_provider = init_meter_provider()?;
let tracer_provider = init_tracer_provider()?;
// Initialize tracing-subscriber and return TracingGuard for opentelemetry-related termination processing
pub fn init_tracing(otel_endpoint: Option<&String>, log_stderr: bool) -> Result<TracingGuard> {
let resource = {
let r = Resource::new([KeyValue::new(
semcov::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
)]);
let tracer = tracer_provider.tracer("tracing-otel-subscriber");
let detected = Resource::from_detectors(
Duration::from_secs(5),
vec![
Box::new(opentelemetry_sdk::resource::SdkProvidedResourceDetector),
Box::new(opentelemetry_sdk::resource::EnvResourceDetector::new()),
Box::new(opentelemetry_resource_detectors::OsResourceDetector),
Box::new(opentelemetry_resource_detectors::ProcessResourceDetector),
Box::new(opentelemetry_sdk::resource::TelemetryResourceDetector),
],
);
r.merge(&detected)
};
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(""));
let fmt_layer = match log_stderr {
true => Some(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)),
false => None,
};
let meter_provider = match otel_endpoint {
Some(endpoint) => Some(init_meter_provider(endpoint, resource.clone())?),
None => None,
};
let metrics_layer = meter_provider
.as_ref()
.map(|p| MetricsLayer::new(p.to_owned()));
let tracer_provider = match otel_endpoint {
Some(endpoint) => Some(init_tracer_provider(endpoint, resource)?),
None => None,
};
let trace_layer = tracer_provider
.as_ref()
.map(|p| OpenTelemetryLayer::new(p.tracer("tracing-otel-subscriber")));
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer())
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.with(env_filter)
.with(fmt_layer)
.with(metrics_layer)
.with(trace_layer)
.init();
Ok(OtelGuard {
Ok(TracingGuard {
meter_provider,
tracer_provider,
})
}
pub struct OtelGuard {
meter_provider: SdkMeterProvider,
tracer_provider: TracerProvider,
pub struct TracingGuard {
meter_provider: Option<SdkMeterProvider>,
tracer_provider: Option<TracerProvider>,
}
impl Drop for OtelGuard {
impl Drop for TracingGuard {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("{err:?}");
if let Some(tracer_provider) = &self.tracer_provider {
for result in tracer_provider.force_flush() {
if let Err(err) = result {
eprintln!("{err:?}");
}
}
if let Err(err) = tracer_provider.shutdown() {
eprintln!("{err:?}");
}
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
if let Some(meter_provider) = &self.meter_provider {
if let Err(err) = meter_provider.force_flush() {
eprintln!("{err:?}");
}
if let Err(err) = meter_provider.shutdown() {
eprintln!("{err:?}");
}
}
}
}

View file

@ -5,11 +5,13 @@ default:
@just --choose
# Run controller
[group('controller')]
run-controller $RUST_LOG="debug,h2=info,hyper_util=info,tower=info":
cargo run --package patagia-controller
cargo run --package patagia-controller -- --log-stderr
# Run controller local development
dev-controller:
[group('controller')]
dev-controller: dev-controller-db-migrate
watchexec --clear --restart --stop-signal INT --debounce 300ms -- just run-controller
# Run agent
@ -48,9 +50,63 @@ machete:
open-api:
cargo xtask open-api
# Update OpenAPI spec
gen-open-api:
cargo xtask open-api > api.json
# Run all tests
check: check-nix
check: check-nix
# check-nix
check-nix:
nix flake check
# Run PostgreSQL for development and testing
[group('controller')]
dev-postgres:
#!/usr/bin/env sh
if podman ps --filter "name=patagia-postgres" --filter "status=running" -q | grep -q .; then
exit 0
fi
mkdir -p "${XDG_RUNTIME_DIR}/patagia-postgres"
podman volume exists patagia-postgres || podman volume create patagia-postgres
podman run \
--detach \
--replace \
--name patagia-postgres \
--env POSTGRES_DB=patagia \
--env POSTGRES_USER=patagia \
--env POSTGRES_PASSWORD=swordfish \
--volume patagia-postgres:/var/lib/postgresql/data \
--volume "${XDG_RUNTIME_DIR}/patagia-postgres:/var/run/postgresql" \
docker.io/postgres:17
sleep 0.3
# Clean up PostgreSQL data
[group('controller')]
dev-postgres-clean:
podman rm -f patagia-postgres || true
podman volume rm patagia-postgres || true
# Connect to PostgreSQL with psql
[group('controller')]
dev-postgres-psql:
podman exec -it patagia-postgres psql -U patagia
[group('controller')]
[working-directory: 'controller']
dev-controller-db-migrate: dev-postgres
cargo sqlx migrate run
[group('controller')]
[working-directory: 'controller']
dev-controller-db-reset:
cargo sqlx db reset -y
[group('controller')]
[working-directory: 'controller']
gen-controller-sqlx-prepare:
cargo sqlx prepare
gen: gen-open-api gen-controller-sqlx-prepare fmt

View file

@ -1,8 +1,8 @@
[package]
name = "trace-request"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
version.workspace = true
[lib]
proc-macro = true

View file

@ -50,8 +50,10 @@ pub fn trace_request(_attr: TokenStream, input: TokenStream) -> TokenStream {
let span = tracing::Span::current();
span.record("http.status", &status.as_str());
if let Err(err) = &result {
span.record("error", true);
span.record("error.external", &err.external_message);
span.record("error.internal", &err.internal_message);
}
result
@ -62,8 +64,9 @@ pub fn trace_request(_attr: TokenStream, input: TokenStream) -> TokenStream {
"http.method" = req.method().as_str(),
"http.uri" = req.uri().to_string(),
"http.status" = tracing::field::Empty,
"error" = tracing::field::Empty,
"error.external" = tracing::field::Empty,
"error.internal" = tracing::field::Empty
"error.internal" = tracing::field::Empty,
)
).await
}

View file

@ -1,7 +1,7 @@
[package]
name = "xtask"
version = "0.0.0"
edition = "2021"
version.workspace = true
[[bin]]
name = "xtask"