hopefully finish implementation

This commit is contained in:
Jan Christian Grünhage
2019-02-06 19:29:27 +01:00
parent bde5127560
commit f68751df31
9 changed files with 315 additions and 175 deletions

28
src/README.md Normal file
View File

@ -0,0 +1,28 @@
## peshming
It's a prometheus exporter pinging hosts in the background.
It's been inspired by [meshping](https://bitbucket.org/Svedrin/meshping),
but instead of managing targets using a redis db this is using a simple config file.
In addition, this tool allows to set a ping frequency per target.
The name peshming is intended as a placeholder until
someone comes up with something better.
### Usage:
```
peshming 0.1.0
Jan Christian Grünhage <jan.christian@gruenhage.xyz>
Pings configured hosts in a configurable intervals and exposes metrics for prometheus.
USAGE:
peshming [FLAGS] <config>
FLAGS:
-h, --help Prints help information
-v, --verbose Be verbose (you can add this up to 4 times for more logs)
-V, --version Prints version information
ARGS:
<config> Set config file
```
For configuration options, see the included sample config file.

73
src/config.rs Normal file
View File

@ -0,0 +1,73 @@
use std::collections::HashMap;
use log::{error, warn, info, debug, trace};
use serde::{Serialize, Deserialize};
use clap::{clap_app, crate_name, crate_version, crate_description, crate_authors};
#[derive(Serialize, Deserialize)]
pub(crate) struct Config {
pub(crate) listener: std::net::SocketAddr,
pub(crate) hosts: HashMap<String, u64>,
}
pub(crate) fn setup_clap() -> (clap::ArgMatches<'static>) {
clap_app!(myapp =>
(name: crate_name!())
(version: crate_version!())
(author: crate_authors!())
(about: crate_description!())
(@arg config: +required "Set config file")
(@arg v: -v --verbose ... "Be verbose (you can add this up to 4 times for more logs)")
).get_matches()
}
pub(crate) fn setup_fern(level: u64) {
let level = match level {
0 => log::LevelFilter::Error,
1 => log::LevelFilter::Warn,
2 => log::LevelFilter::Info,
3 => log::LevelFilter::Debug,
_ => log::LevelFilter::Trace
};
match fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{}][{}] {}",
chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
record.level(),
message
))
})
.level(level)
.chain(std::io::stdout())
.apply() {
Err(_) => {
eprintln!("error setting up logging!");
}
_ => info!("logging set up properly"),
}
}
pub(crate) fn read_config(path: &str) -> Result<Config, Error> {
let config_file_content = std::fs::read_to_string(path)?;
Ok(toml::from_str(&config_file_content)?)
}
pub(crate) struct Error {}
impl std::convert::From<std::io::Error> for Error {
fn from(_: std::io::Error) -> Self {
Error{}
}
}
impl std::convert::From<toml::de::Error> for Error {
fn from(_: toml::de::Error) -> Self {
Error{}
}
}
impl std::convert::From<oping::PingError> for Error {
fn from(_: oping::PingError) -> Self {
Error{}
}
}

View File

@ -1,85 +1,27 @@
extern crate serde_derive;
extern crate toml;
extern crate hyper;
extern crate lazy_static;
extern crate prometheus;
extern crate futures;
extern crate tokio;
extern crate env_logger;
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response, Server};
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use lazy_static::lazy_static;
use prometheus::{Counter, Encoder, Gauge, HistogramVec, TextEncoder};
use prometheus::*;
use log::{error, warn, info, debug, trace};
use futures::future::lazy;
//TODO: Replace with useful metrics, these are beyond useless
lazy_static! {
static ref HTTP_COUNTER: Counter = register_counter!(opts!(
"example_http_requests_total",
"Total number of HTTP requests made.",
labels! {"handler" => "all",}
))
.unwrap();
static ref HTTP_BODY_GAUGE: Gauge = register_gauge!(opts!(
"example_http_response_size_bytes",
"The HTTP response sizes in bytes.",
labels! {"handler" => "all",}
))
.unwrap();
static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
"example_http_request_duration_seconds",
"The HTTP request latencies in seconds.",
&["handler"]
)
.unwrap();
}
fn start_serving_metrics() {
let serve_metrics = || {
service_fn_ok(|_req| {
HTTP_COUNTER.inc();
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
let metric_families = prometheus::gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
HTTP_BODY_GAUGE.set(buffer.len() as f64);
let mut res = Response::new(Body::from(buffer));
res.headers_mut().insert(CONTENT_TYPE, encoder.format_type().parse().unwrap());
timer.observe_duration();
res
})
};
println!("listening addr 127.0.0.1:9898");
let server = Server::bind(&([127, 0, 0, 1], 9898).into())
.serve(serve_metrics)
.map_err(|err| eprintln!("server error: {}", err));
tokio::spawn(server);
}
fn start_pinging_hosts() {
tokio::spawn(lazy(|| {
//TODO: Implement the pinging, based on oping and the example over at
// https://tokio.rs/docs/futures/spawning/
Ok(())
}));
}
//TODO: Clean this shameful mess up!
//TODO: Do config reading, cli args, logging etc
mod config;
mod metrics;
mod ping;
use crate::config::{Config, read_config, setup_clap, setup_fern};
use crate::metrics::start_serving_metrics;
use crate::ping::start_pinging_hosts;
fn main() {
env_logger::init();
tokio::run(lazy(|| {
start_serving_metrics();
start_pinging_hosts();
let clap = setup_clap();
setup_fern(clap.occurrences_of("v"));
let config = match read_config(clap.value_of("config").unwrap()) {
Ok(config) => config,
Err(_) => {
error!("Couldn't read config file!");
return;
}
};
tokio::run(lazy(move || {
start_serving_metrics(&config);
start_pinging_hosts(&config);
Ok(())
}));
}

51
src/metrics.rs Normal file
View File

@ -0,0 +1,51 @@
use crate::config::{Config};
use lazy_static::lazy_static;
use hyper::{Server, Response, Body, header::CONTENT_TYPE, service::service_fn_ok};
use prometheus::{TextEncoder, Counter, Gauge, HistogramVec};
use prometheus::*;
use futures::future::Future;
lazy_static! {
static ref HTTP_COUNTER: Counter = register_counter!(opts!(
"http_requests_total",
"Total number of HTTP requests made.",
labels! {"handler" => "all",}
))
.unwrap();
static ref HTTP_BODY_GAUGE: Gauge = register_gauge!(opts!(
"http_response_size_bytes",
"The HTTP response sizes in bytes.",
labels! {"handler" => "all",}
))
.unwrap();
static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
"http_request_duration_seconds",
"The HTTP request latencies in seconds.",
&["handler"]
)
.unwrap();
}
pub(crate) fn start_serving_metrics(config: &Config) {
let serve_metrics = || {
service_fn_ok(|_req| {
HTTP_COUNTER.inc();
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
let metric_families = prometheus::gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
HTTP_BODY_GAUGE.set(buffer.len() as f64);
let mut res = Response::new(Body::from(buffer));
res.headers_mut().insert(CONTENT_TYPE, encoder.format_type().parse().unwrap());
timer.observe_duration();
res
})
};
println!("Listening on {}", &config.listener);
let server = Server::bind(&config.listener)
.serve(serve_metrics)
.map_err(|err| eprintln!("server error: {}", err));
tokio::spawn(server);
}

55
src/ping.rs Normal file
View File

@ -0,0 +1,55 @@
use crate::config::{Config, Error};
use std::time::{Duration, Instant};
use tokio::timer::{Interval};
use futures::{future::{lazy, Future}, stream::Stream};
use oping::{Ping};
use log::{trace, debug, info, warn, error};
use prometheus::*;
use lazy_static::lazy_static;
lazy_static! {
static ref PING_HISTOGRAM : HistogramVec = register_histogram_vec!(
"ping_rtt_milliseconds",
"The ping round trip time in milliseconds",
&["target"],
vec![0.5, 1.0, 5.0, 10.0, 15.0, 20.0, 25.0, 50.0, 75.0, 100.0, 150.0, 200.0, 250.0,
300.0, 350.0, 400.0, 450.0, 500.0, 550.0, 600.0, 650.0, 700.0, 750.0, 800.0, 900.0,
1000.0, 1250.0, 1500.0, 1750.0, 2000.0]
).unwrap();
}
pub(crate) fn start_pinging_hosts(config: &Config) {
for (host, interval) in config.hosts.clone() {
info!("Spawn ping task for {}", host);
tokio::spawn(
Interval::new(Instant::now(), Duration::from_millis(interval))
.for_each(move |_| {
let mut ping = Ping::new();
ping.set_timeout(2.5);
ping.add_host(&host);
for response in match ping.send() {
Ok(iterator) => iterator,
Err(e) => {
error!("Something went wrong sending the ping: {:?}", e);
return Ok(());
}
}{
if response.dropped > 0 {
debug!("No response from host: {}", response.hostname);
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(2500.0)
} else {
debug!("Response from host {} (address {}): latency {} ms",
response.hostname, response.address, response.latency_ms);
trace!(" all details: {:?}", response);
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(response.latency_ms);
}
}
Ok(())
}).map_err(|_| ())
);
}
}