diff --git a/Cargo.lock b/Cargo.lock index 5226ac6..7cd8204 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -993,8 +993,19 @@ name = "hostd" version = "0.2.0" dependencies = [ "anyhow", - "ipc", + "clap", + "dropshot", + "http", + "schemars", + "serde", + "slog", + "slog-async", + "sqlx", "systemd-ipc", + "tokio", + "trace-request", + "tracing", + "tracing-slog", "varlink", ] diff --git a/hostd/Cargo.toml b/hostd/Cargo.toml index 4a8dee3..5eab87f 100644 --- a/hostd/Cargo.toml +++ b/hostd/Cargo.toml @@ -5,6 +5,19 @@ edition.workspace = true [dependencies] anyhow.workspace = true +tokio.workspace = true +sqlx = { version = "0.8.3", default-features = false, features = [ + "macros", "migrate", "postgres", "runtime-tokio", "tls-rustls", "time", "uuid" + ] } varlink = "11.0.1" -ipc = { path = "../ipc" } systemd-ipc = { path = "../systemd-ipc" } +dropshot.workspace = true +clap.workspace = true +slog.workspace = true +slog-async.workspace = true +tracing-slog.workspace = true +tracing.workspace = true +trace-request = { path = "../trace-request" } +schemars.workspace = true +serde.workspace = true +http.workspace = true diff --git a/hostd/src/api.rs b/hostd/src/api.rs new file mode 100644 index 0000000..edc0fcc --- /dev/null +++ b/hostd/src/api.rs @@ -0,0 +1,15 @@ +use anyhow::Result; +use dropshot::ApiDescription; + +use std::sync::Arc; + +use crate::context::ControllerContext; +use crate::machine; + +type ControllerApiDescription = ApiDescription<Arc<ControllerContext>>; + +pub fn api() -> Result<ControllerApiDescription> { + let mut api = ControllerApiDescription::new(); + api.register(machine::describe)?; + Ok(api) +} diff --git a/hostd/src/bin/hostd-controller.rs b/hostd/src/bin/hostd-controller.rs new file mode 100644 index 0000000..f6a7eb5 --- /dev/null +++ b/hostd/src/bin/hostd-controller.rs @@ -0,0 +1,79 @@ +use anyhow::{anyhow, Result}; + +use clap::Parser; +use dropshot::{ConfigDropshot, ServerBuilder}; +use slog::Drain; +use sqlx::PgPool; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; +use tracing_slog::TracingSlogDrain; + +use hostd::api; +use hostd::context::ControllerContext; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Cli { + #[arg( + long = "telemetry-otlp-endpoint", + default_value = "http://localhost:4317", + env = "OTEL_EXPORTER_OTLP_ENDPOINT" + )] + otlp_endpoint: Option<String>, + + #[arg( + long = "log-stderr", + short = 'v', + default_value = "false", + env = "LOG_TO_STDERR" + )] + log_stderr: bool, + + #[arg( + long = "listen-address", + default_value = "0.0.0.0:9478", + env = "LISTEN_ADDRESS" + )] + listen_address: String, + + #[arg( + long = "database-url", + default_value = "postgresql://localhost/patagia", + env = "DATABASE_URL" + )] + database_url: Option<String>, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Cli::parse(); + + let config = ConfigDropshot { + bind_address: SocketAddr::from_str(&args.listen_address).unwrap(), + ..Default::default() + }; + + let logger = { + let level_drain = slog::LevelFilter(TracingSlogDrain, slog::Level::Debug).fuse(); + let async_drain = slog_async::Async::new(level_drain).build().fuse(); + slog::Logger::root(async_drain, slog::o!()) + }; + + // let database_url = args.database_url.unwrap(); + // dont connect to database pass null + // let pg = PgPool::connect(&database_url).await?; + + // sqlx::migrate!().run(&pg).await?; + + let ctx = ControllerContext::new(); + + let api = api::api()?; + + ServerBuilder::new(api, Arc::new(ctx), logger) + .config(config) + .start() + .map_err(|e| anyhow!("Error starting server: {:?}", e))? + .await + .map_err(|e| anyhow!(e)) +} diff --git a/hostd/src/context.rs b/hostd/src/context.rs new file mode 100644 index 0000000..e50d955 --- /dev/null +++ b/hostd/src/context.rs @@ -0,0 +1,11 @@ +// use sqlx::postgres::PgPool; + +pub struct ControllerContext { + // pub pg_pool: PgPool, +} + +impl ControllerContext { + pub fn new() -> ControllerContext { + ControllerContext {} + } +} diff --git a/hostd/src/lib.rs b/hostd/src/lib.rs new file mode 100644 index 0000000..94f3a31 --- /dev/null +++ b/hostd/src/lib.rs @@ -0,0 +1,3 @@ +pub mod api; +pub mod context; +pub mod machine; diff --git a/hostd/src/machine.rs b/hostd/src/machine.rs new file mode 100644 index 0000000..39d8437 --- /dev/null +++ b/hostd/src/machine.rs @@ -0,0 +1,46 @@ +use dropshot::{endpoint, HttpError, HttpResponseOk, RequestContext}; +use schemars::JsonSchema; +use serde::Serialize; +use trace_request::trace_request; + +use systemd_ipc::addrs::SYSTEMD_HOSTNAME; +use systemd_ipc::io_systemd_hostname::{self, VarlinkClientInterface}; + +use std::sync::Arc; + +use std::thread; + +use crate::context::ControllerContext; + +/// Version and build information +#[derive(Serialize, JsonSchema)] +struct MachineInfo { + machine_id: String, +} + +/// Fetch machine id +#[endpoint { + method = GET, + path = "/machine_id", +}] +#[trace_request] +pub async fn describe( + rqctx: RequestContext<Arc<ControllerContext>>, +) -> Result<HttpResponseOk<MachineInfo>, HttpError> { + // Connect to systemd.Hostname + // Make it tokio task blocking + tokio::task::block_in_place(move || { + let conn = varlink::Connection::with_address(SYSTEMD_HOSTNAME).unwrap(); + let mut sd = io_systemd_hostname::VarlinkClient::new(conn); + let machine_id = sd.describe().call().unwrap().MachineID; + + tracing::info_span!("Hello, span hostd!"); + + tracing::info!(monotonic_counter.version_calls = 1); + + let machine_info = MachineInfo { machine_id }; + thread::sleep(std::time::Duration::from_millis(10000)); + + Ok(HttpResponseOk(machine_info)) + }) +} diff --git a/hostd/src/main.rs b/hostd/src/main.rs deleted file mode 100644 index f5d0bea..0000000 --- a/hostd/src/main.rs +++ /dev/null @@ -1,59 +0,0 @@ -use anyhow::Result; - -use ipc::io_patagia_hostd; -use systemd_ipc::addrs::SYSTEMD_HOSTNAME; -use systemd_ipc::io_systemd_hostname::{self, VarlinkClientInterface}; - -struct PatagiaHostd; - -impl io_patagia_hostd::VarlinkInterface for PatagiaHostd { - fn apply( - &self, - call: &mut dyn io_patagia_hostd::Call_Apply, - machine: io_patagia_hostd::Machine, - ) -> varlink::Result<()> { - // FIXME: Do something useful - println!("Applying machine config: {:#?}", machine); - call.reply() - } - - fn describe(&self, call: &mut dyn io_patagia_hostd::Call_Describe) -> varlink::Result<()> { - // Connect to systemd.Hostname - let conn = varlink::Connection::with_address(SYSTEMD_HOSTNAME).unwrap(); - let mut sd = io_systemd_hostname::VarlinkClient::new(conn); - - let machine = io_patagia_hostd::Machine { - machineId: sd.describe().call().unwrap().MachineID, - nodeLabels: None, - patagiaAgent: None, - }; - call.reply(machine) - } -} - -fn main() -> Result<()> { - let hostd = PatagiaHostd; - let hostd_iface = io_patagia_hostd::new(Box::new(hostd)); - - let svc = varlink::VarlinkService::new( - "io.patagia.hostd", - "Host controller for patagia", - "0.1", - "https://patagia.dev", - vec![Box::new(hostd_iface)], - ); - - let addr = format!("unix:{}/{}", env!("XDG_RUNTIME_DIR"), "io.patagia.hostd"); - - println!("Varlink Listening on {}", addr); - - varlink::listen( - svc, - &addr, - &varlink::ListenConfig { - ..Default::default() - }, - )?; - - Ok(()) -} diff --git a/ipc/.gitignore b/ipc/.gitignore deleted file mode 100644 index ea8c4bf..0000000 --- a/ipc/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml deleted file mode 100644 index b26a1f5..0000000 --- a/ipc/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "ipc" -version.workspace = true -edition.workspace = true - -[dependencies] -serde.workspace = true -serde_derive = "1.0.217" -serde_json = "1.0.135" -varlink = "11.0.1" - -[build-dependencies] -varlink_generator = "10.1.0" -walkdir = "2.5.0" - -[package.metadata.cargo-machete] -ignored = ["serde"] diff --git a/ipc/build.rs b/ipc/build.rs deleted file mode 100644 index fcec0de..0000000 --- a/ipc/build.rs +++ /dev/null @@ -1,12 +0,0 @@ -extern crate varlink_generator; - -use walkdir::WalkDir; - -fn main() { - println!("cargo:rerun-if-changed=src/*.varlink"); - for entry in WalkDir::new("src").into_iter().filter_map(|e| e.ok()) { - if entry.file_name().to_str().unwrap().ends_with(".varlink") { - varlink_generator::cargo_build_tosource(&entry.path().display().to_string(), true); - } - } -} diff --git a/ipc/src/io.patagia.hostd.varlink b/ipc/src/io.patagia.hostd.varlink deleted file mode 100644 index ea34a12..0000000 --- a/ipc/src/io.patagia.hostd.varlink +++ /dev/null @@ -1,26 +0,0 @@ -interface io.patagia.hostd - -type Label ( - key: string, - value: string -) - -type PatagiaAgentConfig ( - url: ?string -) - -type Machine( - machineId: string, - nodeLabels: ?[]Label, - patagiaAgent: ?PatagiaAgentConfig -) - -method Describe() -> ( - machine: Machine -) - -method Apply( - machine: Machine -) -> () - -error InvalidMachineConfig() diff --git a/ipc/src/io_patagia_hostd.rs b/ipc/src/io_patagia_hostd.rs deleted file mode 100644 index 19d0df5..0000000 --- a/ipc/src/io_patagia_hostd.rs +++ /dev/null @@ -1,263 +0,0 @@ -#![doc = "This file was automatically generated by the varlink rust generator"] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -use serde_derive::{Deserialize, Serialize}; -use std::io::BufRead; -use std::sync::{Arc, RwLock}; -use varlink::{self, CallTrait}; -#[allow(dead_code)] -#[derive(Clone, PartialEq, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum ErrorKind { - Varlink_Error, - VarlinkReply_Error, - InvalidMachineConfig(Option<InvalidMachineConfig_Args>), -} -impl ::std::fmt::Display for ErrorKind { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - match self { - ErrorKind::Varlink_Error => write!(f, "Varlink Error"), - ErrorKind::VarlinkReply_Error => write!(f, "Varlink error reply"), - ErrorKind::InvalidMachineConfig(v) => { - write!(f, "io.patagia.hostd.InvalidMachineConfig: {:#?}", v) - } - } - } -} -pub struct Error( - pub ErrorKind, - pub Option<Box<dyn std::error::Error + 'static + Send + Sync>>, - pub Option<&'static str>, -); -impl Error { - #[allow(dead_code)] - pub fn kind(&self) -> &ErrorKind { - &self.0 - } -} -impl From<ErrorKind> for Error { - fn from(e: ErrorKind) -> Self { - Error(e, None, None) - } -} -impl std::error::Error for Error { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.1 - .as_ref() - .map(|e| e.as_ref() as &(dyn std::error::Error + 'static)) - } -} -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} -impl std::fmt::Debug for Error { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - use std::error::Error as StdError; - if let Some(ref o) = self.2 { - std::fmt::Display::fmt(o, f)?; - } - std::fmt::Debug::fmt(&self.0, f)?; - if let Some(e) = self.source() { - std::fmt::Display::fmt("\nCaused by:\n", f)?; - std::fmt::Debug::fmt(&e, f)?; - } - Ok(()) - } -} -#[allow(dead_code)] -pub type Result<T> = std::result::Result<T, Error>; -impl From<varlink::Error> for Error { - fn from(e: varlink::Error) -> Self { - match e.kind() { - varlink::ErrorKind::VarlinkErrorReply(r) => Error( - ErrorKind::from(r), - Some(Box::from(e)), - Some(concat!(file!(), ":", line!(), ": ")), - ), - _ => Error( - ErrorKind::Varlink_Error, - Some(Box::from(e)), - Some(concat!(file!(), ":", line!(), ": ")), - ), - } - } -} -#[allow(dead_code)] -impl Error { - pub fn source_varlink_kind(&self) -> Option<&varlink::ErrorKind> { - use std::error::Error as StdError; - let mut s: &dyn StdError = self; - while let Some(c) = s.source() { - let k = self - .source() - .and_then(|e| e.downcast_ref::<varlink::Error>()) - .map(|e| e.kind()); - if k.is_some() { - return k; - } - s = c; - } - None - } -} -impl From<&varlink::Reply> for ErrorKind { - #[allow(unused_variables)] - fn from(e: &varlink::Reply) -> Self { - match e { - varlink::Reply { - error: Some(ref t), .. - } if t == "io.patagia.hostd.InvalidMachineConfig" => match e { - varlink::Reply { - parameters: Some(p), - .. - } => match serde_json::from_value(p.clone()) { - Ok(v) => ErrorKind::InvalidMachineConfig(v), - Err(_) => ErrorKind::InvalidMachineConfig(None), - }, - _ => ErrorKind::InvalidMachineConfig(None), - }, - _ => ErrorKind::VarlinkReply_Error, - } - } -} -pub trait VarlinkCallError: varlink::CallTrait { - fn reply_invalid_machine_config(&mut self) -> varlink::Result<()> { - self.reply_struct(varlink::Reply::error( - "io.patagia.hostd.InvalidMachineConfig", - None, - )) - } -} -impl<'a> VarlinkCallError for varlink::Call<'a> {} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct r#Label { - pub r#key: String, - pub r#value: String, -} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct r#Machine { - pub r#machineId: String, - pub r#nodeLabels: Option<Vec<Label>>, - pub r#patagiaAgent: Option<PatagiaAgentConfig>, -} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct r#PatagiaAgentConfig { - pub r#url: Option<String>, -} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct InvalidMachineConfig_Args {} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct Apply_Reply {} -impl varlink::VarlinkReply for Apply_Reply {} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct Apply_Args { - pub r#machine: Machine, -} -pub trait Call_Apply: VarlinkCallError { - fn reply(&mut self) -> varlink::Result<()> { - self.reply_struct(varlink::Reply::parameters(None)) - } -} -impl<'a> Call_Apply for varlink::Call<'a> {} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct Describe_Reply { - pub r#machine: Machine, -} -impl varlink::VarlinkReply for Describe_Reply {} -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] -pub struct Describe_Args {} -pub trait Call_Describe: VarlinkCallError { - fn reply(&mut self, r#machine: Machine) -> varlink::Result<()> { - self.reply_struct(Describe_Reply { r#machine }.into()) - } -} -impl<'a> Call_Describe for varlink::Call<'a> {} -pub trait VarlinkInterface { - fn apply(&self, call: &mut dyn Call_Apply, r#machine: Machine) -> varlink::Result<()>; - fn describe(&self, call: &mut dyn Call_Describe) -> varlink::Result<()>; - fn call_upgraded( - &self, - _call: &mut varlink::Call, - _bufreader: &mut dyn BufRead, - ) -> varlink::Result<Vec<u8>> { - Ok(Vec::new()) - } -} -pub trait VarlinkClientInterface { - fn apply(&mut self, r#machine: Machine) -> varlink::MethodCall<Apply_Args, Apply_Reply, Error>; - fn describe(&mut self) -> varlink::MethodCall<Describe_Args, Describe_Reply, Error>; -} -#[allow(dead_code)] -pub struct VarlinkClient { - connection: Arc<RwLock<varlink::Connection>>, -} -impl VarlinkClient { - #[allow(dead_code)] - pub fn new(connection: Arc<RwLock<varlink::Connection>>) -> Self { - VarlinkClient { connection } - } -} -impl VarlinkClientInterface for VarlinkClient { - fn apply(&mut self, r#machine: Machine) -> varlink::MethodCall<Apply_Args, Apply_Reply, Error> { - varlink::MethodCall::<Apply_Args, Apply_Reply, Error>::new( - self.connection.clone(), - "io.patagia.hostd.Apply", - Apply_Args { r#machine }, - ) - } - fn describe(&mut self) -> varlink::MethodCall<Describe_Args, Describe_Reply, Error> { - varlink::MethodCall::<Describe_Args, Describe_Reply, Error>::new( - self.connection.clone(), - "io.patagia.hostd.Describe", - Describe_Args {}, - ) - } -} -#[allow(dead_code)] -pub struct VarlinkInterfaceProxy { - inner: Box<dyn VarlinkInterface + Send + Sync>, -} -#[allow(dead_code)] -pub fn new(inner: Box<dyn VarlinkInterface + Send + Sync>) -> VarlinkInterfaceProxy { - VarlinkInterfaceProxy { inner } -} -impl varlink::Interface for VarlinkInterfaceProxy { - fn get_description(&self) -> &'static str { - "interface io.patagia.hostd\n\ntype Label (\n key: string,\n value: string\n)\n\ntype PatagiaAgentConfig (\n url: ?string\n)\n\ntype Machine(\n machineId: string,\n nodeLabels: ?[]Label,\n patagiaAgent: ?PatagiaAgentConfig\n)\n\nmethod Describe() -> (\n machine: Machine\n)\n\nmethod Apply(\n machine: Machine\n) -> ()\n\nerror InvalidMachineConfig()\n" - } - fn get_name(&self) -> &'static str { - "io.patagia.hostd" - } - fn call_upgraded( - &self, - call: &mut varlink::Call, - bufreader: &mut dyn BufRead, - ) -> varlink::Result<Vec<u8>> { - self.inner.call_upgraded(call, bufreader) - } - fn call(&self, call: &mut varlink::Call) -> varlink::Result<()> { - let req = call.request.unwrap(); - match req.method.as_ref() { - "io.patagia.hostd.Apply" => { - if let Some(args) = req.parameters.clone() { - let args: Apply_Args = match serde_json::from_value(args) { - Ok(v) => v, - Err(e) => { - let es = format!("{}", e); - let _ = call.reply_invalid_parameter(es.clone()); - return Err(varlink::context!(varlink::ErrorKind::SerdeJsonDe(es))); - } - }; - self.inner - .apply(call as &mut dyn Call_Apply, args.r#machine) - } else { - call.reply_invalid_parameter("parameters".into()) - } - } - "io.patagia.hostd.Describe" => self.inner.describe(call as &mut dyn Call_Describe), - m => call.reply_method_not_found(String::from(m)), - } - } -} diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs deleted file mode 100644 index 02666ab..0000000 --- a/ipc/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod io_patagia_hostd;