diff --git a/controller/src/api.rs b/controller/src/api.rs index 4728059..5be86ce 100644 --- a/controller/src/api.rs +++ b/controller/src/api.rs @@ -11,7 +11,7 @@ type ControllerApiDescription = ApiDescription<Arc<ControllerContext>>; pub fn api() -> Result<ControllerApiDescription> { let mut api = ControllerApiDescription::new(); - api.register(user::get_user_by_id)?; + user::register_api(&mut api)?; api.register(version::version)?; Ok(api) } diff --git a/controller/src/lib.rs b/controller/src/lib.rs index 2774561..2d12df1 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -3,4 +3,3 @@ pub mod context; mod user; mod version; - diff --git a/controller/src/user.rs b/controller/src/user.rs deleted file mode 100644 index 7f203ef..0000000 --- a/controller/src/user.rs +++ /dev/null @@ -1,58 +0,0 @@ -use clap::error::RichFormatter; -use dropshot::{endpoint, HttpError, HttpResponseOk, Path, RequestContext}; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; -use trace_request::trace_request; -use uuid::Uuid; - -use std::sync::Arc; - -use crate::context::ControllerContext; - -/// User -#[derive(Serialize, JsonSchema)] -struct User { - id: Uuid, - name: String, -} - -#[allow(dead_code)] -#[derive(Deserialize, JsonSchema)] -#[serde(rename_all = "camelCase")] -struct UsersPathParams { - user_id: Uuid, -} - -/// Fetch user info. -#[endpoint { - method = GET, - path = "/users/{userId}", - tags = [ "user" ], -}] -#[trace_request] -pub(crate) async fn get_user_by_id( - rqctx: RequestContext<Arc<ControllerContext>>, - params: Path<UsersPathParams>, -) -> Result<HttpResponseOk<User>, HttpError> { - let id = params.into_inner().user_id; - tracing::debug!(id = id.to_string(), "Getting user by id"); - - let pg = rqctx.context().pg_pool.to_owned(); - - let rec = sqlx::query!(r#"SELECT * FROM users WHERE id = $1"#, id) - .fetch_one(&pg) - .await - .map_err(|e| match e { - sqlx::Error::RowNotFound => { - HttpError::for_not_found(None, format!("User not found by id: {:?}", id)) - } - err => HttpError::for_internal_error(format!("Error: {}", err)), - })?; - - let user = User { - id: rec.id, - name: rec.name, - }; - - Ok(HttpResponseOk(user)) -} diff --git a/controller/src/user/api.rs b/controller/src/user/api.rs new file mode 100644 index 0000000..67397d9 --- /dev/null +++ b/controller/src/user/api.rs @@ -0,0 +1,126 @@ +use dropshot::{ + endpoint, EmptyScanParams, HttpError, HttpResponseOk, PaginationParams, Path, Query, + RequestContext, ResultsPage, WhichPage, +}; +use dropshot::{ApiDescription, ApiDescriptionRegisterError}; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use trace_request::trace_request; +use uuid::Uuid; + +use std::sync::Arc; + +use super::User; +use crate::context::ControllerContext; + +#[derive(Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "camelCase")] +struct UsersPathParams { + user_id: Uuid, +} + +#[derive(Deserialize, JsonSchema, Serialize)] +#[serde(rename_all = "camelCase")] +struct UserPage { + user_id: Uuid, +} + +pub fn register_api( + api: &mut ApiDescription<Arc<ControllerContext>>, +) -> Result<(), ApiDescriptionRegisterError> { + api.register(get_user_by_id)?; + api.register(list_users) +} + +/// Fetch user info. +#[endpoint { + method = GET, + path = "/users/{userId}", + tags = [ "user" ], +}] +#[trace_request] +async fn get_user_by_id( + rqctx: RequestContext<Arc<ControllerContext>>, + params: Path<UsersPathParams>, +) -> Result<HttpResponseOk<User>, HttpError> { + let id = params.into_inner().user_id; + tracing::debug!(id = id.to_string(), "Getting user by id"); + + let pg = rqctx.context().pg_pool.to_owned(); + + let rec = sqlx::query!(r#"SELECT * FROM users WHERE id = $1"#, id) + .fetch_one(&pg) + .await + .map_err(|e| match e { + sqlx::Error::RowNotFound => { + HttpError::for_not_found(None, format!("User not found by id: {:?}", id)) + } + err => HttpError::for_internal_error(format!("Error: {}", err)), + })?; + + let user = User { + id: rec.id, + name: rec.name, + }; + + Ok(HttpResponseOk(user)) +} + +/// List users +#[endpoint { + method = GET, + path = "/users", + tags = [ "user" ], +}] +#[trace_request] +async fn list_users( + rqctx: RequestContext<Arc<ControllerContext>>, + query: Query<PaginationParams<EmptyScanParams, UserPage>>, +) -> Result<HttpResponseOk<ResultsPage<User>>, HttpError> { + let pag_params = query.into_inner(); + let limit = rqctx.page_limit(&pag_params)?.get() as usize; + let pg = rqctx.context().pg_pool.to_owned(); + + let users = match &pag_params.page { + WhichPage::First(..) => { + tracing::debug!("Listing users. First page"); + sqlx::query!(r#"SELECT * FROM users ORDER BY id LIMIT $1"#, limit as i64) + .fetch_all(&pg) + .await + .map_err(|e| HttpError::for_internal_error(format!("Error: {}", e)))? + .into_iter() + .map(|rec| User { + id: rec.id, + name: rec.name, + }) + .collect() + } + WhichPage::Next(UserPage { user_id: last_seen }) => { + tracing::debug!( + last_seen = last_seen.to_string(), + "Listing users. Next page" + ); + sqlx::query!( + r#"SELECT * FROM users WHERE id > $1 ORDER BY id LIMIT $2"#, + last_seen, + limit as i64 + ) + .fetch_all(&pg) + .await + .map_err(|e| HttpError::for_internal_error(format!("Error: {}", e)))? + .into_iter() + .map(|rec| User { + id: rec.id, + name: rec.name, + }) + .collect() + } + }; + + Ok(HttpResponseOk(ResultsPage::new( + users, + &EmptyScanParams {}, + |u: &User, _| UserPage { user_id: u.id }, + )?)) +} diff --git a/controller/src/user/mod.rs b/controller/src/user/mod.rs new file mode 100644 index 0000000..46397e9 --- /dev/null +++ b/controller/src/user/mod.rs @@ -0,0 +1,14 @@ +use schemars::JsonSchema; +use serde::Serialize; +use uuid::Uuid; + +mod api; + +pub use self::api::register_api; + +/// User +#[derive(Serialize, JsonSchema)] +struct User { + id: Uuid, + name: String, +} diff --git a/instrumentation/src/lib.rs b/instrumentation/src/lib.rs index 31df7f6..fd69950 100644 --- a/instrumentation/src/lib.rs +++ b/instrumentation/src/lib.rs @@ -96,20 +96,18 @@ pub fn init_tracing(otel_endpoint: Option<&String>, log_stderr: bool) -> Result< None => None, }; - let metrics_layer = match meter_provider { - Some(ref p) => Some(MetricsLayer::new(p.to_owned())), - None => None, - }; + let metrics_layer = meter_provider + .as_ref() + .map(|p| MetricsLayer::new(p.to_owned())); let tracer_provider = match otel_endpoint { Some(endpoint) => Some(init_tracer_provider(endpoint, resource)?), None => None, }; - let trace_layer = match tracer_provider { - Some(ref p) => Some(OpenTelemetryLayer::new(p.tracer("tracing-otel-subscriber"))), - None => None, - }; + let trace_layer = tracer_provider + .as_ref() + .map(|p| OpenTelemetryLayer::new(p.tracer("tracing-otel-subscriber"))); opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); diff --git a/justfile b/justfile index 5c4779f..9dabd7b 100644 --- a/justfile +++ b/justfile @@ -11,7 +11,7 @@ run-controller $RUST_LOG="debug,h2=info,hyper_util=info,tower=info": # Run controller local development [group('controller')] -dev-controller: +dev-controller: dev-controller-db-migrate watchexec --clear --restart --stop-signal INT --debounce 300ms -- just run-controller # Run agent @@ -76,6 +76,7 @@ dev-postgres: --volume patagia-postgres:/var/lib/postgresql/data \ --volume "${XDG_RUNTIME_DIR}/patagia-postgres:/var/run/postgresql" \ docker.io/postgres:17 + sleep 0.3 # Clean up PostgreSQL data [group('controller')] @@ -90,7 +91,7 @@ dev-postgres-psql: [group('controller')] [working-directory: 'controller'] -dev-controller-db-migrate: +dev-controller-db-migrate: dev-postgres cargo sqlx migrate run [group('controller')]