tpcpr/src/main.rs

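//! Bulk-loads packet capture data into PostgreSQL: packets are parsed from
//! pcap files or a live capture device, serialized to JSON, and inserted in
//! batches of up to insert_max rows per statement.
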
extern crate serde_json;
extern crate tokio;
extern crate tokio_postgres;

mod configure;
mod parser;
mod serializer;

use tokio_postgres::types::ToSql;
use tokio_postgres::{Error, NoTls};
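
/// Builds one multi-row insert statement with a positional parameter per row,
/// e.g. query_string(3) yields
/// "INSERT INTO json_dump (packet) VALUES ($1), ($2), ($3)".
/// insert_max must be at least 1.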
fn query_string(insert_max: usize) -> String {
    debug_assert!(insert_max > 0, "query_string() needs at least one row");
    let mut insert_template = String::with_capacity(insert_max * 8 + 43);
    insert_template.push_str("INSERT INTO json_dump (packet) VALUES ");
    for insert in 0..insert_max - 1 {
        insert_template.push_str(&format!("(${}), ", insert + 1));
    }
    insert_template.push_str(&format!("(${})", insert_max));
    insert_template
}
// By default, tokio_postgres uses the tokio crate as its runtime.
#[tokio::main(core_threads = 4)]
async fn main() -> Result<(), Error> {
    /* Init values from file */
    let config: configure::Config = configure::from_json_file().unwrap();
    // TODO: Hash file metadata so its state is comparable over time and can be
    // written to a db table (e.g. after a system crash). This table should
    // include UUIDs so it can be joined effectively.
    let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
    println!("{:?}", pcap_map.iter());
    /* db connection */
    let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
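    // The connection object performs the actual communication with the
    // database, so it is spawned onto its own task.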
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
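    // Recreate the target table from scratch on every run.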
    client
        .execute("DROP TABLE IF EXISTS json_dump", &[])
        .await?;
    client
        .execute(
            "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
            &[],
        )
        .await?;
    /* device or file input */
    match config.is_device {
        false => {
            for (pcap_file, _pcap_info) in pcap_map.iter() {
                println!("{:?}", pcap_file);
                // TODO: Tune vector capacity according to the mean and std dev of packet sizes
                let v: Vec<parser::QryData> =
                    parser::parse(&pcap_file, &config.filter, &config.regex_filter);
                let packets_serialized = serializer::serialize_packets(v);
                /* Query */
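                // Packets are inserted in batches of insert_max rows; a final,
                // smaller statement covers any remainder.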
                if packets_serialized.is_empty() {
                    continue;
                }
                let chunk_count = packets_serialized.len() / config.insert_max;
                let remainder: usize = packets_serialized.len() % config.insert_max;
                let chunker = packets_serialized.len() < config.insert_max;
                match chunker {
                    true => {
                        // The whole batch fits into one statement sized to the
                        // exact packet count.
                        let insert_str = query_string(packets_serialized.len());
                        let statement_small = client.prepare(&insert_str).await?;
                        client
                            .query_raw(
                                &statement_small,
                                packets_serialized.iter().map(|p| p as &dyn ToSql),
                            )
                            .await?;
                    }
                    false => {
                        let insert_str = query_string(config.insert_max);
                        let statement = client.prepare(&insert_str).await?;
                        for i in 0..chunk_count {
                            // Each iteration inserts the next insert_max-sized
                            // slice of the serialized packets.
                            let chunk = &packets_serialized
                                [i * config.insert_max..(i + 1) * config.insert_max];
                            client
                                .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
                                .await?;
                        }
                        if remainder > 0 {
                            let rem_str = query_string(remainder);
                            let statement_remainder = client.prepare(&rem_str).await?;
                            let (_full_chunks, tail) =
                                packets_serialized.split_at(packets_serialized.len() - remainder);
                            client
                                .query_raw(
                                    &statement_remainder,
                                    tail.iter().map(|p| p as &dyn ToSql),
                                )
                                .await?;
                        }
                    }
                }
            }
        }
        true => {
            let insert_str = query_string(config.insert_max);
            let statement = client.prepare(&insert_str).await?;
            loop {
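                // The prepared statement expects exactly insert_max rows, so
                // parse_device() is asked for insert_max packets per round.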
                let v: Vec<parser::QryData> = parser::parse_device(
                    &config.device,
                    &config.filter,
                    &config.insert_max,
                    &config.regex_filter,
                );
                let packets_serialized = serializer::serialize_packets(v);
                client
                    .query_raw(
                        &statement,
                        packets_serialized.iter().map(|p| p as &dyn ToSql),
                    )
                    .await?;
            }
        }
    }
    Ok(())
}