diff --git a/Cargo.toml b/Cargo.toml
index d821165..fa3eb3b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,9 @@ edition = "2018"
 #[profile.dev]
 #opt-level = 3
+#[profile.test]
+#postgres = "~0.1.7"
+
 
 [dependencies]
 tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
 tokio = { version = "0.2", features = ["full"] }
diff --git a/src/configure/mod.rs b/src/configure/mod.rs
new file mode 100644
index 0000000..da70713
--- /dev/null
+++ b/src/configure/mod.rs
@@ -0,0 +1,36 @@
+
+// Init of configuration files could also be done via Config crate.
+// But at this point of development it seems like unjustified overhead.
+
+extern crate serde_json;
+use std::fs::File;
+
+pub struct Config {
+    pub filter: String,
+    pub insert_max: usize,
+    pub pcap_file: String,
+    pub connection: String,
+    pub device: String,
+    pub is_device: bool,
+    pub pcap_dir: String,
+}
+
+
+pub fn from_json_file() -> Option<Config> {
+    let config_file = File::open("parser.json").expect("file should open read only");
+    let json: serde_json::Value = serde_json::from_reader(config_file).unwrap();
+    Some(Config {
+        filter: json.get("filter").unwrap().as_str().unwrap().to_owned(),
+        insert_max : json.get("insert_max").unwrap().as_u64().unwrap() as usize,
+        pcap_file : json.get("pcap_file").unwrap().as_str().unwrap().to_owned(),
+        connection : format!("host={} user={} password={}",
+            json.get("database_host").unwrap().as_str().unwrap(),
+            json.get("database_user").unwrap().as_str().unwrap(),
+            json.get("database_password").unwrap().as_str().unwrap(),
+        ),
+        device : json.get("parse_device").unwrap().as_str().unwrap().to_owned(),
+        is_device : json.get("from_device").unwrap().as_bool().unwrap(),
+        pcap_dir : json.get("pcap_dir").unwrap().as_str().unwrap().to_owned(),
+    })
+}
+
diff --git a/src/main.rs b/src/main.rs
index 08b777e..8683429 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,9 @@ extern crate tokio_postgres;
 use std::fs::File;
 mod parser;
 mod serializer;
+mod configure;
 use std::fs;
+use rayon::prelude::*;
 use std::io::prelude::*;
 use std::collections::HashMap;
 use tokio_postgres::types::ToSql;
@@ -24,37 +26,20 @@ fn query_string(insert_max: &usize) -> String {
     insert_template
 }
 
+
 #[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
 async fn main() -> Result<(), Error> {
     /* Init values from file */
-    let file = File::open("parser.json").expect("file should open read only");
-
-    let json: serde_json::Value = serde_json::from_reader(file).unwrap();
-    let filter = json.get("filter").unwrap().as_str().unwrap();
-    let insert_max = json.get("insert_max").unwrap().as_u64().unwrap() as usize;
-    let pcap_file = json.get("pcap_file").unwrap().as_str().unwrap();
-    let host = [
-        "host=",
-        json.get("database_host").unwrap().as_str().unwrap(),
-    ]
-    .join("");
-    let user = [
-        "user=",
-        json.get("database_user").unwrap().as_str().unwrap(),
-    ]
-    .join("");
-    let password = [
-        "password=",
-        json.get("database_password").unwrap().as_str().unwrap(),
-    ]
-    .join("");
-    let connection = [host, user, password].join(" ");
-    let device = json.get("parse_device").unwrap().as_str().unwrap();
-    let is_device = json.get("from_device").unwrap().as_bool().unwrap();
-    let pcap_dir = json.get("pcap_dir").unwrap().as_str().unwrap();
-
+
+    let config: configure::Config = configure::from_json_file().unwrap();
+
+    // let mut pcap_map: Hashmap = HashMap::new();
+
+
+    // TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
+    // This db table should include UUIDs so it can be joined effectively
     let mut pcap_map = HashMap::new();
-    if let Ok(entries) = fs::read_dir(pcap_dir) {
+    if let Ok(entries) = fs::read_dir(config.pcap_dir) {
         for entry in entries {
             if let Ok(entry) = entry {
                 if let Ok(_file_type) = entry.file_type() {
@@ -78,7 +63,7 @@ async fn main() -> Result<(), Error> {
     }
     println!("{:?}", pcap_map.iter());
     /* db connection */
-    let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?;
+    let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
 
     tokio::spawn(async move {
         if let Err(e) = connection.await {
@@ -97,15 +82,16 @@ async fn main() -> Result<(), Error> {
         .await?;
 
     /* device or file input */
-    if false == is_device {
-        let v: Vec<parser::QryData> = parser::parse(&pcap_file, &filter);
+    if false == config.is_device {
+    for _pcap_file in pcap_map.keys() {
+        let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
         let packets_serialized = serializer::serialize_packets(v);
 
         /* Query */
-        //let insert_max = 60;
-        let chunk_count = packets_serialized.len() / insert_max;
-        let remainder: usize = packets_serialized.len() % insert_max;
-        let chunker = &packets_serialized.len() < &insert_max;
+
+        let chunk_count = packets_serialized.len() / config.insert_max;
+        let remainder: usize = packets_serialized.len() % config.insert_max;
+        let chunker = &packets_serialized.len() < &config.insert_max;
         match chunker {
             true => {
                 let insert_str = query_string(&packets_serialized.len());
@@ -119,11 +105,11 @@ async fn main() -> Result<(), Error> {
             }
 
             false => {
-                let insert_str = query_string(&insert_max);
+                let insert_str = query_string(&config.insert_max);
                 let statement = client.prepare(&insert_str).await?;
 
                 for _i in 0..chunk_count {
-                    let (_input, _) = packets_serialized.split_at(insert_max);
+                    let (_input, _) = packets_serialized.split_at(config.insert_max);
                     client
                         .query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql))
                         .await?;
@@ -147,11 +133,12 @@ async fn main() -> Result<(), Error> {
                 }
             }
         }
+        }
     } else {
-        let insert_str = query_string(&insert_max);
+        let insert_str = query_string(&config.insert_max);
         let statement = client.prepare(&insert_str).await?;
         loop {
-            let v: Vec<parser::QryData> = parser::parse_device(&device, &filter, &insert_max);
+            let v: Vec<parser::QryData> = parser::parse_device(&config.device, &config.filter, &config.insert_max);
             let packets_serialized = serializer::serialize_packets(v);
             client
                 .query_raw(
@@ -164,28 +151,3 @@ async fn main() -> Result<(), Error> {
 
     Ok(())
 }
-
-#[test]
-fn test_insert_json() {
-    use serde_json::json;
-    let mut client =
-        Client::connect("host=localhost user=postgres password=password", NoTls).unwrap();
-    let john = json!({
-        "name": "John Doe",
-        "age": 43,
-        "phones": [
-            "+44 1234567",
-            "+44 2345678"
-        ],
-        "empty": []
-    });
-
-    client
-        .execute("DROP TABLE IF EXISTS json_dump", &[])
-        .unwrap();
-    client.execute(
-        "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)",
-        &[],
-    );
-    client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]);
-}
diff --git a/src/parser/mod.rs b/src/parser/mod.rs
index 38a510d..1f76be0 100644
--- a/src/parser/mod.rs
+++ b/src/parser/mod.rs
@@ -45,7 +45,7 @@ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
     }
 }
 
-pub fn parse(parse_file: &str, filter_str: &str) -> Vec<QryData> {
+pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
     let ether_init = build_ether();
 
     let mut me = QryData {
diff --git a/tests/units.rs b/tests/units.rs
new file mode 100644
index 0000000..7972ebd
--- /dev/null
+++ b/tests/units.rs
@@ -0,0 +1,37 @@
+//extern crate serde_json;
+//extern crate tokio;
+//extern crate tokio_postgres;
+//use std::fs::File;
+//mod parser;
+//mod serializer;
+//use std::fs;
+//use std::io::prelude::*;
+//use std::collections::HashMap;
+//use tokio_postgres::types::ToSql;
+//use tokio_postgres::{Error, NoTls};
+//
+//
+//#[test]
+//fn test_insert_json() {
+//    use serde_json::json;
+//    let mut client =
+//        tokio_postgres::connect("host=localhost user=postgres password=password", NoTls).unwrap();
+//    let john = json!({
+//        "name": "John Doe",
+//        "age": 43,
+//        "phones": [
+//            "+44 1234567",
+//            "+44 2345678"
+//        ],
+//        "empty": []
+//    });
+//
+//    client
+//        .execute("DROP TABLE IF EXISTS json_dump", &[])
+//        .unwrap();
+//    client.execute(
+//        "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)",
+//        &[],
+//    );
+//    client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]);
+//}