extern crate serde_json; extern crate tokio; extern crate tokio_postgres; mod configure; mod parser; mod serializer; //use postgres::{Client, NoTls}; //use postgres::types::ToSql; //use postgres::binary_copy::{BinaryCopyInWriter}; use tokio_postgres::types::{Type, ToSql}; use tokio_postgres::{Error, NoTls}; use tokio_postgres::binary_copy::{BinaryCopyInWriter}; use futures::{pin_mut}; use tokio::task; use tokio::sync::mpsc; //use std::thread::{spawn, JoinHandle}; //use std::sync::mpsc::{channel, Receiver}; //use std::sync::mpsc; //use std::alloc::System; // //#[global_allocator] //static A: System = System; /* conditionals */ const FROM_FILE: bool = false; const FROM_DEVICE: bool = true; //const NON_CHUNKED: bool = true; //const CHUNKED: bool = false; fn query_string(insert_max: &usize, table_name: &str) -> String { let mut insert_template = String::with_capacity(insert_max * 8 + 96); insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name)); for insert in 0..insert_max - 1 { insert_template.push_str(&*format!("(${}), ", insert + 1)); } insert_template.push_str(&*format!("(${})", insert_max)); insert_template } #[tokio::main(core_threads = 4)] async fn main() -> Result<(), Error> { /* Init values from file */ let config: configure::Config = configure::from_json_file().unwrap(); let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap(); // TODO: Create db table with pcap file hashes // TODO: hash file metadata, so its state is comparable with future file updates and can be written to a db table (and read e.g. after system crash) // This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs. // TODO: Use inotify crate to update pcap_map according to files created while parser is running /* db connection */ let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?; tokio::spawn(async move { if let Err(e) = connection.await { eprintln!("connection error: {}", e); } }); client .execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[]) .await?; client .execute( &*format!( "CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &config.tablename ), &[], ) .await?; /* device or file input */ match config.is_device { FROM_FILE => { for (_pcap_file, _pcap_info) in pcap_map.iter() { //println!("{:?}: {:?}", &_pcap_file, &_pcap_info); /* MPSC channeled serialization */ // let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned()); // let (data_serialized, h2) = serializer::mpsc_serialize(qry_data); // let packets_serialized = serializer::mpsc_collect_serialized(data_serialized); // let _r1 = h1.join().unwrap(); // let _r2 = h2.join().unwrap(); /* This is serializing data without mpsc, which results in higher memory consumption, it's faster but 12GB main memory is needed */ // let v: Vec = // parser::parse(&_pcap_file, &config.filter, &config.regex_filter); // let len = v.len(); /* tokio mpsc channel */ let (tx, mut rx) = mpsc::channel(1000); let pcap_file = _pcap_file.clone(); let filter = config.filter.clone(); let regex_filter = config.regex_filter.clone(); let join_handle: task::JoinHandle> = task::spawn( async move { parser::tokio_parse(pcap_file, &filter, ®ex_filter).await }); let v = join_handle.await.unwrap(); for packet in v.into_iter(){ let mut tx = tx.clone(); tokio::spawn( async move { //println!("serializing!number {:?}", i); let packet_serialized = serializer::tokio_serialize(packet).await; tx.send(packet_serialized).await.unwrap(); }); } drop(tx); // let mut packets_serialized: Vec = Vec::new(); // while let Some(res) = rx.recv().await { // //println!("collecting"); // packets_serialized.push(res); // //let packets_serialized = serializer::serialize_packets(v); // } let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap(); let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]); let join = task::spawn( async move { pin_mut!(writer); //for pack in packets_serialized { while let Some(res) = rx.recv().await { writer.as_mut().write(&[&res]).await.unwrap(); drop(res); // Reminder: write_raw() behavior is very strange, so it's write() for now. // writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap(); } //thread::sleep(time::Duration::from_millis(3000)); writer.finish().await.unwrap(); }); assert!(join.await.is_ok()); // TODO: MPSC channel // let mut v = Vec::::with_capacity(100000); // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter)); // let mut packets_serialized = Vec::::with_capacity(100000); // packets_serialized.extend(serializer::serialize_packets(v)); // Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE // /* Do chunks and query data */ // let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ; // match chunker { // NON_CHUNKED => { // let insert_str = query_string(&packets_serialized.len(), &config.tablename); // let statement = client.prepare(&insert_str).await?; // client // .query_raw( // &statement, // packets_serialized.iter().map(|p| p as &dyn ToSql), // ) // .await?; // } // CHUNKED => { // let insert_str = query_string(&config.insert_max, &config.tablename); // let statement = client.prepare(&insert_str).await?; // // for chunk in packets_serialized.chunks_exact(config.insert_max) { // client // .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql)) // .await?; // } // let remainder_len = packets_serialized // .chunks_exact(config.insert_max) // .remainder() // .len(); // if 0 < remainder_len { // let rem_str = query_string(&remainder_len, &config.tablename); // let statement = client.prepare(&rem_str).await?; // client // .query_raw( // &statement, // packets_serialized // .chunks_exact(config.insert_max) // .remainder() // .iter() // .map(|p| p as &dyn ToSql), // ) // .await?; // } // } // } } } FROM_DEVICE => { let insert_str = query_string(&config.insert_max, &config.tablename); let statement = client.prepare(&insert_str).await?; loop { let v: Vec = parser::parse_device( &config.device, &config.filter, &config.insert_max, &config.regex_filter, ); let packets_serialized = serializer::serialize_packets(v); client .query_raw( &statement, packets_serialized.iter().map(|p| p as &dyn ToSql), ) .await?; } } } Ok(()) }