diff --git a/Cargo.lock b/Cargo.lock index e988a1e..e6e8b48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,6 +441,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "mio" version = "0.6.22" @@ -1000,6 +1006,7 @@ dependencies = [ "byteorder", "eui48", "libc", + "mime", "pcap", "rayon", "regex", diff --git a/Cargo.toml b/Cargo.toml index daf7689..c76170b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,4 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde = { version = "1.0.3", features = ["derive"] } rayon = "1.3" regex = "1.3.7" +mime = "*"
diff --git a/src/main.rs b/src/main.rs index d77672b..3ae93d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,19 @@ extern crate serde_json; extern crate tokio; extern crate tokio_postgres; +extern crate mime; use rayon::prelude::*; use serde::ser::{Serialize, SerializeStruct, Serializer}; use serde_json::json; use std::fs::File; mod parser; +use std::fs; +use std::io::prelude::*; use tokio_postgres::types::ToSql; use tokio_postgres::{Error, NoTls}; -use std::fs; +const PCAPNG_SIGNATURE: [u8;4] = [0x0a, 0x0d, 0x0d, 0x0a]; +const PCAP_SIGNATURE: [u8;4] = [0xd4, 0xc3, 0xb2, 0xa1]; // classic libpcap magic 0xa1b2c3d4 as written by little-endian hosts impl Serialize for parser::QryData { fn serialize(&self, serializer: S) -> Result @@ -83,7 +87,29 @@ async fn main() -> Result<(), Error> { let connection = [host, user, password].join(" "); let device = json.get("parse_device").unwrap().as_str().unwrap(); let is_device = json.get("from_device").unwrap().as_bool().unwrap(); + let pcap_dir = json.get("pcap_dir").unwrap().as_str().unwrap(); + if let Ok(entries) = fs::read_dir(pcap_dir) { + for entry in entries { + if let Ok(entry) = entry { + if let Ok(file_type) = entry.file_type() { + println!("{:?}: {:?}, {:?}", entry.path(), file_type, entry.metadata().unwrap().permissions()); + if entry.metadata().unwrap().is_file() { + let mut magic_number: [u8;4] = [0;4]; + File::open(entry.path()).unwrap().read_exact(&mut magic_number).unwrap(); + match magic_number { + PCAPNG_SIGNATURE => println!("pcapng!"), + PCAP_SIGNATURE => println!("pcap!"), + _ => println!("uninteresting"), + + } + } + } else { + println!("Couldn't get file type for {:?}", entry.path()); + } + } + } + } /* db connection */ let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?; @@ -92,9 +118,7 @@ async fn main() -> Result<(), Error> { eprintln!("connection error: {}", e); } }); - let metadata = fs::metadata(pcap_file).unwrap(); - println!("{:?}", &metadata.file_type()); - + client .execute("DROP TABLE IF EXISTS json_dump", &[]) .await?; diff --git a/src/parser.json b/src/parser.json index cd7144c..6b4fc45 100644 --- a/src/parser.json +++ b/src/parser.json @@ -4,6 +4,7 @@ "from_device": false, "parse_device": "enp7s0", "pcap_file": "../target/wohnung2.pcapng", + "pcap_dir": "../target", "database_user": "postgres", "database_host": "localhost", "database_password": "password" diff --git a/src/parser.rs b/src/parser.rs index ddb60d4..38a510d 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -31,17 +31,17 @@ pub struct QryData { pub ipv6_header: Option, pub tcp_header: Option, pub udp_header: Option, - pub reg_res: Option, + pub reg_res: Option, } fn flag_carnage(re: &Regex, payload: &[u8]) -> Option { - let mut flags: String = String::new() ; - for mat in re.find_iter(payload) { - flags.push_str(
std::str::from_utf8(mat.as_bytes()).unwrap() ); + let mut flags: String = String::new(); + for mat in re.find_iter(payload) { + flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap()); } - match 0 < flags.len() { + match 0 < flags.len() { false => None, - true => Some(flags) + true => Some(flags), } } @@ -56,8 +56,8 @@ pub fn parse(parse_file: &str, filter_str: &str) -> Vec { ipv4_header: None::, ipv6_header: None::, tcp_header: None::, - udp_header: None::, - reg_res: None::, + udp_header: None::, + reg_res: None::, }; let mut v: Vec = Vec::new(); @@ -74,54 +74,53 @@ pub fn parse(parse_file: &str, filter_str: &str) -> Vec { me.ipv6_header = None::; me.ipv4_header = Some(packet_handler::ip_handler(packet.data)).unwrap(); match me.ipv4_header.unwrap().ip_protocol as usize { - TCP => {me.tcp_header = Some(packet_handler::tcp_handler( - me.ipv4_header.unwrap().ip_ihl, - packet.data, - )) - .unwrap(); - me.data = packet_handler::payload_handler( - me.ipv4_header.unwrap().ip_ihl, - me.tcp_header.unwrap().data_offset, - packet.data, - ); - }, - UDP => { me.udp_header = Some(packet_handler::udp_handler( + TCP => { + me.tcp_header = Some(packet_handler::tcp_handler( me.ipv4_header.unwrap().ip_ihl, - packet.data - )).unwrap(); - me.data = packet_handler::payload_handler( - me.ipv4_header.unwrap().ip_ihl, - 7, - packet.data ); - }, + packet.data, + )) + .unwrap(); + me.data = packet_handler::payload_handler( + me.ipv4_header.unwrap().ip_ihl, + me.tcp_header.unwrap().data_offset, + packet.data, + ); + } + UDP => { + me.udp_header = Some(packet_handler::udp_handler( + me.ipv4_header.unwrap().ip_ihl, + packet.data, + )) + .unwrap(); + me.data = packet_handler::payload_handler( + me.ipv4_header.unwrap().ip_ihl, + 7, + packet.data, + ); + } _ => println!("network protocol not implemented"), } - }, + } ETH_P_IPV6 => { me.ipv4_header = None::; me.ipv6_header = Some(packet_handler::ipv6_handler(packet.data)).unwrap(); match me.ipv6_header.unwrap().next_header as usize { - TCP => { + TCP => { me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap(); me.data = packet_handler::payload_handler( - 10, - me.tcp_header.unwrap().data_offset, - packet.data, - ); - }, - UDP => { me.udp_header = Some(packet_handler::udp_handler( 10, - packet.data - )).unwrap(); - me.data = packet_handler::payload_handler( - 10, - 7, - packet.data); - }, - _ => println!("network protocol not implemented"), + me.tcp_header.unwrap().data_offset, + packet.data, + ); + } + UDP => { + me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap(); + me.data = packet_handler::payload_handler(10, 7, packet.data); + } + _ => println!("network protocol not implemented"), } - }, - _ => println!("network protocol not implemented"), + } + _ => println!("network protocol not implemented"), } v.push(QryData { id: 0, @@ -150,7 +149,7 @@ pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) -> ipv6_header: None::, tcp_header: None::, udp_header: None::, - reg_res: None::, + reg_res: None::, }; let mut v: Vec = Vec::new(); let mut cap = Capture::from_device(parse_device).unwrap().open().unwrap(); @@ -167,54 +166,53 @@ pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) -> me.ipv6_header = None::; me.ipv4_header = Some(packet_handler::ip_handler(packet.data)).unwrap(); match me.ipv4_header.unwrap().ip_protocol as usize { - TCP => {me.tcp_header = Some(packet_handler::tcp_handler( - me.ipv4_header.unwrap().ip_ihl, - packet.data, - )) - .unwrap(); - me.data = 
packet_handler::payload_handler( - me.ipv4_header.unwrap().ip_ihl, - me.tcp_header.unwrap().data_offset, - packet.data, - ); - }, - UDP => { me.udp_header = Some(packet_handler::udp_handler( + TCP => { + me.tcp_header = Some(packet_handler::tcp_handler( me.ipv4_header.unwrap().ip_ihl, - packet.data - )).unwrap(); - me.data = packet_handler::payload_handler( - me.ipv4_header.unwrap().ip_ihl, - 7, - packet.data ); - }, + packet.data, + )) + .unwrap(); + me.data = packet_handler::payload_handler( + me.ipv4_header.unwrap().ip_ihl, + me.tcp_header.unwrap().data_offset, + packet.data, + ); + } + UDP => { + me.udp_header = Some(packet_handler::udp_handler( + me.ipv4_header.unwrap().ip_ihl, + packet.data, + )) + .unwrap(); + me.data = packet_handler::payload_handler( + me.ipv4_header.unwrap().ip_ihl, + 7, + packet.data, + ); + } _ => println!("network protocol not implemented"), } - }, + } ETH_P_IPV6 => { me.ipv4_header = None::; me.ipv6_header = Some(packet_handler::ipv6_handler(packet.data)).unwrap(); match me.ipv6_header.unwrap().next_header as usize { - TCP => { + TCP => { me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap(); me.data = packet_handler::payload_handler( - 10, - me.tcp_header.unwrap().data_offset, - packet.data, - ); - }, - UDP => { me.udp_header = Some(packet_handler::udp_handler( - 10, - packet.data - )).unwrap(); - me.data = packet_handler::payload_handler( - 10, - 7, - packet.data); - }, - _ => println!("network protocol not implemented"), + 10, + me.tcp_header.unwrap().data_offset, + packet.data, + ); + } + UDP => { + me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap(); + me.data = packet_handler::payload_handler(10, 7, packet.data); + } + _ => println!("network protocol not implemented"), } - }, - _ => println!("network protocol not implemented"), + } + _ => println!("network protocol not implemented"), } v.push(QryData { id: 0, @@ -227,7 +225,7 @@ pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) -> udp_header: me.udp_header, reg_res: me.reg_res, }); - + if &v.len() >= insert_max { break 'parse; } diff --git a/src/parser/packet_handler.rs b/src/parser/packet_handler.rs index 6228be3..baa9b6b 100644 --- a/src/parser/packet_handler.rs +++ b/src/parser/packet_handler.rs @@ -8,7 +8,7 @@ use eui48::{Eui48, MacAddress}; use serde::{Deserialize, Serialize}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -// TODO Benchmark slice refrence vs. bitfield +// TODO Benchmark slice refrence vs. bitfield /* ethernet */ const ETH_ALEN: usize = 6; @@ -93,8 +93,8 @@ impl + AsMut<[u8]>> BitfieldIpV4Header { } pub fn ip_handler(packet_data: &[u8]) -> Option { - let mut raw_hdr: [u8;20] = [0;20]; - raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN..ETHER_HDRLEN+20]); + let mut raw_hdr: [u8; 20] = [0; 20]; + raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN..ETHER_HDRLEN + 20]); let ip_header = BitfieldIpV4Header(raw_hdr); Some(IpV4Header { @@ -129,8 +129,8 @@ pub struct IpV6Header { } pub fn ipv6_handler(packet_data: &[u8]) -> Option { - let mut raw_hdr: [u8;40] = [0;40]; - raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN..ETHER_HDRLEN+40]); + let mut raw_hdr: [u8; 40] = [0; 40]; + raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN..ETHER_HDRLEN + 40]); Some(IpV6Header { version: (&raw_hdr[0] & 0xf0) >> 4, @@ -238,8 +238,10 @@ bitfield! 
{ } pub fn tcp_handler(ip_hlen: u32, packet_data: &[u8]) -> Option { - let mut raw_hdr: [u8; 20] = [0;20]; - raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN+ip_hlen as usize * 4..ETHER_HDRLEN+ip_hlen as usize * 4 + 20]); + let mut raw_hdr: [u8; 20] = [0; 20]; + raw_hdr.copy_from_slice( + &packet_data[ETHER_HDRLEN + ip_hlen as usize * 4..ETHER_HDRLEN + ip_hlen as usize * 4 + 20], + ); let tcp_header = BitfieldTcpHeader(raw_hdr); Some(TcpHeader { @@ -307,9 +309,9 @@ impl + AsMut<[u8]>> BitfieldArpHeader { // TODO: Fix this crap pub fn arp_handler(packet_data: &[u8]) -> Option { - let mut raw_hdr: [u8;28] = [0;28]; + let mut raw_hdr: [u8; 28] = [0; 28]; raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN..ETHER_HDRLEN + 28]); - + let arp_header = BitfieldArpHeader(raw_hdr); let _sha: [u8; 6] = [0; 6]; let _tha: [u8; 6] = [0; 6]; @@ -340,8 +342,10 @@ pub struct UdpHeader { } pub fn udp_handler(ip_hlen: u32, packet_data: &[u8]) -> Option { - let mut raw_hdr: [u8;8] = [0;8]; - raw_hdr.copy_from_slice(&packet_data[ETHER_HDRLEN + ip_hlen as usize * 4..ETHER_HDRLEN + ip_hlen as usize * 4 + 8]); + let mut raw_hdr: [u8; 8] = [0; 8]; + raw_hdr.copy_from_slice( + &packet_data[ETHER_HDRLEN + ip_hlen as usize * 4..ETHER_HDRLEN + ip_hlen as usize * 4 + 8], + ); Some(UdpHeader { source_port: BigEndian::read_u16(&raw_hdr[0..2]),