diff --git a/Cargo.lock b/Cargo.lock index 7cd8204..5226ac6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -993,19 +993,8 @@ name = "hostd" version = "0.2.0" dependencies = [ "anyhow", - "clap", - "dropshot", - "http", - "schemars", - "serde", - "slog", - "slog-async", - "sqlx", + "ipc", "systemd-ipc", - "tokio", - "trace-request", - "tracing", - "tracing-slog", "varlink", ] diff --git a/hostd/Cargo.toml b/hostd/Cargo.toml index 5eab87f..4a8dee3 100644 --- a/hostd/Cargo.toml +++ b/hostd/Cargo.toml @@ -5,19 +5,6 @@ edition.workspace = true [dependencies] anyhow.workspace = true -tokio.workspace = true -sqlx = { version = "0.8.3", default-features = false, features = [ - "macros", "migrate", "postgres", "runtime-tokio", "tls-rustls", "time", "uuid" - ] } varlink = "11.0.1" +ipc = { path = "../ipc" } systemd-ipc = { path = "../systemd-ipc" } -dropshot.workspace = true -clap.workspace = true -slog.workspace = true -slog-async.workspace = true -tracing-slog.workspace = true -tracing.workspace = true -trace-request = { path = "../trace-request" } -schemars.workspace = true -serde.workspace = true -http.workspace = true diff --git a/hostd/src/api.rs b/hostd/src/api.rs deleted file mode 100644 index edc0fcc..0000000 --- a/hostd/src/api.rs +++ /dev/null @@ -1,15 +0,0 @@ -use anyhow::Result; -use dropshot::ApiDescription; - -use std::sync::Arc; - -use crate::context::ControllerContext; -use crate::machine; - -type ControllerApiDescription = ApiDescription<Arc<ControllerContext>>; - -pub fn api() -> Result<ControllerApiDescription> { - let mut api = ControllerApiDescription::new(); - api.register(machine::describe)?; - Ok(api) -} diff --git a/hostd/src/bin/hostd-controller.rs b/hostd/src/bin/hostd-controller.rs deleted file mode 100644 index f6a7eb5..0000000 --- a/hostd/src/bin/hostd-controller.rs +++ /dev/null @@ -1,79 +0,0 @@ -use anyhow::{anyhow, Result}; - -use clap::Parser; -use dropshot::{ConfigDropshot, ServerBuilder}; -use slog::Drain; -use sqlx::PgPool; -use std::net::SocketAddr; -use std::str::FromStr; -use std::sync::Arc; -use tracing_slog::TracingSlogDrain; - -use hostd::api; -use hostd::context::ControllerContext; - -#[derive(Parser, Debug)] -#[command(version, about, long_about = None)] -struct Cli { - #[arg( - long = "telemetry-otlp-endpoint", - default_value = "http://localhost:4317", - env = "OTEL_EXPORTER_OTLP_ENDPOINT" - )] - otlp_endpoint: Option<String>, - - #[arg( - long = "log-stderr", - short = 'v', - default_value = "false", - env = "LOG_TO_STDERR" - )] - log_stderr: bool, - - #[arg( - long = "listen-address", - default_value = "0.0.0.0:9478", - env = "LISTEN_ADDRESS" - )] - listen_address: String, - - #[arg( - long = "database-url", - default_value = "postgresql://localhost/patagia", - env = "DATABASE_URL" - )] - database_url: Option<String>, -} - -#[tokio::main] -async fn main() -> Result<()> { - let args = Cli::parse(); - - let config = ConfigDropshot { - bind_address: SocketAddr::from_str(&args.listen_address).unwrap(), - ..Default::default() - }; - - let logger = { - let level_drain = slog::LevelFilter(TracingSlogDrain, slog::Level::Debug).fuse(); - let async_drain = slog_async::Async::new(level_drain).build().fuse(); - slog::Logger::root(async_drain, slog::o!()) - }; - - // let database_url = args.database_url.unwrap(); - // dont connect to database pass null - // let pg = PgPool::connect(&database_url).await?; - - // sqlx::migrate!().run(&pg).await?; - - let ctx = ControllerContext::new(); - - let api = api::api()?; - - ServerBuilder::new(api, Arc::new(ctx), logger) - .config(config) - .start() - .map_err(|e| anyhow!("Error starting server: {:?}", e))? - .await - .map_err(|e| anyhow!(e)) -} diff --git a/hostd/src/context.rs b/hostd/src/context.rs deleted file mode 100644 index e50d955..0000000 --- a/hostd/src/context.rs +++ /dev/null @@ -1,11 +0,0 @@ -// use sqlx::postgres::PgPool; - -pub struct ControllerContext { - // pub pg_pool: PgPool, -} - -impl ControllerContext { - pub fn new() -> ControllerContext { - ControllerContext {} - } -} diff --git a/hostd/src/lib.rs b/hostd/src/lib.rs deleted file mode 100644 index 94f3a31..0000000 --- a/hostd/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod api; -pub mod context; -pub mod machine; diff --git a/hostd/src/machine.rs b/hostd/src/machine.rs deleted file mode 100644 index 39d8437..0000000 --- a/hostd/src/machine.rs +++ /dev/null @@ -1,46 +0,0 @@ -use dropshot::{endpoint, HttpError, HttpResponseOk, RequestContext}; -use schemars::JsonSchema; -use serde::Serialize; -use trace_request::trace_request; - -use systemd_ipc::addrs::SYSTEMD_HOSTNAME; -use systemd_ipc::io_systemd_hostname::{self, VarlinkClientInterface}; - -use std::sync::Arc; - -use std::thread; - -use crate::context::ControllerContext; - -/// Version and build information -#[derive(Serialize, JsonSchema)] -struct MachineInfo { - machine_id: String, -} - -/// Fetch machine id -#[endpoint { - method = GET, - path = "/machine_id", -}] -#[trace_request] -pub async fn describe( - rqctx: RequestContext<Arc<ControllerContext>>, -) -> Result<HttpResponseOk<MachineInfo>, HttpError> { - // Connect to systemd.Hostname - // Make it tokio task blocking - tokio::task::block_in_place(move || { - let conn = varlink::Connection::with_address(SYSTEMD_HOSTNAME).unwrap(); - let mut sd = io_systemd_hostname::VarlinkClient::new(conn); - let machine_id = sd.describe().call().unwrap().MachineID; - - tracing::info_span!("Hello, span hostd!"); - - tracing::info!(monotonic_counter.version_calls = 1); - - let machine_info = MachineInfo { machine_id }; - thread::sleep(std::time::Duration::from_millis(10000)); - - Ok(HttpResponseOk(machine_info)) - }) -} diff --git a/hostd/src/main.rs b/hostd/src/main.rs new file mode 100644 index 0000000..f5d0bea --- /dev/null +++ b/hostd/src/main.rs @@ -0,0 +1,59 @@ +use anyhow::Result; + +use ipc::io_patagia_hostd; +use systemd_ipc::addrs::SYSTEMD_HOSTNAME; +use systemd_ipc::io_systemd_hostname::{self, VarlinkClientInterface}; + +struct PatagiaHostd; + +impl io_patagia_hostd::VarlinkInterface for PatagiaHostd { + fn apply( + &self, + call: &mut dyn io_patagia_hostd::Call_Apply, + machine: io_patagia_hostd::Machine, + ) -> varlink::Result<()> { + // FIXME: Do something useful + println!("Applying machine config: {:#?}", machine); + call.reply() + } + + fn describe(&self, call: &mut dyn io_patagia_hostd::Call_Describe) -> varlink::Result<()> { + // Connect to systemd.Hostname + let conn = varlink::Connection::with_address(SYSTEMD_HOSTNAME).unwrap(); + let mut sd = io_systemd_hostname::VarlinkClient::new(conn); + + let machine = io_patagia_hostd::Machine { + machineId: sd.describe().call().unwrap().MachineID, + nodeLabels: None, + patagiaAgent: None, + }; + call.reply(machine) + } +} + +fn main() -> Result<()> { + let hostd = PatagiaHostd; + let hostd_iface = io_patagia_hostd::new(Box::new(hostd)); + + let svc = varlink::VarlinkService::new( + "io.patagia.hostd", + "Host controller for patagia", + "0.1", + "https://patagia.dev", + vec![Box::new(hostd_iface)], + ); + + let addr = format!("unix:{}/{}", env!("XDG_RUNTIME_DIR"), "io.patagia.hostd"); + + println!("Varlink Listening on {}", addr); + + varlink::listen( + svc, + &addr, + &varlink::ListenConfig { + ..Default::default() + }, + )?; + + Ok(()) +} diff --git a/ipc/.gitignore b/ipc/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/ipc/.gitignore @@ -0,0 +1 @@ +/target diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml new file mode 100644 index 0000000..b26a1f5 --- /dev/null +++ b/ipc/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "ipc" +version.workspace = true +edition.workspace = true + +[dependencies] +serde.workspace = true +serde_derive = "1.0.217" +serde_json = "1.0.135" +varlink = "11.0.1" + +[build-dependencies] +varlink_generator = "10.1.0" +walkdir = "2.5.0" + +[package.metadata.cargo-machete] +ignored = ["serde"] diff --git a/ipc/build.rs b/ipc/build.rs new file mode 100644 index 0000000..fcec0de --- /dev/null +++ b/ipc/build.rs @@ -0,0 +1,12 @@ +extern crate varlink_generator; + +use walkdir::WalkDir; + +fn main() { + println!("cargo:rerun-if-changed=src/*.varlink"); + for entry in WalkDir::new("src").into_iter().filter_map(|e| e.ok()) { + if entry.file_name().to_str().unwrap().ends_with(".varlink") { + varlink_generator::cargo_build_tosource(&entry.path().display().to_string(), true); + } + } +} diff --git a/ipc/src/io.patagia.hostd.varlink b/ipc/src/io.patagia.hostd.varlink new file mode 100644 index 0000000..ea34a12 --- /dev/null +++ b/ipc/src/io.patagia.hostd.varlink @@ -0,0 +1,26 @@ +interface io.patagia.hostd + +type Label ( + key: string, + value: string +) + +type PatagiaAgentConfig ( + url: ?string +) + +type Machine( + machineId: string, + nodeLabels: ?[]Label, + patagiaAgent: ?PatagiaAgentConfig +) + +method Describe() -> ( + machine: Machine +) + +method Apply( + machine: Machine +) -> () + +error InvalidMachineConfig() diff --git a/ipc/src/io_patagia_hostd.rs b/ipc/src/io_patagia_hostd.rs new file mode 100644 index 0000000..19d0df5 --- /dev/null +++ b/ipc/src/io_patagia_hostd.rs @@ -0,0 +1,263 @@ +#![doc = "This file was automatically generated by the varlink rust generator"] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +use serde_derive::{Deserialize, Serialize}; +use std::io::BufRead; +use std::sync::{Arc, RwLock}; +use varlink::{self, CallTrait}; +#[allow(dead_code)] +#[derive(Clone, PartialEq, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum ErrorKind { + Varlink_Error, + VarlinkReply_Error, + InvalidMachineConfig(Option<InvalidMachineConfig_Args>), +} +impl ::std::fmt::Display for ErrorKind { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + match self { + ErrorKind::Varlink_Error => write!(f, "Varlink Error"), + ErrorKind::VarlinkReply_Error => write!(f, "Varlink error reply"), + ErrorKind::InvalidMachineConfig(v) => { + write!(f, "io.patagia.hostd.InvalidMachineConfig: {:#?}", v) + } + } + } +} +pub struct Error( + pub ErrorKind, + pub Option<Box<dyn std::error::Error + 'static + Send + Sync>>, + pub Option<&'static str>, +); +impl Error { + #[allow(dead_code)] + pub fn kind(&self) -> &ErrorKind { + &self.0 + } +} +impl From<ErrorKind> for Error { + fn from(e: ErrorKind) -> Self { + Error(e, None, None) + } +} +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.1 + .as_ref() + .map(|e| e.as_ref() as &(dyn std::error::Error + 'static)) + } +} +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} +impl std::fmt::Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + use std::error::Error as StdError; + if let Some(ref o) = self.2 { + std::fmt::Display::fmt(o, f)?; + } + std::fmt::Debug::fmt(&self.0, f)?; + if let Some(e) = self.source() { + std::fmt::Display::fmt("\nCaused by:\n", f)?; + std::fmt::Debug::fmt(&e, f)?; + } + Ok(()) + } +} +#[allow(dead_code)] +pub type Result<T> = std::result::Result<T, Error>; +impl From<varlink::Error> for Error { + fn from(e: varlink::Error) -> Self { + match e.kind() { + varlink::ErrorKind::VarlinkErrorReply(r) => Error( + ErrorKind::from(r), + Some(Box::from(e)), + Some(concat!(file!(), ":", line!(), ": ")), + ), + _ => Error( + ErrorKind::Varlink_Error, + Some(Box::from(e)), + Some(concat!(file!(), ":", line!(), ": ")), + ), + } + } +} +#[allow(dead_code)] +impl Error { + pub fn source_varlink_kind(&self) -> Option<&varlink::ErrorKind> { + use std::error::Error as StdError; + let mut s: &dyn StdError = self; + while let Some(c) = s.source() { + let k = self + .source() + .and_then(|e| e.downcast_ref::<varlink::Error>()) + .map(|e| e.kind()); + if k.is_some() { + return k; + } + s = c; + } + None + } +} +impl From<&varlink::Reply> for ErrorKind { + #[allow(unused_variables)] + fn from(e: &varlink::Reply) -> Self { + match e { + varlink::Reply { + error: Some(ref t), .. + } if t == "io.patagia.hostd.InvalidMachineConfig" => match e { + varlink::Reply { + parameters: Some(p), + .. + } => match serde_json::from_value(p.clone()) { + Ok(v) => ErrorKind::InvalidMachineConfig(v), + Err(_) => ErrorKind::InvalidMachineConfig(None), + }, + _ => ErrorKind::InvalidMachineConfig(None), + }, + _ => ErrorKind::VarlinkReply_Error, + } + } +} +pub trait VarlinkCallError: varlink::CallTrait { + fn reply_invalid_machine_config(&mut self) -> varlink::Result<()> { + self.reply_struct(varlink::Reply::error( + "io.patagia.hostd.InvalidMachineConfig", + None, + )) + } +} +impl<'a> VarlinkCallError for varlink::Call<'a> {} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct r#Label { + pub r#key: String, + pub r#value: String, +} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct r#Machine { + pub r#machineId: String, + pub r#nodeLabels: Option<Vec<Label>>, + pub r#patagiaAgent: Option<PatagiaAgentConfig>, +} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct r#PatagiaAgentConfig { + pub r#url: Option<String>, +} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct InvalidMachineConfig_Args {} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct Apply_Reply {} +impl varlink::VarlinkReply for Apply_Reply {} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct Apply_Args { + pub r#machine: Machine, +} +pub trait Call_Apply: VarlinkCallError { + fn reply(&mut self) -> varlink::Result<()> { + self.reply_struct(varlink::Reply::parameters(None)) + } +} +impl<'a> Call_Apply for varlink::Call<'a> {} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct Describe_Reply { + pub r#machine: Machine, +} +impl varlink::VarlinkReply for Describe_Reply {} +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +pub struct Describe_Args {} +pub trait Call_Describe: VarlinkCallError { + fn reply(&mut self, r#machine: Machine) -> varlink::Result<()> { + self.reply_struct(Describe_Reply { r#machine }.into()) + } +} +impl<'a> Call_Describe for varlink::Call<'a> {} +pub trait VarlinkInterface { + fn apply(&self, call: &mut dyn Call_Apply, r#machine: Machine) -> varlink::Result<()>; + fn describe(&self, call: &mut dyn Call_Describe) -> varlink::Result<()>; + fn call_upgraded( + &self, + _call: &mut varlink::Call, + _bufreader: &mut dyn BufRead, + ) -> varlink::Result<Vec<u8>> { + Ok(Vec::new()) + } +} +pub trait VarlinkClientInterface { + fn apply(&mut self, r#machine: Machine) -> varlink::MethodCall<Apply_Args, Apply_Reply, Error>; + fn describe(&mut self) -> varlink::MethodCall<Describe_Args, Describe_Reply, Error>; +} +#[allow(dead_code)] +pub struct VarlinkClient { + connection: Arc<RwLock<varlink::Connection>>, +} +impl VarlinkClient { + #[allow(dead_code)] + pub fn new(connection: Arc<RwLock<varlink::Connection>>) -> Self { + VarlinkClient { connection } + } +} +impl VarlinkClientInterface for VarlinkClient { + fn apply(&mut self, r#machine: Machine) -> varlink::MethodCall<Apply_Args, Apply_Reply, Error> { + varlink::MethodCall::<Apply_Args, Apply_Reply, Error>::new( + self.connection.clone(), + "io.patagia.hostd.Apply", + Apply_Args { r#machine }, + ) + } + fn describe(&mut self) -> varlink::MethodCall<Describe_Args, Describe_Reply, Error> { + varlink::MethodCall::<Describe_Args, Describe_Reply, Error>::new( + self.connection.clone(), + "io.patagia.hostd.Describe", + Describe_Args {}, + ) + } +} +#[allow(dead_code)] +pub struct VarlinkInterfaceProxy { + inner: Box<dyn VarlinkInterface + Send + Sync>, +} +#[allow(dead_code)] +pub fn new(inner: Box<dyn VarlinkInterface + Send + Sync>) -> VarlinkInterfaceProxy { + VarlinkInterfaceProxy { inner } +} +impl varlink::Interface for VarlinkInterfaceProxy { + fn get_description(&self) -> &'static str { + "interface io.patagia.hostd\n\ntype Label (\n key: string,\n value: string\n)\n\ntype PatagiaAgentConfig (\n url: ?string\n)\n\ntype Machine(\n machineId: string,\n nodeLabels: ?[]Label,\n patagiaAgent: ?PatagiaAgentConfig\n)\n\nmethod Describe() -> (\n machine: Machine\n)\n\nmethod Apply(\n machine: Machine\n) -> ()\n\nerror InvalidMachineConfig()\n" + } + fn get_name(&self) -> &'static str { + "io.patagia.hostd" + } + fn call_upgraded( + &self, + call: &mut varlink::Call, + bufreader: &mut dyn BufRead, + ) -> varlink::Result<Vec<u8>> { + self.inner.call_upgraded(call, bufreader) + } + fn call(&self, call: &mut varlink::Call) -> varlink::Result<()> { + let req = call.request.unwrap(); + match req.method.as_ref() { + "io.patagia.hostd.Apply" => { + if let Some(args) = req.parameters.clone() { + let args: Apply_Args = match serde_json::from_value(args) { + Ok(v) => v, + Err(e) => { + let es = format!("{}", e); + let _ = call.reply_invalid_parameter(es.clone()); + return Err(varlink::context!(varlink::ErrorKind::SerdeJsonDe(es))); + } + }; + self.inner + .apply(call as &mut dyn Call_Apply, args.r#machine) + } else { + call.reply_invalid_parameter("parameters".into()) + } + } + "io.patagia.hostd.Describe" => self.inner.describe(call as &mut dyn Call_Describe), + m => call.reply_method_not_found(String::from(m)), + } + } +} diff --git a/ipc/src/lib.rs b/ipc/src/lib.rs new file mode 100644 index 0000000..02666ab --- /dev/null +++ b/ipc/src/lib.rs @@ -0,0 +1 @@ +pub mod io_patagia_hostd;