From 0e1b9435b176f3fdf4f06c3cd0cd9c8ddc542a46 Mon Sep 17 00:00:00 2001
From: gurkenhabicht
Date: Thu, 18 Jun 2020 20:22:18 +0200
Subject: [PATCH] reworked chunker, performance increase

---
 README.md             |  1 +
 src/configure/mod.rs  |  7 ++++++-
 src/main.rs           | 56 +++++++++++++++++++++++++++-----------------------------
 src/parser.json       | 10 +++++-----
 src/parser/mod.rs     | 17 +++++++----------
 src/serializer/mod.rs |  2 +-
 6 files changed, 47 insertions(+), 46 deletions(-)

diff --git a/README.md b/README.md
index 7ad3fd6..ee1f128 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,7 @@ If this whole thing turns out to be viable, some future features may be:
 - Update file hashmap through inotify crate, during runtime.
 - Restoration of fragmented ipv4 packages.
 - SIMD (via autovectorization). Which is easy enough to do in Rust.
+- Support of more protocols
 
 There are many other things left to be desired.
 
diff --git a/src/configure/mod.rs b/src/configure/mod.rs
index c00225d..3ffa766 100644
--- a/src/configure/mod.rs
+++ b/src/configure/mod.rs
@@ -57,7 +57,12 @@ pub fn from_json_file() -> Option<Config> {
             .to_owned(),
         insert_max: json.get("insert_max").unwrap().as_u64().unwrap() as usize,
         pcap_file: json.get("pcap_file").unwrap().as_str().unwrap().to_owned(), // Not in use atm
-        tablename: json.get("database_tablename").unwrap().as_str().unwrap().to_owned(),
+        tablename: json
+            .get("database_tablename")
+            .unwrap()
+            .as_str()
+            .unwrap()
+            .to_owned(),
         connection: format!(
             "host={} user={} password={}",
             json.get("database_host").unwrap().as_str().unwrap(),
diff --git a/src/main.rs b/src/main.rs
index 434cd81..cec8bcb 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -26,17 +26,16 @@ fn query_string(insert_max: &usize, table_name: &str) -> String {
     insert_template
 }
 
-#[tokio::main(core_threads = 4)] // Tokio is implemented for possible future use.
+#[tokio::main(core_threads = 4)] // Tokio is implemented for possible future use.
 async fn main() -> Result<(), Error> {
-
     /* Init values from file */
     let config: configure::Config = configure::from_json_file().unwrap();
     let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
-
+
     // TODO: Create db table with pcap file hashes
-    // TODO: hash file metadata, so its state is comparable at times and can be written to a db table (and read e.g. after system crash)
-    // This db table should include UUIDs so it can be joined effectively with former runs
-    // TODO: Use inotfy crate to update pcap_map according to files created while parser is running
+    // TODO: hash file metadata, so its state is comparable with future file updates and can be written to a db table (and read e.g. after system crash)
+    // This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs.
+    // TODO: Use inotify crate to update pcap_map according to files created while parser is running
 
     /* db connection */
     let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
@@ -46,11 +45,14 @@ async fn main() -> Result<(), Error> {
         }
     });
     client
-        .execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
+        .execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
         .await?;
     client
         .execute(
-            &*format!("CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &config.tablename),
+            &*format!(
+                "CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
+                &config.tablename
+            ),
             &[],
         )
         .await?;
@@ -66,18 +68,12 @@ async fn main() -> Result<(), Error> {
         let packets_serialized = serializer::serialize_packets(v);
 
         // TODO: Tuning vector capacity according to mean average & std dev of packet sizes
-// let mut v = Vec::<parser::QryData>::with_capacity(100000);
-// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
-// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
-// packets_serialized.extend(serializer::serialize_packets(v));
+        // let mut v = Vec::<parser::QryData>::with_capacity(100000);
+        // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
+        // let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
+        // packets_serialized.extend(serializer::serialize_packets(v));
 
         /* Do chunks and query data */
-        let chunk_count = packets_serialized.len() / config.insert_max;
-        let remainder: usize = packets_serialized.len() % config.insert_max;
-        println!("chunks: {:?}", &chunk_count);
-        println!("remainder: {:?}", &remainder);
-
-
         let chunker = &packets_serialized.len() < &config.insert_max;
         match chunker {
             NON_CHUNKED => {
@@ -94,24 +90,26 @@ async fn main() -> Result<(), Error> {
                 let insert_str = query_string(&config.insert_max, &config.tablename);
                 let statement = client.prepare(&insert_str).await?;
 
-                for _i in 0..chunk_count {
-                    let (_input, _) = packets_serialized.split_at(config.insert_max);
+                for chunk in packets_serialized.chunks_exact(config.insert_max) {
                     client
-                        .query_raw(
-                            &statement,
-                            _input.iter().map(|p| p as &dyn ToSql),
-                        )
+                        .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
                         .await?;
                 }
-                if 0 < remainder {
-                    let rem_str = query_string(&remainder, &config.tablename);
+                let remainder_len = packets_serialized
+                    .chunks_exact(config.insert_max)
+                    .remainder()
+                    .len();
+                if 0 < remainder_len {
+                    let rem_str = query_string(&remainder_len, &config.tablename);
                     let statement = client.prepare(&rem_str).await?;
-                    let (_garbage, _input) =
-                        packets_serialized.split_at(packets_serialized.len() - remainder);
                     client
                         .query_raw(
                             &statement,
-                            _input.iter().map(|p| p as &dyn ToSql),
+                            packets_serialized
+                                .chunks_exact(config.insert_max)
+                                .remainder()
+                                .iter()
+                                .map(|p| p as &dyn ToSql),
                         )
                         .await?;
                 }
diff --git a/src/parser.json b/src/parser.json
index 76609d5..17a2be2 100644
--- a/src/parser.json
+++ b/src/parser.json
@@ -1,11 +1,11 @@
 {
-    "insert_max": 20000,
-    "filter": "ip6 && tcp",
-    "regex_filter": "(?:http|https)[[::punct::]]//([[::word::]]+\\.)*",
+    "insert_max": 16000,
+    "filter": "!ip6 && tcp",
+    "regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org)",
     "from_device": false,
     "parse_device": "enp7s0",
-    "pcap_file": "",
-    "pcap_dir": "../target",
+    "pcap_file": "not in use right now",
+    "pcap_dir": "../target/files",
     "database_tablename": "json_dump",
     "database_user": "postgres",
     "database_host": "localhost",
diff --git a/src/parser/mod.rs b/src/parser/mod.rs
index cd28777..d6ac4b0 100644
--- a/src/parser/mod.rs
+++ b/src/parser/mod.rs
@@ -196,9 +196,9 @@ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
         flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
         flags.push_str(";");
     }
-    //if flags.len() > 0{
-    //println!("{:?}", flags);
-    //}
+    if flags.len() > 0{
+        println!("{:?}", flags);
+    }
     match 0 < flags.len() {
         false => None,
         true => Some(flags),
@@ -215,18 +215,15 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str)
     while let Ok(packet) = cap.next() {
         let mut me = QryData::new();
         match linktype {
-            Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
-            Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
+            Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
+            Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
             _ => (),
         };
 
         me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
         me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex overhead is between 4-9% --single threaded-- on complete packet [u8] data
-        //v.push(me.clone());
-
-        if me.reg_res.is_some(){
-            println!("{:?}", &me.reg_res);
-        }
+        //v.push(me.clone());
+
         v.push(QryData {
             id: 0,
             time: me.time,
diff --git a/src/serializer/mod.rs b/src/serializer/mod.rs
index 15d42e9..cf8035e 100644
--- a/src/serializer/mod.rs
+++ b/src/serializer/mod.rs
@@ -33,7 +33,7 @@ pub fn serialize_packets(v: Vec<QryData>) -> Vec<serde_json::Value> {
         .par_iter()
         .map(|x| serde_json::to_value(x).unwrap())
         .collect();
 
-// let packets_serialized: Vec<serde_json::Value> = v.par_iter().map(|x| json!(x)).collect();
+    // let packets_serialized: Vec<serde_json::Value> = v.par_iter().map(|x| json!(x)).collect();
     packets_serialized
 }
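
Note (outside the patch): a minimal, standalone sketch of the chunking pattern the reworked chunker in src/main.rs relies on. chunks_exact() yields only full batches of insert_max rows, so the one prepared statement with a fixed parameter count can be reused for every batch, and remainder() exposes the leftover rows for a single, separately sized statement; this is what replaces the old chunk_count/modulo bookkeeping and the split_at() calls. The values below (insert_max = 4, u32 elements) are placeholders, not values from the patch.

    fn main() {
        let insert_max = 4; // stand-in for config.insert_max
        let packets: Vec<u32> = (0..10).collect(); // stand-in for packets_serialized

        // Full batches: every chunk has exactly insert_max elements,
        // so a single prepared INSERT statement can be reused for all of them.
        for chunk in packets.chunks_exact(insert_max) {
            println!("full batch: {:?}", chunk);
        }

        // Leftover elements that did not fill a complete batch; they get
        // one extra statement sized to their own length.
        let rest = packets.chunks_exact(insert_max).remainder();
        if !rest.is_empty() {
            println!("remainder batch of {} rows: {:?}", rest.len(), rest);
        }
    }

The new regex_filter in parser.json switches to POSIX-style ASCII classes ([[:punct:]], [[:alnum:]]), which the regex crate already used by src/parser/mod.rs supports. Assuming that crate, the pattern behaves roughly as below; the example URLs are illustrative only.

    use regex::Regex;

    fn main() {
        // Same pattern as the new "regex_filter" value in parser.json.
        let re = Regex::new(
            r"(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org)",
        )
        .unwrap();

        assert!(re.is_match("https://www.example.com")); // host.domain.tld -> match
        assert!(!re.is_match("https://example.com")); // only two labels -> no match
    }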