From e97e8f7d70a07a6c8953cfe7897df04aca167ea8 Mon Sep 17 00:00:00 2001 From: Jan Christian Gr??nhage Date: Mon, 6 Apr 2020 20:29:46 +0200 Subject: [PATCH] fix timing intervals fixes #3 --- CHANGELOG.md | 2 ++ config.toml.sample | 1 - src/ping.rs | 64 +++++++++++++++++++++++++++++----------------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c086b85..0bdcee1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- fix interval timings ## [v0.2.0] - 2020-04-06 ### Added diff --git a/config.toml.sample b/config.toml.sample index c46861f..4011880 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -3,4 +3,3 @@ listener = "[::]:9898" [hosts] "1.1.1.1" = 500 "1.0.0.1" = 500 - diff --git a/src/ping.rs b/src/ping.rs index 8d67009..0ecf05c 100644 --- a/src/ping.rs +++ b/src/ping.rs @@ -22,8 +22,9 @@ use futures_util::stream::StreamExt; use lazy_static::lazy_static; use log::{error, info, trace}; use prometheus::*; +use std::net::IpAddr; use std::time::Duration; -use tokio::time::delay_for; +use tokio_ping::Pinger; lazy_static! { static ref PING_HISTOGRAM: HistogramVec = register_histogram_vec!( @@ -42,7 +43,7 @@ lazy_static! { pub(crate) async fn start_pinging_hosts( config: Config, ) -> std::result::Result<(), tokio_ping::Error> { - let pinger = match tokio_ping::Pinger::new().await { + let pinger = match Pinger::new().await { Ok(pinger) => pinger, Err(error) => { error!("Couldn't create pinger: {}", error); @@ -51,27 +52,44 @@ pub(crate) async fn start_pinging_hosts( }; for (host, interval) in config.hosts.clone() { info!("Spawn ping task for {}", host); - let pingchain = pinger.chain(host).timeout(Duration::from_secs(3)); - let host = host.to_string(); - tokio::spawn(pingchain.stream().for_each(move |ping_result| { - match ping_result { - Ok(time) => match time { - Some(time) => { - let ms = time.as_millis(); - trace!("Received pong from {} after {} ms", &host, &ms); - PING_HISTOGRAM - .with_label_values(&[&host]) - .observe(ms as f64); - } - None => { - trace!("Received no response from {} within timeout", &host); - PING_HISTOGRAM.with_label_values(&[&host]).observe(3000.0); - } - }, - Err(error) => error!("Couldn't ping {}: {}", &host, error), - } - delay_for(Duration::from_millis(interval)) - })); + tokio::spawn(ping_host(pinger.clone(), host, interval)); } Ok(()) } + +async fn ping_host(pinger: Pinger, host: IpAddr, interval: u64) { + let pingchain = pinger.chain(host).timeout(Duration::from_secs(3)); + let host = host.to_string(); + let mut stream = pingchain.stream(); + let mut interval = tokio::time::interval(Duration::from_millis(interval)); + loop { + interval.tick().await; + handle_ping_result(stream.next().await.unwrap(), &host).await; + } +} + +async fn handle_ping_result( + result: std::result::Result, tokio_ping::Error>, + host: &str, +) { + let pong = match result { + Ok(pong) => pong, + Err(error) => { + error!("Couldn't ping {}: {}", &host, error); + return; + } + }; + match pong { + Some(time) => { + let ms = time.as_millis(); + trace!("Received pong from {} after {} ms", &host, &ms); + PING_HISTOGRAM + .with_label_values(&[&host]) + .observe(ms as f64); + } + None => { + trace!("Received no response from {} within timeout", &host); + PING_HISTOGRAM.with_label_values(&[&host]).observe(3000.0); + } + }; +}