From 2d96ddba3ad6374b81c2337106b8d4ce7227d9f7 Mon Sep 17 00:00:00 2001 From: gurkenhabicht Date: Wed, 27 May 2020 00:42:07 +0200 Subject: [PATCH] clean up --- Cargo.lock | 7 - Cargo.toml | 3 +- src/' | 215 ------------------------------- src/main.rs | 59 ++------- src/parser.json | 2 +- src/{parser.rs => parser/mod.rs} | 0 src/parser/reg_parser.rs | 22 ---- src/serializer/mod.rs | 39 ++++++ 8 files changed, 54 insertions(+), 293 deletions(-) delete mode 100644 src/' rename src/{parser.rs => parser/mod.rs} (100%) delete mode 100644 src/parser/reg_parser.rs create mode 100644 src/serializer/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e6e8b48..e988a1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,12 +441,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "mime" -version = "0.3.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" - [[package]] name = "mio" version = "0.6.22" @@ -1006,7 +1000,6 @@ dependencies = [ "byteorder", "eui48", "libc", - "mime", "pcap", "rayon", "regex", diff --git a/Cargo.toml b/Cargo.toml index c76170b..d821165 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tpcpr" version = "0.1.0" -authors = ["gurkenhabicht "] +authors = ["Stefan Friese "] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -21,4 +21,3 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde = { version = "1.0.3", features = ["derive"] } rayon = "1.3" regex = "1.3.7" -mime = "*" diff --git a/src/' b/src/' deleted file mode 100644 index 30eebe6..0000000 --- a/src/' +++ /dev/null @@ -1,215 +0,0 @@ -extern crate serde_json; -extern crate tokio; -extern crate tokio_postgres; -extern crate mime; -use rayon::prelude::*; -use serde::ser::{Serialize, SerializeStruct, Serializer}; -use serde_json::json; -use std::fs::File; -mod parser; -use std::fs; -use std::io::prelude::*; -use tokio_postgres::types::ToSql; -use tokio_postgres::{Error, NoTls}; - -impl Serialize for parser::QryData { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - // 34 (42) is the number of fields in the struct. 
- let mut state = serializer.serialize_struct("parser::QryData", 34)?; - state.serialize_field("time", &self.time)?; - state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?; - state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?; - state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?; - state.serialize_field("ipv4_header", &self.ipv4_header)?; - state.serialize_field("ipv6_header", &self.ipv6_header)?; - state.serialize_field("tcp_header", &self.tcp_header)?; - state.serialize_field("udp_header", &self.udp_header)?; - state.serialize_field("data", &self.data)?; - state.serialize_field("reg_res", &self.reg_res)?; - state.end() - } -} - -fn serialize_packets(v: Vec) -> Vec { - // let mut packets_serialized: Vec<_> = Vec::new(); - - // for packet in v.iter() { - // // packets_serialized.push(json!(&packet)); - // } - - /* rayon parallelized */ - let packets_serialized = v.par_iter().map(|x| json!(x)).collect(); - - packets_serialized -} - -fn query_string(insert_max: &usize) -> String { - let mut insert_template: String = "INSERT INTO json_dump (packet) Values ".to_owned(); - - for insert in 0..insert_max - 1 { - insert_template.push_str(&(format!("(${}), ", insert + 1))); - } - insert_template.push_str(&(format!("(${})", insert_max))); - - insert_template -} - -#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime. -async fn main() -> Result<(), Error> { - /* Init values from file */ - let file = File::open("parser.json").expect("file should open read only"); - - let json: serde_json::Value = serde_json::from_reader(file).unwrap(); - let filter = json.get("filter").unwrap().as_str().unwrap(); - let insert_max = json.get("insert_max").unwrap().as_u64().unwrap() as usize; - let pcap_file = json.get("pcap_file").unwrap().as_str().unwrap(); - let host = [ - "host=", - json.get("database_host").unwrap().as_str().unwrap(), - ] - .join(""); - let user = [ - "user=", - json.get("database_user").unwrap().as_str().unwrap(), - ] - .join(""); - let password = [ - "password=", - json.get("database_password").unwrap().as_str().unwrap(), - ] - .join(""); - let connection = [host, user, password].join(" "); - let device = json.get("parse_device").unwrap().as_str().unwrap(); - let is_device = json.get("from_device").unwrap().as_bool().unwrap(); - let pcap_dir = json.get("pcap_dir").unwrap().as_str().unwrap(); - - if let Ok(entries) = fs::read_dir(pcap_dir) { - for entry in entries { - if let Ok(entry) = entry { - if let Ok(file_type) = entry.file_type() { - println!("{:?}: {:?}, {:?}", entry.path(), file_type, entry.metadata().unwrap().permissions()); - if entry.is_file() { - let magic_number: [u8;4] = File::open(entry.to_owned()).read_exact(mime_type).unwrap(); - println!("{:?}", magic_number); - } - } else { - println!("Couldn't get file type for {:?}", entry.path()); - } - } - } - } - /* db connection */ - let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - - client - .execute("DROP TABLE IF EXISTS json_dump", &[]) - .await?; - client - .execute( - "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", - &[], - ) - .await?; - - /* device or file input */ - if false == is_device { - let v: Vec = parser::parse(&pcap_file, &filter); - let packets_serialized = serialize_packets(v); - - /* Query */ - //let 
insert_max = 60; - let chunk_count = packets_serialized.len() / insert_max; - let remainder: usize = packets_serialized.len() % insert_max; - let chunker = &packets_serialized.len() < &insert_max; - match chunker { - true => { - let insert_str = query_string(&packets_serialized.len()); - let statement_false = client.prepare(&insert_str).await?; - client - .query_raw( - &statement_false, - packets_serialized.iter().map(|p| p as &dyn ToSql), - ) - .await?; - } - - false => { - let insert_str = query_string(&insert_max); - let statement = client.prepare(&insert_str).await?; - - for _i in 0..chunk_count { - let (_input, _) = packets_serialized.split_at(insert_max); - client - .query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql)) - .await?; - } - - println!("Packets, total:{:?}", packets_serialized.len()); - println!("Chunks, total:{}", chunk_count); - println!("Chunks, remainder:{}", remainder); - - if remainder > 0 { - let rem_str = query_string(&remainder); - let statement_remainder = client.prepare(&rem_str).await?; - let (_garbage, _input) = - packets_serialized.split_at(packets_serialized.len() - remainder); - client - .query_raw( - &statement_remainder, - _input.to_vec().iter().map(|p| p as &dyn ToSql), - ) - .await?; - } - } - } - } else { - let insert_str = query_string(&insert_max); - let statement = client.prepare(&insert_str).await?; - loop { - let v: Vec = parser::parse_device(&device, &filter, &insert_max); - let packets_serialized = serialize_packets(v); - client - .query_raw( - &statement, - packets_serialized.iter().map(|p| p as &dyn ToSql), - ) - .await?; - } - } - - Ok(()) -} - -#[test] -fn test_insert_json() { - use serde_json::json; - let mut client = - Client::connect("host=localhost user=postgres password=password", NoTls).unwrap(); - let john = json!({ - "name": "John Doe", - "age": 43, - "phones": [ - "+44 1234567", - "+44 2345678" - ], - "empty": [] - }); - - client - .execute("DROP TABLE IF EXISTS json_dump", &[]) - .unwrap(); - client.execute( - "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)", - &[], - ); - client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]); -} diff --git a/src/main.rs b/src/main.rs index ba083d8..08b777e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,54 +1,18 @@ extern crate serde_json; extern crate tokio; extern crate tokio_postgres; -extern crate mime; -use rayon::prelude::*; -use serde::ser::{Serialize, SerializeStruct, Serializer}; -use serde_json::json; use std::fs::File; mod parser; +mod serializer; use std::fs; use std::io::prelude::*; +use std::collections::HashMap; use tokio_postgres::types::ToSql; use tokio_postgres::{Error, NoTls}; const PCAPNG_SIGNATURE: [u8;4] = [0x0a, 0x0d, 0x0d, 0x0a]; const PCAP_SIGNATURE: [u8;4] = [0xed, 0xab, 0xee, 0xdb]; -impl Serialize for parser::QryData { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - // 34 (42) is the number of fields in the struct. 
- let mut state = serializer.serialize_struct("parser::QryData", 34)?; - state.serialize_field("time", &self.time)?; - state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?; - state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?; - state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?; - state.serialize_field("ipv4_header", &self.ipv4_header)?; - state.serialize_field("ipv6_header", &self.ipv6_header)?; - state.serialize_field("tcp_header", &self.tcp_header)?; - state.serialize_field("udp_header", &self.udp_header)?; - state.serialize_field("data", &self.data)?; - state.serialize_field("reg_res", &self.reg_res)?; - state.end() - } -} - -fn serialize_packets(v: Vec) -> Vec { - // let mut packets_serialized: Vec<_> = Vec::new(); - - // for packet in v.iter() { - // // packets_serialized.push(json!(&packet)); - // } - - /* rayon parallelized */ - let packets_serialized = v.par_iter().map(|x| json!(x)).collect(); - - packets_serialized -} - fn query_string(insert_max: &usize) -> String { let mut insert_template: String = "INSERT INTO json_dump (packet) Values ".to_owned(); @@ -89,20 +53,22 @@ async fn main() -> Result<(), Error> { let is_device = json.get("from_device").unwrap().as_bool().unwrap(); let pcap_dir = json.get("pcap_dir").unwrap().as_str().unwrap(); + let mut pcap_map = HashMap::new(); if let Ok(entries) = fs::read_dir(pcap_dir) { for entry in entries { if let Ok(entry) = entry { - if let Ok(file_type) = entry.file_type() { - println!("{:?}: {:?}, {:?}", entry.path(), file_type, entry.metadata().unwrap().permissions()); + if let Ok(_file_type) = entry.file_type() { if entry.metadata().unwrap().is_file() { let mut magic_number: [u8;4] = [0;4]; let _signature = File::open(entry.path().to_owned()).unwrap().read_exact(&mut magic_number).unwrap(); match magic_number { - PCAPNG_SIGNATURE => println!("pcapng!"), - PCAP_SIGNATURE => println!("pcap!"), - _ => println!("uninterresting"), + PCAPNG_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()), + PCAP_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()), + _ => None - } + }; + // println!("{:?}", &entry.metadata().unwrap().modified()); + } } else { println!("Couldn't get file type for {:?}", entry.path()); @@ -110,6 +76,7 @@ async fn main() -> Result<(), Error> { } } } + println!("{:?}", pcap_map.iter()); /* db connection */ let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?; @@ -132,7 +99,7 @@ async fn main() -> Result<(), Error> { /* device or file input */ if false == is_device { let v: Vec = parser::parse(&pcap_file, &filter); - let packets_serialized = serialize_packets(v); + let packets_serialized = serializer::serialize_packets(v); /* Query */ //let insert_max = 60; @@ -185,7 +152,7 @@ async fn main() -> Result<(), Error> { let statement = client.prepare(&insert_str).await?; loop { let v: Vec = parser::parse_device(&device, &filter, &insert_max); - let packets_serialized = serialize_packets(v); + let packets_serialized = serializer::serialize_packets(v); client .query_raw( &statement, diff --git a/src/parser.json b/src/parser.json index 6b4fc45..0538f38 100644 --- a/src/parser.json +++ b/src/parser.json @@ -1,6 +1,6 @@ { "insert_max": 20000, - "filter": "udp && ip6", + "filter": "tcp && ip6", "from_device": false, "parse_device": "enp7s0", "pcap_file": "../target/wohnung2.pcapng", diff --git a/src/parser.rs b/src/parser/mod.rs similarity index 100% rename from src/parser.rs rename to src/parser/mod.rs 
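The main.rs hunk above replaces the per-entry debug printing with a magic-number check and collects matching capture files into a HashMap keyed by path. Below is a minimal, self-contained sketch of that scan using the patch's own constants; the function name map_capture_files and the skip-on-short-file handling are illustrative additions, not part of the commit. Note that the canonical little-endian pcap magic is d4 c3 b2 a1 (big-endian a1 b2 c3 d4), so the PCAP_SIGNATURE value carried over from the patch may not match real capture files.

use std::collections::HashMap;
use std::fs::{self, File, Metadata};
use std::io::{self, Read};
use std::path::PathBuf;

// Signature constants as defined in the patch. The pcapng value is the
// Section Header Block type (0x0A0D0D0A); the pcap value is taken verbatim
// from main.rs and may need to become [0xd4, 0xc3, 0xb2, 0xa1] in practice.
const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
const PCAP_SIGNATURE: [u8; 4] = [0xed, 0xab, 0xee, 0xdb];

// Hypothetical helper: keep every regular file in the directory whose first
// four bytes match one of the two signatures.
fn map_capture_files(pcap_dir: &str) -> io::Result<HashMap<PathBuf, Metadata>> {
    let mut pcap_map = HashMap::new();
    for entry in fs::read_dir(pcap_dir)? {
        let entry = entry?;
        if !entry.metadata()?.is_file() {
            continue;
        }
        let mut magic_number = [0u8; 4];
        // Files shorter than four bytes are skipped instead of unwrapped.
        if File::open(entry.path())?.read_exact(&mut magic_number).is_err() {
            continue;
        }
        match magic_number {
            PCAPNG_SIGNATURE | PCAP_SIGNATURE => {
                pcap_map.insert(entry.path(), entry.metadata()?);
            }
            _ => {}
        }
    }
    Ok(pcap_map)
}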
diff --git a/src/parser/reg_parser.rs b/src/parser/reg_parser.rs deleted file mode 100644 index 62b5632..0000000 --- a/src/parser/reg_parser.rs +++ /dev/null @@ -1,22 +0,0 @@ -//extern crate rayon; -//extern crate regex; -//use regex::Regex; -// -//struct Regex { -// string: &'static str, -// regex: ::regex::bytes::Regex, -//} -// -//impl Regex { -// fn new (string: &'static str) ->Regex { -// Regex{ -// string: string, -// regex: ::regex::bytes::Regex::new(string).unwrap(), -// } -// } -// -//} -// -// -// -//pub fn parse_regex ( reg_str: &str, diff --git a/src/serializer/mod.rs b/src/serializer/mod.rs new file mode 100644 index 0000000..e93093d --- /dev/null +++ b/src/serializer/mod.rs @@ -0,0 +1,39 @@ +extern crate serde_json; +use rayon::prelude::*; +use serde::ser::{Serialize, SerializeStruct, Serializer}; +use serde_json::json; +use crate::parser; + +impl Serialize for parser::QryData { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // 34 (42) is the number of fields in the struct. + let mut state = serializer.serialize_struct("parser::QryData", 34)?; + state.serialize_field("time", &self.time)?; + state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?; + state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?; + state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?; + state.serialize_field("ipv4_header", &self.ipv4_header)?; + state.serialize_field("ipv6_header", &self.ipv6_header)?; + state.serialize_field("tcp_header", &self.tcp_header)?; + state.serialize_field("udp_header", &self.udp_header)?; + state.serialize_field("data", &self.data)?; + state.serialize_field("reg_res", &self.reg_res)?; + state.end() + } +} + +pub fn serialize_packets(v: Vec) -> Vec { + // let mut packets_serialized: Vec<_> = Vec::new(); + + // for packet in v.iter() { + // // packets_serialized.push(json!(&packet)); + // } + + /* rayon parallelized */ + let packets_serialized = v.par_iter().map(|x| json!(x)).collect(); + + packets_serialized +}
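For reference, the new serializer module keeps the rayon-parallel serde_json conversion that main.rs previously inlined. The following is a self-contained illustration of that pattern; Packet is a stand-in for parser::QryData, whose definition lies outside this patch, and its fields are invented for the example.

use rayon::prelude::*;
use serde::Serialize;
use serde_json::{json, Value};

// Stand-in for parser::QryData; field names and types are illustrative only.
#[derive(Serialize)]
struct Packet {
    time: f64,
    data: Option<Vec<u8>>,
}

// Same shape as serializer::serialize_packets: each element is turned into a
// serde_json::Value through its Serialize impl, with rayon splitting the work
// across threads.
fn serialize_packets(v: Vec<Packet>) -> Vec<Value> {
    v.par_iter().map(|x| json!(x)).collect()
}

fn main() {
    let packets = vec![
        Packet { time: 1590_532_927.0, data: Some(b"abc".to_vec()) },
        Packet { time: 1590_532_928.0, data: None },
    ];
    let serialized = serialize_packets(packets);
    // Each entry can be bound as a json parameter of the INSERT statement
    // built by query_string() in main.rs.
    println!("{}", serialized[0]);
}

Because collect() on an indexed parallel iterator preserves input order, the chunked INSERT logic in main.rs still sees packets in capture order.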