Browse Source

Switched from error_chain to failure.

master 0.0.17
flabbergast 2 years ago
parent
commit
1782b1136a
  1. 1
      .gitignore
  2. 12
      Cargo.lock
  3. 2
      Cargo.toml
  4. 22
      src/decode_jee.rs
  5. 8
      src/errors.rs
  6. 57
      src/lib.rs
  7. 44
      src/main.rs
  8. 23
      src/sensordata.rs

1
.gitignore

@ -1,2 +1,3 @@
/target/
**/*.rs.bk
builder-arm

12
Cargo.lock

@ -320,15 +320,6 @@ dependencies = [
"termcolor 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "error-chain"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"backtrace 0.3.30 (registry+https://github.com/rust-lang/crates.io-index)",
"version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "failure"
version = "0.1.5"
@ -450,7 +441,7 @@ dependencies = [
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"config 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"loggerv 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"mongodb 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1638,7 +1629,6 @@ dependencies = [
"checksum derive_more 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f57d78cf3bd45270dad4e70c21ec77a960b36c7a841ff9db76aaa775a8fb871"
"checksum digest 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "03b072242a8cbaf9c145665af9d250c59af3b958f83ed6824e13533cf76d5b90"
"checksum env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)" = "15b0a4d2e39f8420210be8b27eeda28029729e2fd4291019455016c348240c38"
"checksum error-chain 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3ab49e9dcb602294bc42f9a7dfc9bc6e936fca4418ea300dbfb84fe16de0b7d9"
"checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2"
"checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"

2
Cargo.toml

@ -8,7 +8,7 @@ clap = { version = "~2.32.0", features = ["yaml"] }
config = "0.9"
log = "0.4.6"
loggerv = "0.7.1"
error-chain = "~0.12.0"
failure = "~0.1.5"
bus = "~2.2.0"
rand = "0.5.6"
num-traits = "0.2.8"

22
src/decode_jee.rs

@ -7,10 +7,10 @@ use rumqtt;
use bson;
use chrono;
use mongodb;
use failure::{self,ResultExt};
use sensordata::{SensorData,SensorDatum};
use params;
use errors::*;
// sensor device types
@ -30,7 +30,7 @@ pub enum MessageType {
}
impl DeviceType {
pub fn process_bytes(&self, bytes: Vec<u8>) -> Result<Vec<SensorDatum>> {
pub fn process_bytes(&self, bytes: Vec<u8>) -> Result<Vec<SensorDatum>, failure::Error> {
match self {
//TempHumBattV1
//- 2bytes|LE|temp*100
@ -38,7 +38,7 @@ impl DeviceType {
//- 2bytes|LE|volt*1000 {1.5--4.0}
DeviceType::TempHumBattV1 => {
if bytes.len() < 8 {
bail!("Not enough bytes in message from TempHumBattV1 type.");
bail!("Not enough bytes in message from TempHumBattV1 type.")
} else { // TODO: perhaps should also implement sanity checks on the resulting values
Ok(vec![
SensorDatum::Temperature(le64fs(&bytes[2..4],100)),
@ -54,7 +54,7 @@ impl DeviceType {
//- 1byte|volt*100 {0.5--2.0}
DeviceType::TempHumBattV2 => {
if bytes.len() < 7 {
bail!("Not enough bytes in message from TempHumBattV2 type.");
bail!("Not enough bytes in message from TempHumBattV2 type.")
} else {
Ok(vec![
SensorDatum::Temperature(le64f(&bytes[2..4],100)),
@ -69,7 +69,7 @@ impl DeviceType {
//- 2bytes|LE|volt*1000 {1.0--4.0}
DeviceType::TempBattV1 => {
if bytes.len() < 6 {
bail!("Not enough bytes in message from TempBattV1 type.");
bail!("Not enough bytes in message from TempBattV1 type.")
} else {
Ok(vec![
SensorDatum::Temperature(le64f(&bytes[2..4],1000)),
@ -83,7 +83,7 @@ impl DeviceType {
//- 2bytes|LE|volt*1000 {1.5--4.0}
DeviceType::TempBattV2 => {
if bytes.len() < 6 {
bail!("Not enough bytes in message from TempBattV2 type.");
bail!("Not enough bytes in message from TempBattV2 type.")
} else {
Ok(vec![
SensorDatum::Temperature(le64f(&bytes[2..4],16)),
@ -99,7 +99,7 @@ impl DeviceType {
//- 2bytes|LE|volt*1000 {1.5--4.0}
DeviceType::TempHumLuxBattV1 => {
if bytes.len() < 10 {
bail!("Not enough bytes in message from TempBattV2 type.");
bail!("Not enough bytes in message from TempBattV2 type.")
} else {
Ok(vec![
SensorDatum::Temperature(le64f(&bytes[2..4],100)),
@ -132,7 +132,7 @@ pub fn process_gw_incoming(msg: &str,
node_list: &HashMap<String,DeviceDesc>,
opts: &params::AuxOptions,
mut mq: Option<&mut rumqtt::MqttClient>,
mdb: Option<&mongodb::Client>) -> Result<()> {
mdb: Option<&mongodb::Client>) -> Result<(), failure::Error> {
debug!("Processing incoming packet: {}.", msg);
// publish raw incoming
@ -195,18 +195,18 @@ pub fn process_gw_incoming(msg: &str,
if let Some(client) = mq {
packet.publish_to_mqtt(client, &opts.mqtt_post_processed_prefix)
.unwrap_or_else(|e| warn!("Problem publishing mqtt message ({}).", e.to_string()) );
}
} // do not panic if unsuccessful
// save the decoded data into mongodb
if let Some(client) = mdb {
packet.save_to_mongodb(&client)
.chain_err(|| "Failed to save to MongoDB.")?;
.context("Failed to save to MongoDB.")?;
}
// save the decoded data to csv files
if opts.csv_enabled {
packet.save_to_csv( &opts.csv_path )
.chain_err(|| "Failed to save to csv files.")?;
.context("Failed to save to csv files.")?;
}
Ok(())

8
src/errors.rs

@ -1,8 +0,0 @@
// Create the Error, ErrorKind, ResultExt, and Result types
error_chain!{
foreign_links {
Io(::std::io::Error);
}
}

57
src/lib.rs

@ -1,7 +1,7 @@
// crate jeethru
// all jeethru functionality is in this crate except parsing/setting up program parameters
#[macro_use] extern crate error_chain;
#[macro_use] extern crate failure;
#[macro_use] extern crate log;
extern crate bus;
@ -25,9 +25,9 @@ use std::collections::HashMap;
use mongodb::ThreadedClient;
use failure::ResultExt;
pub mod params;
pub mod errors;
use errors::*;
pub mod decode_jee;
mod sensordata;
@ -89,30 +89,31 @@ fn handle_output_serve(listen_serve: net::TcpListener,
// process one output client
fn process_output_client(stream: net::TcpStream,
bus_outhandle: &sync::Arc<sync::Mutex<bus::Bus<String>>>,
gw_stream: &net::TcpStream) -> Result<()> {
gw_stream: &net::TcpStream) -> Result<(), failure::Error> {
let rx = bus_outhandle.lock()
.map_err(|e| -> errors::Error { format!("Couldn't lock the bus: {}", e.to_string()).into() })?
.add_rx(); // not using chain_err: a lifetime parameter in the error type
let tx = gw_stream.try_clone().chain_err(|| "Couldn't clone input stream.")?;
let output_stream = stream.try_clone().chain_err(|| "Couldn't clone input stream.")?;
.map_err(|e| format_err!("Couldn't lock the bus: {}", e.to_string()) )?
.add_rx(); // can't use .context, weird error type
let tx = gw_stream.try_clone().context("Couldn't clone input stream.")?;
let output_stream = stream.try_clone().context("Couldn't clone input stream.")?;
thread::Builder::new()
.name(format!("rd:{:?}", stream.peer_addr()).to_string())
.spawn(|| {
handle_read_from_client(stream, tx);
}).chain_err(|| "Couldn't spawn read_from_client thread.")?;
}).context("Couldn't spawn read_from_client thread.")?;
debug!("Spawned thread for reading from client.");
thread::Builder::new()
.name(format!("wr:{:?}", output_stream.peer_addr()).to_string())
.spawn(|| {
handle_write_to_client(output_stream, rx);
}).chain_err(|| "Couldn't spawn write_to_client thread.")?;
}).context("Couldn't spawn write_to_client thread.")?;
debug!("Spawned thread for writing to client.");
Ok(())
}
pub fn processor(params: params::Params, node_list: HashMap<String,decode_jee::DeviceDesc>) -> Result<()> {
pub fn processor(params: params::Params, node_list: HashMap<String,decode_jee::DeviceDesc>)
-> Result<(), failure::Error> {
// mqtt setup and connect
let mqtt_client_options = rumqtt::MqttOptions::new(format!("jee-to-mqtt-rust-{:x}",rand::random::<u16>()),
params.host_mqtt.clone(), params.port_mqtt)
@ -121,10 +122,8 @@ pub fn processor(params: params::Params, node_list: HashMap<String,decode_jee::D
let mut mqtt_client : Option<rumqtt::MqttClient> = if params.mqtt_enabled {
Some(rumqtt::MqttClient::start(mqtt_client_options)
// for whatever reason (?) chain_err didn't want to go through (no method named
// chain_err found for type `... rumqtt::ConnectError>` in the current scope)
.map_err(|e| -> errors::Error { format!("Couldn't start mqtt client for {}:{} > {}",
params.host_mqtt.clone(), params.port_mqtt, e.to_string()).into() })?
.context(format!("Couldn't start mqtt client for {}:{}",
params.host_mqtt.clone(), params.port_mqtt))?
.0) // forgetting the notification object here
} else { None };
if mqtt_client.is_some() {
@ -137,7 +136,7 @@ pub fn processor(params: params::Params, node_list: HashMap<String,decode_jee::D
// mongodb connect
let mongo_client = if params.mongo_enabled {
Some(mongodb::Client::connect(&params.host_mongo, params.port_mongo)
.chain_err(|| format!("Couldn't connect to mongo database at {}:{}.", params.host_mongo, params.port_mongo))?)
.context(format!("Couldn't connect to mongo database at {}:{}.", params.host_mongo, params.port_mongo))?)
} else { None };
if mongo_client.is_some() {
info!("Connected to Mongo server {}:{}.", params.host_mongo, params.port_mongo);
@ -154,9 +153,9 @@ pub fn processor(params: params::Params, node_list: HashMap<String,decode_jee::D
// gateway (expected ser2net) stream connect
let gw_stream = net::TcpStream::connect( (params.host_gw.as_str(), params.port_gw) )
.chain_err(|| format!("Couldn't connect to '{}:{}'.", params.host_gw, params.port_gw))?;
.context(format!("Couldn't connect to '{}:{}'.", params.host_gw, params.port_gw))?;
// create some copies of the stream for reading/writing
let input_gw_stream = gw_stream.try_clone().chain_err(|| "Couldn't clone input stream.")?;
let input_gw_stream = gw_stream.try_clone().context("Couldn't clone input stream.")?;
let input_gw_reader = io::BufReader::new(input_gw_stream);
//let _output_gw_stream = gw_stream.try_clone().chain_err(|| "Couldn't clone output stream.")?;
info!("Connected to input stream {}:{}.", params.host_gw, params.port_gw);
@ -164,38 +163,38 @@ pub fn processor(params: params::Params, node_list: HashMap<String,decode_jee::D
// thread: consume gateway|ser2net input
let aux_opts_clone = params.auxopts.clone();
let input_gw_handler = thread::Builder::new().name("input_handler".to_string()).spawn(move || -> Result<()> {
let input_gw_handler = thread::Builder::new().name("input_handler".to_string()).spawn(move ||
-> Result<(), failure::Error> {
for line in input_gw_reader.lines() {
let line = line.chain_err(|| "TCP input stream error")?;
let line = line.context("TCP input stream error")?;
bus_inhandle.lock()
.map_err(|e| -> errors::Error { format!("Couldn't lock the bus: {}", e.to_string()).into() })?
.broadcast(line.clone()); // not using chain_err: a lifetime parameter in the error type
.map_err(|e| format_err!("Couldn't lock the bus: {}", e.to_string()) )?
.broadcast(line.clone()); // can't use .context, weird error type
decode_jee::process_gw_incoming(&line, &node_list, &aux_opts_clone,
mqtt_client.as_mut(), mongo_client.as_ref())
.chain_err(|| "Problem processing incoming packet.")?;
.context("Problem processing incoming packet.")?;
}
warn!("Incoming stream closed, exiting.");
Ok(())
}).chain_err(|| "Couldn't spawn input_handler thread.")?;
}).context("Couldn't spawn input_handler thread.")?;
debug!("Spawned input_handler thread.");
// start tcp server for forwarding messages to clients
let host_serve_clone = params.host_serve.clone();
let listen_serve = net::TcpListener::bind( (host_serve_clone.as_str(), params.port_serve) )
.chain_err(|| format!("Couldn't bind to {}:{}.", host_serve_clone, params.port_serve))?;
.context(format!("Couldn't bind to {}:{}.", host_serve_clone, params.port_serve))?;
info!("Bound TCP listener to {}:{}.", host_serve_clone, params.port_serve);
// thread: process tcp connect requests
let _output_serve_handler = thread::Builder::new().name("output_handler".to_string()).spawn(|| {
handle_output_serve(listen_serve, bus_outhandle, gw_stream);
}).chain_err(|| "Couldn't spawn output_handler thread.")?;
}).context("Couldn't spawn output_handler thread.")?;
debug!("Spawned output_handler thread.");
// wait for the input stream to close
// (using map_err because chain_err won't work on Box<...> returned by join())
input_gw_handler.join()
.map_err(|e| -> errors::Error { format!("Exiting. Input handler panicked: {:?}", e).into() })?
.chain_err(|| "Exiting. Error with input stream.")?;
.map_err(|e| format_err!("Exiting. Input handler panicked: {:?}.", e))?
.context("Exiting. Error with input stream.")?;
info!("End of input. Exiting.");
Ok(())

44
src/main.rs

@ -2,7 +2,7 @@
//#![allow(unused_imports)]
#[macro_use] extern crate clap;
extern crate error_chain;
#[macro_use] extern crate failure;
#[macro_use] extern crate log;
extern crate loggerv;
extern crate config;
@ -16,12 +16,13 @@ use std::str;
use std::collections::HashMap;
use std::process;
use failure::ResultExt;
use num_traits::cast;
use clap::App;
use jeethru::{decode_jee,params,errors::*};
use jeethru::{decode_jee,params};
// getting parameters: cmdline overrides cfg_file overrides defaults
@ -29,60 +30,60 @@ use jeethru::{decode_jee,params,errors::*};
//
// get parameter value (numerical)
fn get_param<T>(matches: &clap::ArgMatches, settings: &mut config::Config, param: &str, deft: T)
-> Result<T>
-> Result<T, failure::Error>
where T: str::FromStr+cast::NumCast {
if matches.is_present(param) {
Ok(value_t!(matches, param, T)
.chain_err(|| format!("Problem parsing '{}' as integer from the command line parameter.", param))?)
.context(format!("Problem parsing '{}' as integer from the command line parameter.", param))?)
} else {
settings.set_default::<i64>(param, cast::cast(deft).unwrap())
.chain_err(|| format!("Problem setting default value for '{}'.", param))?;
.context(format!("Problem setting default value for '{}'.", param))?;
Ok(cast::cast(settings.get_int(param)
.chain_err(|| format!("Problem parsing '{}' as integer from the configuration file.", param))?)
.chain_err(|| format!("Problem casting parsed {} to required number type.",param))?)
.context(format!("Problem parsing '{}' as integer from the configuration file.", param))?)
.ok_or(format_err!("Problem casting parsed {} to required number type.",param))?)
}
}
// get parameter value (string)
fn get_param_str(matches: &clap::ArgMatches, settings: &mut config::Config, param: &str, deft: &str)
-> Result<String> {
-> Result<String, failure::Error> {
if matches.is_present(param) {
Ok(matches.value_of(param)
.chain_err(|| format!("Problem parsing '{}' as string from the command line parameter.", param))?
.ok_or(format_err!("Problem parsing '{}' as string from the command line parameter.", param))?
.to_string())
} else {
settings.set_default(param, deft)
.chain_err(|| format!("Problem setting default value for '{}'.", param))?;
.context(format!("Problem setting default value for '{}'.", param))?;
Ok(settings.get_str(param)
.chain_err(|| format!("Problem parsing '{}' as string from the configuration file.", param))?)
.context(format!("Problem parsing '{}' as string from the configuration file.", param))?)
}
}
// get parameter value (bool)
fn get_param_bool(matches: &clap::ArgMatches, settings: &mut config::Config, param: &str, deft: bool)
-> Result<bool> {
-> Result<bool, failure::Error> {
settings.set_default(param, deft)
.chain_err(|| format!("Problem setting default value for '{}'.", param))?;
.context(format!("Problem setting default value for '{}'.", param))?;
Ok(settings.get_bool(param)
.chain_err(|| format!("Problem parsing '{}' as bool from the configuration file.", param))?
.context(format!("Problem parsing '{}' as bool from the configuration file.", param))?
^ matches.is_present(param))
}
// main function
fn run(matches: clap::ArgMatches) -> Result<()> {
fn run(matches: clap::ArgMatches) -> Result<(), failure::Error> {
let config_file = matches.value_of("config_file")
.chain_err(|| "Couldn't parse configuration file name.")?;
.ok_or(format_err!("Couldn't parse configuration file name."))?;
// settings
// the 'merge' below won't error out if the config file does not exist, but it will still panic
// if the config file exists but is not correctly parsable
let mut settings = config::Config::default();
settings.merge(config::File::new(config_file, config::FileFormat::Yaml).required(false))
.chain_err(|| format!("Problem parsing config file {}.", config_file))?
.context(format!("Problem parsing config file {}.", config_file))?
.merge(config::Environment::with_prefix("JEETHRU"))
.chain_err(|| "Problem parsing JEETHRU_ environment variables.")?;
.context("Problem parsing JEETHRU_ environment variables.")?;
// set up what's then passed into the main function
let node_list = settings.get::<HashMap<String,decode_jee::DeviceDesc>>("devices")
@ -134,11 +135,14 @@ fn main() {
warn!("v{} starting...", crate_version!());
// unwrapped and modified quick_main! from error_chain
// run run() and unwrap any returned errors
process::exit(match run(matches) {
Ok(_) => 0,
Err(ref e) => {
error!("{}", error_chain::ChainedError::display_chain(e));
error!("fatal: {}; caused by:", e);
for c in e.iter_causes() {
error!(" {:?}", c);
}
1
}
});

23
src/sensordata.rs

@ -9,8 +9,7 @@ use rumqtt;
use bson;
use mongodb::{self,ThreadedClient,db::ThreadedDatabase};
use chrono;
use errors::{self,*};
use failure::ResultExt;
// sensor data structures: individual sensor reading
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
@ -57,7 +56,7 @@ pub struct SensorData<'a> {
// publish a data packet to mqtt
impl<'a> SensorData<'a> {
pub fn publish_to_mqtt( &self, mq: &mut rumqtt::MqttClient, prefix: &str ) -> Result<()> {
pub fn publish_to_mqtt( &self, mq: &mut rumqtt::MqttClient, prefix: &str ) -> Result<(), failure::Error> {
trace!("Publishing decoded packet to MQTT broker.");
for datum in &self.data {
mq.publish(
@ -65,24 +64,22 @@ impl<'a> SensorData<'a> {
rumqtt::QoS::AtMostOnce,
false,
datum.to_value_string().into_bytes()
).map_err(|e| -> errors::Error { format!("Problem publishing mqtt message: {}", e.to_string()).into() })?;
// for whatever reason (?) chain_err didn't want to go through (no method named
// chain_err found for type `... rumqtt::ConnectError>` in the current scope)
).context("Problem publishing mqtt message.")?;
}
trace!("Published decoded packet to MQTT.");
Ok(())
}
// save a data packet to mongodb
pub fn save_to_mongodb( &self, mdb: &mongodb::Client ) -> Result<()> {
pub fn save_to_mongodb( &self, mdb: &mongodb::Client ) -> Result<(), failure::Error> {
trace!("Saving decoded packet to MongoDB.");
let mc = mdb.db("sensors").collection(
&self.name.trim_matches('/').replace("/","_"));
let serialized_packet = bson::to_bson(&self)
.chain_err(|| "Failed to serialize.")?; // Serialize
.context("Failed to serialize.")?; // Serialize
if let bson::Bson::Document(document) = serialized_packet {
mc.insert_one(document, None)
.chain_err(|| "Failed to insert to database.")?; // Insert
.context("Failed to insert to database.")?; // Insert
} else {
bail!("Failed to convert into MongoDB document.");
}
@ -91,7 +88,7 @@ impl<'a> SensorData<'a> {
}
// save data to text files
pub fn save_to_csv( &self, prefix: &str ) -> Result<()> {
pub fn save_to_csv( &self, prefix: &str ) -> Result<(), failure::Error> {
for datum in &self.data {
// one file for each datum...
let filename = path::Path::new(prefix)
@ -104,16 +101,16 @@ impl<'a> SensorData<'a> {
.append(true)
.create(true)
.open(filename)
.chain_err(|| "Failed to open file for appending.")?;
.context("Failed to open file for appending.")?;
if ! existed {
writeln!(file,"timestamp,isodate,{}",datum.to_name())
.chain_err(|| "Failed writing to file.")?;
.context("Failed writing to file.")?;
}
writeln!(file, "{},{},{}", &self.timestamp.timestamp(),
&self.timestamp.with_timezone(&chrono::Local).to_rfc3339(),
datum.to_value_string())
.chain_err(|| "Failed writing to file.")?;
.context("Failed writing to file.")?;
}
Ok(())
}

Loading…
Cancel
Save