192 lines
6.6 KiB
Rust
192 lines
6.6 KiB
Rust
extern crate serde_json;
|
|
extern crate tokio;
|
|
extern crate tokio_postgres;
|
|
use std::fs::File;
|
|
mod parser;
|
|
mod serializer;
|
|
use std::fs;
|
|
use std::io::prelude::*;
|
|
use std::collections::HashMap;
|
|
use tokio_postgres::types::ToSql;
|
|
use tokio_postgres::{Error, NoTls};
|
|
|
|
/// First four bytes of a pcapng file: the Section Header Block type `0x0A0D0D0A`.
/// The value is a byte palindrome, so it reads the same in either byte order.
const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];

/// First four bytes of a classic pcap savefile written on a little-endian host
/// (magic number `0xa1b2c3d4` stored least-significant byte first).
///
/// The previous value `ed ab ee db` is the RPM package magic and never matches
/// a capture file. Savefiles written big-endian (`a1 b2 c3 d4`) and the
/// nanosecond-resolution variant (`0xa1b23c4d`) are not covered by this constant.
const PCAP_SIGNATURE: [u8; 4] = [0xd4, 0xc3, 0xb2, 0xa1];
|
|
|
|
/// Builds a multi-row `INSERT` statement for the `json_dump` table.
///
/// The statement carries one single-column row per packet, each bound to its
/// own positional parameter, numbered from `$1` up to `$insert_max`. For
/// `insert_max == 3` the result is
/// `INSERT INTO json_dump (packet) Values ($1), ($2), ($3)`.
///
/// When `insert_max` is zero the bare statement prefix is returned without
/// any placeholder (the earlier `insert_max - 1` arithmetic underflowed in
/// that case). That string is not executable SQL, so callers should skip
/// empty batches.
fn query_string(insert_max: &usize) -> String {
    // Inclusive range: placeholders are 1-based and the range is simply
    // empty for zero rows.
    let placeholders: Vec<String> = (1..=*insert_max)
        .map(|position| format!("(${})", position))
        .collect();

    let mut insert_template = String::from("INSERT INTO json_dump (packet) Values ");
    insert_template.push_str(&placeholders.join(", "));
    insert_template
}
|
|
|
|
#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
|
|
async fn main() -> Result<(), Error> {
|
|
/* Init values from file */
|
|
let file = File::open("parser.json").expect("file should open read only");
|
|
|
|
let json: serde_json::Value = serde_json::from_reader(file).unwrap();
|
|
let filter = json.get("filter").unwrap().as_str().unwrap();
|
|
let insert_max = json.get("insert_max").unwrap().as_u64().unwrap() as usize;
|
|
let pcap_file = json.get("pcap_file").unwrap().as_str().unwrap();
|
|
let host = [
|
|
"host=",
|
|
json.get("database_host").unwrap().as_str().unwrap(),
|
|
]
|
|
.join("");
|
|
let user = [
|
|
"user=",
|
|
json.get("database_user").unwrap().as_str().unwrap(),
|
|
]
|
|
.join("");
|
|
let password = [
|
|
"password=",
|
|
json.get("database_password").unwrap().as_str().unwrap(),
|
|
]
|
|
.join("");
|
|
let connection = [host, user, password].join(" ");
|
|
let device = json.get("parse_device").unwrap().as_str().unwrap();
|
|
let is_device = json.get("from_device").unwrap().as_bool().unwrap();
|
|
let pcap_dir = json.get("pcap_dir").unwrap().as_str().unwrap();
|
|
|
|
let mut pcap_map = HashMap::new();
|
|
if let Ok(entries) = fs::read_dir(pcap_dir) {
|
|
for entry in entries {
|
|
if let Ok(entry) = entry {
|
|
if let Ok(_file_type) = entry.file_type() {
|
|
if entry.metadata().unwrap().is_file() {
|
|
let mut magic_number: [u8;4] = [0;4];
|
|
let _signature = File::open(entry.path().to_owned()).unwrap().read_exact(&mut magic_number).unwrap();
|
|
match magic_number {
|
|
PCAPNG_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()),
|
|
PCAP_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()),
|
|
_ => None
|
|
|
|
};
|
|
// println!("{:?}", &entry.metadata().unwrap().modified());
|
|
|
|
}
|
|
} else {
|
|
println!("Couldn't get file type for {:?}", entry.path());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
println!("{:?}", pcap_map.iter());
|
|
/* db connection */
|
|
let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?;
|
|
|
|
tokio::spawn(async move {
|
|
if let Err(e) = connection.await {
|
|
eprintln!("connection error: {}", e);
|
|
}
|
|
});
|
|
|
|
client
|
|
.execute("DROP TABLE IF EXISTS json_dump", &[])
|
|
.await?;
|
|
client
|
|
.execute(
|
|
"CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
|
|
&[],
|
|
)
|
|
.await?;
|
|
|
|
/* device or file input */
|
|
if false == is_device {
|
|
let v: Vec<parser::QryData> = parser::parse(&pcap_file, &filter);
|
|
let packets_serialized = serializer::serialize_packets(v);
|
|
|
|
/* Query */
|
|
//let insert_max = 60;
|
|
let chunk_count = packets_serialized.len() / insert_max;
|
|
let remainder: usize = packets_serialized.len() % insert_max;
|
|
let chunker = &packets_serialized.len() < &insert_max;
|
|
match chunker {
|
|
true => {
|
|
let insert_str = query_string(&packets_serialized.len());
|
|
let statement_false = client.prepare(&insert_str).await?;
|
|
client
|
|
.query_raw(
|
|
&statement_false,
|
|
packets_serialized.iter().map(|p| p as &dyn ToSql),
|
|
)
|
|
.await?;
|
|
}
|
|
|
|
false => {
|
|
let insert_str = query_string(&insert_max);
|
|
let statement = client.prepare(&insert_str).await?;
|
|
|
|
for _i in 0..chunk_count {
|
|
let (_input, _) = packets_serialized.split_at(insert_max);
|
|
client
|
|
.query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql))
|
|
.await?;
|
|
}
|
|
|
|
println!("Packets, total:{:?}", packets_serialized.len());
|
|
println!("Chunks, total:{}", chunk_count);
|
|
println!("Chunks, remainder:{}", remainder);
|
|
|
|
if remainder > 0 {
|
|
let rem_str = query_string(&remainder);
|
|
let statement_remainder = client.prepare(&rem_str).await?;
|
|
let (_garbage, _input) =
|
|
packets_serialized.split_at(packets_serialized.len() - remainder);
|
|
client
|
|
.query_raw(
|
|
&statement_remainder,
|
|
_input.to_vec().iter().map(|p| p as &dyn ToSql),
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
let insert_str = query_string(&insert_max);
|
|
let statement = client.prepare(&insert_str).await?;
|
|
loop {
|
|
let v: Vec<parser::QryData> = parser::parse_device(&device, &filter, &insert_max);
|
|
let packets_serialized = serializer::serialize_packets(v);
|
|
client
|
|
.query_raw(
|
|
&statement,
|
|
packets_serialized.iter().map(|p| p as &dyn ToSql),
|
|
)
|
|
.await?;
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
fn test_insert_json() {
|
|
use serde_json::json;
|
|
let mut client =
|
|
Client::connect("host=localhost user=postgres password=password", NoTls).unwrap();
|
|
let john = json!({
|
|
"name": "John Doe",
|
|
"age": 43,
|
|
"phones": [
|
|
"+44 1234567",
|
|
"+44 2345678"
|
|
],
|
|
"empty": []
|
|
});
|
|
|
|
client
|
|
.execute("DROP TABLE IF EXISTS json_dump", &[])
|
|
.unwrap();
|
|
client.execute(
|
|
"CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)",
|
|
&[],
|
|
);
|
|
client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]);
|
|
}
|