Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Local syslog support #6

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ chrono = { version = "0.4", features = ["serde"] }
colorful = "0.3"
console-subscriber = { version = "0.4", optional = true }
hostname = "0.4"
once_cell = "1"
libc = "0.2.164"
# We don't want aws-lc-rs, typically a library should leave the backend choice up to the consumer, but we want to be opinionated here.
rustls = { version = "0.23", default-features = false, features = ["logging", "std", "tls12", "ring"] }
serde = { version = "1", features = ["std", "rc", "derive"] }
Expand Down
311 changes: 311 additions & 0 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
use crate::LogRecord;

/// A log backend
pub struct Backend {
inner: Inner,
}

enum Inner {
Network(network::Network),
System(system::System),
}

impl Backend {
pub(crate) fn network(host: String, port: u16, api_key_id: String, api_key: String) -> Self {
Self {
inner: Inner::Network(network::Network::new(host, port, api_key_id, api_key)),
}
}

pub(crate) fn system() -> Self {
Self {
inner: Inner::System(system::System::new().expect("Handle error")),
}
}

pub(crate) fn active(&self) -> bool {
match &self.inner {
Inner::Network(n) => n.active(),
Inner::System(s) => s.active(),
}
}
/// Log the record.
pub(crate) fn log(&self, record: &LogRecord) {
match &self.inner {
Inner::Network(n) => n.log(record),
Inner::System(s) => s.log(record),
}
}
}

mod network {
use std::io::Write;
use std::net::TcpStream;
use std::sync::{Arc, Mutex};

use rustls::pki_types::ServerName;
use rustls::{ClientConfig, ClientConnection, RootCertStore, StreamOwned};

use crate::{Error, LogRecord};

pub struct Network {
conn: Mutex<Option<StreamOwned<ClientConnection, TcpStream>>>,
api_key: String,
api_key_id: String,
host: String,
port: u16,
}

impl Network {
pub fn new(host: String, port: u16, api_key_id: String, api_key: String) -> Self {
Self {
conn: Mutex::new(None),
api_key,
api_key_id,
host,
port,
}
}

pub(super) fn active(&self) -> bool {
!self.api_key.is_empty()
}

pub(super) fn log(&self, record: &crate::LogRecord) {
let mut log_conn = self.conn.lock().unwrap();

// reconnect loop
loop {
// Connect up TLS connection to log server.
if log_conn.is_none() {
match connect(&self.host, self.port) {
Ok(v) => {
*log_conn = Some(v);
}
Err(e) => {
eprintln!("Failed to connect to log host: {:?}", e);

// TODO: Do we need backoff here? Since the logging is sync, it
// would lock up the callsite.

continue;
}
}
}

let str = self.log_record_to_string(record);
let bytes = str.as_bytes();

let stream = log_conn.as_mut().expect("Existing log connection");

match stream.write_all(bytes).and_then(|_| stream.flush()) {
Ok(_) => {
// Log message sent successfully.
break;
}
Err(e) => {
eprintln!("Log connection failed: {:?}", e);

// Remove previous failed connection to trigger reconnect.
*log_conn = None;
}
}
}
}

fn log_record_to_string(&self, record: &LogRecord) -> String {
let mut res = String::new();

let pri = (*record.facility as u8) * 8 + (record.severity as u8);

// 2019-03-18T13:12:27.000+00:00
let time = record.timestamp.format("%Y-%m-%dT%H:%M:%S%.3f%:z");

// 53595 is an private enterprise number (PEN) for Lookback
// as assigned by IANA. https://www.iana.org/assignments/enterprise-numbers
// we applied for it here:
// https://pen.iana.org/pen/PenApplication.page
let strct = format!(
"[{}@53595 apiKey=\"{}\" env=\"{}\"]",
self.api_key_id, self.api_key, record.env
);

let mut message = record
.message
.as_deref()
.map(|s| s.trim())
.unwrap_or("")
.to_owned();

if let Some(w) = &record.well_known {
let s = serde_json::to_string(&w).expect("Json serialize");
message.push(' ');
message.push_str(&s);
}

// replace any char < 32 with a space.
fn strip_ctrl(s: &str) -> String {
s.chars()
.map(|c| match c {
'\x00'..='\x1f' => ' ',
_ => c,
})
.collect()
}

message = strip_ctrl(&message);

fn chk(s: &str) -> &str {
if s.is_empty() {
"-"
} else {
s
}
}

use std::fmt::Write;
write!(
res,
"<{}>1 {} {} {} {} {} {} {}\n",
pri,
time,
chk(&*record.hostname),
chk(&*record.app_name),
record.pid,
chk(&record.msg_id),
strct,
chk(&message),
);

res
}
}

/// Connect a TLS connection to the log server.
fn connect(
log_host: &str,
log_port: u16,
) -> Result<StreamOwned<ClientConnection, TcpStream>, Error> {
let addr = format!("{}:{}", log_host, log_port);

let sock = TcpStream::connect(&addr)?;

let mut root_store = RootCertStore::empty();
root_store.extend(
webpki_roots::TLS_SERVER_ROOTS
.iter()
.map(|ta| ta.to_owned()),
);

let tls_config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

let server_name: ServerName = log_host.try_into()?;
let conn = ClientConnection::new(Arc::new(tls_config), server_name.to_owned())?;

let stream = StreamOwned::new(conn, sock);

Ok(stream)
}
}

mod system {
use std::fmt;
use std::ops::DerefMut;
use std::os::unix::net::UnixDatagram;
use std::sync::Mutex;

use crate::Error;

pub struct System {
socket_and_buf: Mutex<(UnixDatagram, Vec<u8>)>,
}

impl System {
pub fn new() -> Result<Self, Error> {
let socket = UnixDatagram::unbound()?;

socket.connect("/dev/log")?;

Ok(Self {
socket_and_buf: Mutex::new((socket, Vec::with_capacity(1024 * 10))),
})
}

pub(super) fn active(&self) -> bool {
true
}

pub(super) fn log(&self, record: &crate::LogRecord) {
use std::io::Write;
let pri = (*record.facility as u8) * 8 + (record.severity as u8);

let app_name = chk(record.app_name.as_str());
let pid = record.pid;
let msg = StripCtrl(record.message.as_ref().map(|s| s.as_str()).unwrap_or(""));
let mut socket_and_buf = self.socket_and_buf.lock().unwrap();
let (socket, buf) = socket_and_buf.deref_mut();
// TODO: Handle errors

// Clear out the buffer
buf.clear();

write!(buf, r#"<{pri}>{app_name}[{pid}]: {msg}"#);

if let Some(w) = &record.well_known {
write!(buf, " ");
serde_json::to_writer(&mut *buf, w).expect("JSON serialize");
}
write!(buf, "\n");

socket.send(&buf);
}
}

struct StripCtrl<T>(T);

impl<T: fmt::Display> fmt::Display for StripCtrl<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use fmt::Write;
// Custom writing mechanism that replaces control characters on-the-fly
struct ControlSanitizer<'a, 'b> {
formatter: &'a mut fmt::Formatter<'b>,
}

impl<'a, 'b> fmt::Write for ControlSanitizer<'a, 'b> {
fn write_str(&mut self, s: &str) -> fmt::Result {
for c in s.chars() {
self.formatter
.write_char(if c.is_control() { ' ' } else { c })?;
}

Ok(())
}

fn write_char(&mut self, c: char) -> fmt::Result {
self.formatter
.write_char(if c.is_control() { ' ' } else { c })
}
}

let mut sanitizer = ControlSanitizer { formatter: f };

write!(sanitizer, "{}", self.0)
}
}

impl<T: fmt::Display> StripCtrl<T> {
fn new(value: T) -> Self {
StripCtrl(value)
}
}

fn chk(s: &str) -> &str {
if s.is_empty() {
"-"
} else {
s
}
}
}
Loading