extern crate serde_json; extern crate tokio; extern crate tokio_postgres; use rayon::prelude::*; use serde::ser::{Serialize, SerializeStruct, Serializer}; use serde_json::json; use std::fs::File; mod parser; use tokio_postgres::types::ToSql; use tokio_postgres::{Error, NoTls}; use std::fs; impl Serialize for parser::QryData { fn serialize(&self, serializer: S) -> Result where S: Serializer, { // 34 (42) is the number of fields in the struct. let mut state = serializer.serialize_struct("parser::QryData", 34)?; state.serialize_field("time", &self.time)?; state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?; state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?; state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?; state.serialize_field("ipv4_header", &self.ipv4_header)?; state.serialize_field("ipv6_header", &self.ipv6_header)?; state.serialize_field("tcp_header", &self.tcp_header)?; state.serialize_field("udp_header", &self.udp_header)?; state.serialize_field("data", &self.data)?; state.serialize_field("reg_res", &self.reg_res)?; state.end() } } fn serialize_packets(v: Vec) -> Vec { // let mut packets_serialized: Vec<_> = Vec::new(); // for packet in v.iter() { // // packets_serialized.push(json!(&packet)); // } /* rayon parallelized */ let packets_serialized = v.par_iter().map(|x| json!(x)).collect(); packets_serialized } fn query_string(insert_max: &usize) -> String { let mut insert_template: String = "INSERT INTO json_dump (packet) Values ".to_owned(); for insert in 0..insert_max - 1 { insert_template.push_str(&(format!("(${}), ", insert + 1))); } insert_template.push_str(&(format!("(${})", insert_max))); insert_template } #[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime. async fn main() -> Result<(), Error> { /* Init values from file */ let file = File::open("parser.json").expect("file should open read only"); let json: serde_json::Value = serde_json::from_reader(file).unwrap(); let filter = json.get("filter").unwrap().as_str().unwrap(); let insert_max = json.get("insert_max").unwrap().as_u64().unwrap() as usize; let pcap_file = json.get("pcap_file").unwrap().as_str().unwrap(); let host = [ "host=", json.get("database_host").unwrap().as_str().unwrap(), ] .join(""); let user = [ "user=", json.get("database_user").unwrap().as_str().unwrap(), ] .join(""); let password = [ "password=", json.get("database_password").unwrap().as_str().unwrap(), ] .join(""); let connection = [host, user, password].join(" "); let device = json.get("parse_device").unwrap().as_str().unwrap(); let is_device = json.get("from_device").unwrap().as_bool().unwrap(); /* db connection */ let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?; tokio::spawn(async move { if let Err(e) = connection.await { eprintln!("connection error: {}", e); } }); let metadata = fs::metadata(pcap_file).unwrap(); println!("{:?}", &metadata.file_type()); client .execute("DROP TABLE IF EXISTS json_dump", &[]) .await?; client .execute( "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &[], ) .await?; /* device or file input */ if false == is_device { let v: Vec = parser::parse(&pcap_file, &filter); let packets_serialized = serialize_packets(v); /* Query */ //let insert_max = 60; let chunk_count = packets_serialized.len() / insert_max; let remainder: usize = packets_serialized.len() % insert_max; let chunker = &packets_serialized.len() < &insert_max; match chunker { true => { let insert_str = query_string(&packets_serialized.len()); let statement_false = client.prepare(&insert_str).await?; client .query_raw( &statement_false, packets_serialized.iter().map(|p| p as &dyn ToSql), ) .await?; } false => { let insert_str = query_string(&insert_max); let statement = client.prepare(&insert_str).await?; for _i in 0..chunk_count { let (_input, _) = packets_serialized.split_at(insert_max); client .query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql)) .await?; } println!("Packets, total:{:?}", packets_serialized.len()); println!("Chunks, total:{}", chunk_count); println!("Chunks, remainder:{}", remainder); if remainder > 0 { let rem_str = query_string(&remainder); let statement_remainder = client.prepare(&rem_str).await?; let (_garbage, _input) = packets_serialized.split_at(packets_serialized.len() - remainder); client .query_raw( &statement_remainder, _input.to_vec().iter().map(|p| p as &dyn ToSql), ) .await?; } } } } else { let insert_str = query_string(&insert_max); let statement = client.prepare(&insert_str).await?; loop { let v: Vec = parser::parse_device(&device, &filter, &insert_max); let packets_serialized = serialize_packets(v); client .query_raw( &statement, packets_serialized.iter().map(|p| p as &dyn ToSql), ) .await?; } } Ok(()) } #[test] fn test_insert_json() { use serde_json::json; let mut client = Client::connect("host=localhost user=postgres password=password", NoTls).unwrap(); let john = json!({ "name": "John Doe", "age": 43, "phones": [ "+44 1234567", "+44 2345678" ], "empty": [] }); client .execute("DROP TABLE IF EXISTS json_dump", &[]) .unwrap(); client.execute( "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)", &[], ); client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]); }