reworked chunker, performance increase
This commit is contained in:
parent 668a22b4d0
commit 0e1b9435b1
@@ -34,6 +34,7 @@ If this whole thing turns out to be viable, some future features may be:
 - Update file hashmap through inotify crate, during runtime.
 - Restoration of fragmented ipv4 packages.
 - SIMD (via autovectorization). Which is easy enough to do in Rust.
+- Support of more protocols
 There are many other things left to be desired.
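The inotify item above corresponds to the TODO in src/main.rs further down. A minimal sketch of watching the capture directory for new files, assuming the inotify crate's blocking API (roughly version 0.9); the path and the update logic are illustrative, not part of this commit:

```rust
use inotify::{Inotify, WatchMask};

fn main() -> std::io::Result<()> {
    let mut inotify = Inotify::init()?;
    // Watch the capture directory for files that appear while the parser is running.
    inotify.add_watch("../target/files", WatchMask::CREATE | WatchMask::MOVED_TO)?;

    let mut buffer = [0u8; 4096];
    loop {
        // Blocks until at least one filesystem event arrives.
        let events = inotify.read_events_blocking(&mut buffer)?;
        for event in events {
            if let Some(name) = event.name {
                // The real code would hash the new file and extend pcap_map here.
                println!("new capture file: {:?}", name);
            }
        }
    }
}
```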
@@ -57,7 +57,12 @@ pub fn from_json_file() -> Option<Config> {
             .to_owned(),
         insert_max: json.get("insert_max").unwrap().as_u64().unwrap() as usize,
         pcap_file: json.get("pcap_file").unwrap().as_str().unwrap().to_owned(), // Not in use atm
-        tablename: json.get("database_tablename").unwrap().as_str().unwrap().to_owned(),
+        tablename: json
+            .get("database_tablename")
+            .unwrap()
+            .as_str()
+            .unwrap()
+            .to_owned(),
         connection: format!(
             "host={} user={} password={}",
             json.get("database_host").unwrap().as_str().unwrap(),
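The tablename change above only splits the same call chain across lines; the lookup itself is unchanged. For reference, a self-contained sketch of that serde_json access pattern, using `?` instead of the unwrap() chain; the helper name and the JSON literal are illustrative:

```rust
use serde_json::Value;

// Illustrative helper: the same get()/as_str()/to_owned() chain as in
// from_json_file(), but returning None instead of panicking on a missing key.
fn read_tablename(raw: &str) -> Option<String> {
    let json: Value = serde_json::from_str(raw).ok()?;
    Some(json.get("database_tablename")?.as_str()?.to_owned())
}

fn main() {
    let raw = r#"{ "database_tablename": "json_dump" }"#;
    assert_eq!(read_tablename(raw).as_deref(), Some("json_dump"));
}
```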
src/main.rs (56 lines changed)
@@ -26,17 +26,16 @@ fn query_string(insert_max: &usize, table_name: &str) -> String {
     insert_template
 }
 
 #[tokio::main(core_threads = 4)] // Tokio is implemented for possible future use.
 async fn main() -> Result<(), Error> {
 
     /* Init values from file */
     let config: configure::Config = configure::from_json_file().unwrap();
     let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
 
     // TODO: Create db table with pcap file hashes
-    // TODO: hash file metadata, so its state is comparable at times and can be written to a db table (and read e.g. after system crash)
-    // This db table should include UUIDs so it can be joined effectively with former runs
-    // TODO: Use inotfy crate to update pcap_map according to files created while parser is running
+    // TODO: hash file metadata, so its state is comparable with future file updates and can be written to a db table (and read e.g. after system crash)
+    // This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs.
+    // TODO: Use inotify crate to update pcap_map according to files created while parser is running
 
     /* db connection */
     let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
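The hunk header shows the signature of query_string(); its body is not part of this diff, but the chunked inserts further down rely on it producing a statement with exactly insert_max parameter slots, which is why the remainder later gets its own, smaller statement. A purely illustrative sketch of such a template builder, not the repository's actual implementation:

```rust
// Illustrative only: builds "INSERT INTO <table> (packet) VALUES ($1), ($2), ..."
// with one placeholder per row, so a single prepared statement can take
// `insert_max` serialized packets at a time.
fn query_string(insert_max: &usize, table_name: &str) -> String {
    let mut insert_template = format!("INSERT INTO {} (packet) VALUES ", table_name);
    for i in 1..=*insert_max {
        insert_template.push_str(&format!("(${})", i));
        if i < *insert_max {
            insert_template.push_str(", ");
        }
    }
    insert_template
}

fn main() {
    assert_eq!(
        query_string(&3, "json_dump"),
        "INSERT INTO json_dump (packet) VALUES ($1), ($2), ($3)"
    );
}
```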
@@ -46,11 +45,14 @@ async fn main() -> Result<(), Error> {
         }
     });
     client
         .execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
         .await?;
     client
         .execute(
-            &*format!("CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &config.tablename),
+            &*format!(
+                "CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
+                &config.tablename
+            ),
             &[],
         )
         .await?;
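The `}` / `});` context at the top of this hunk is the tail of the usual tokio_postgres setup: connect() returns a client plus a connection future that must be driven on its own task. A compile-ready sketch of that pattern around the DROP/CREATE statements from this hunk; the connection string is illustrative and a running PostgreSQL is assumed:

```rust
use tokio_postgres::{Error, NoTls};

#[tokio::main]
async fn main() -> Result<(), Error> {
    let tablename = "json_dump"; // database_tablename from the config file below

    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;

    // The connection future performs the actual I/O and has to run in the background.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    client
        .execute(&*format!("DROP TABLE IF EXISTS {}", tablename), &[])
        .await?;
    client
        .execute(
            &*format!(
                "CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
                tablename
            ),
            &[],
        )
        .await?;
    Ok(())
}
```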
@@ -66,18 +68,12 @@ async fn main() -> Result<(), Error> {
     let packets_serialized = serializer::serialize_packets(v);
 
     // TODO: Tuning vector capacity according to mean average & std dev of packet sizes
     // let mut v = Vec::<parser::QryData>::with_capacity(100000);
     // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
     // let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
     // packets_serialized.extend(serializer::serialize_packets(v));
 
     /* Do chunks and query data */
-    let chunk_count = packets_serialized.len() / config.insert_max;
-    let remainder: usize = packets_serialized.len() % config.insert_max;
-    println!("chunks: {:?}", &chunk_count);
-    println!("remainder: {:?}", &remainder);
-
-
     let chunker = &packets_serialized.len() < &config.insert_max;
     match chunker {
         NON_CHUNKED => {
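chunker is true when everything fits into a single insert; NON_CHUNKED (and presumably a CHUNKED counterpart) are bool constants it is matched against. A small sketch of that pattern, with the constant's value assumed rather than taken from the repository:

```rust
// Assumed value: chunker == true means "fits into one INSERT".
const NON_CHUNKED: bool = true;

fn main() {
    let packets_len = 10_usize;
    let insert_max = 16_000_usize; // insert_max from the config file below
    let chunker = packets_len < insert_max;
    match chunker {
        NON_CHUNKED => println!("one INSERT with {} rows", packets_len),
        _ => println!("full chunks of {} rows plus a remainder", insert_max), // the CHUNKED path
    }
}
```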
@@ -94,24 +90,26 @@ async fn main() -> Result<(), Error> {
             let insert_str = query_string(&config.insert_max, &config.tablename);
             let statement = client.prepare(&insert_str).await?;
 
-            for _i in 0..chunk_count {
-                let (_input, _) = packets_serialized.split_at(config.insert_max);
+            for chunk in packets_serialized.chunks_exact(config.insert_max) {
                 client
-                    .query_raw(
-                        &statement,
-                        _input.iter().map(|p| p as &dyn ToSql),
-                    )
+                    .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
                     .await?;
             }
-            if 0 < remainder {
-                let rem_str = query_string(&remainder, &config.tablename);
+            let remainder_len = packets_serialized
+                .chunks_exact(config.insert_max)
+                .remainder()
+                .len();
+            if 0 < remainder_len {
+                let rem_str = query_string(&remainder_len, &config.tablename);
                 let statement = client.prepare(&rem_str).await?;
-                let (_garbage, _input) =
-                    packets_serialized.split_at(packets_serialized.len() - remainder);
                 client
                     .query_raw(
                         &statement,
-                        _input.iter().map(|p| p as &dyn ToSql),
+                        packets_serialized
+                            .chunks_exact(config.insert_max)
+                            .remainder()
+                            .iter()
+                            .map(|p| p as &dyn ToSql),
                     )
                     .await?;
             }
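This is the core of the rework. The old chunked path called packets_serialized.split_at(config.insert_max) on every loop iteration, so each pass handed the same first insert_max packets to query_raw, and the tail had to be recomputed with a second split_at. The new code iterates chunks_exact(), whose remainder() is exactly the tail that did not fill a whole chunk. A minimal sketch of how those two slice methods behave:

```rust
fn main() {
    let packets: Vec<u32> = (0..10).collect();
    let insert_max = 4;

    // chunks_exact() yields only full-size, non-overlapping chunks...
    for chunk in packets.chunks_exact(insert_max) {
        println!("full chunk: {:?}", chunk); // [0, 1, 2, 3] then [4, 5, 6, 7]
    }

    // ...and remainder() is the tail that was too short for another chunk.
    let rest = packets.chunks_exact(insert_max).remainder();
    println!("remainder ({} rows): {:?}", rest.len(), rest); // 2 rows: [8, 9]
}
```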
@@ -1,11 +1,11 @@
 {
-    "insert_max": 20000,
-    "filter": "ip6 && tcp",
-    "regex_filter": "(?:http|https)[[::punct::]]//([[::word::]]+\\.)*",
+    "insert_max": 16000,
+    "filter": "!ip6 && tcp",
+    "regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org)",
     "from_device": false,
     "parse_device": "enp7s0",
-    "pcap_file": "",
-    "pcap_dir": "../target",
+    "pcap_file": "not in use right now",
+    "pcap_dir": "../target/files",
     "database_tablename": "json_dump",
     "database_user": "postgres",
     "database_host": "localhost",
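The old regex_filter used a malformed spelling of the POSIX classes ([[::punct::]] instead of [[:punct:]]), so it could not match as intended; the new value uses the ASCII classes the regex crate supports and roughly matches an http/https prefix followed by a two-label host under .com, .de or .org. A small check of that behaviour; the sample inputs are made up:

```rust
use regex::Regex;

fn main() {
    // The updated regex_filter value from the config above.
    let re = Regex::new(
        "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org)",
    )
    .unwrap();

    // Matches e.g. "http://www.example.com": scheme, two labels, listed TLD...
    assert!(re.is_match("GET http://www.example.com/index.html HTTP/1.1"));
    // ...but not hosts without such a label/TLD structure after the scheme.
    assert!(!re.is_match("GET http://10.0.0.1/index.html HTTP/1.1"));
}
```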
@@ -196,9 +196,9 @@ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
         flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
         flags.push_str(";");
     }
-    //if flags.len() > 0{
-    //println!("{:?}", flags);
-    //}
+    if flags.len() > 0{
+        println!("{:?}", flags);
+    }
     match 0 < flags.len() {
         false => None,
         true => Some(flags),
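flag_carnage() collects every regex match over the raw packet bytes into one ';'-separated string; this commit merely enables its debug output. A self-contained stand-in assembled from the lines visible above; the surrounding loop header and the sample pattern/payload are assumptions:

```rust
use regex::bytes::Regex;

// Stand-in for parser::flag_carnage(): join all matches with ';',
// return None when nothing in the payload matched.
fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
    let mut flags = String::new();
    for mat in re.find_iter(payload) {
        flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
        flags.push_str(";");
    }
    if flags.len() > 0 {
        println!("{:?}", flags);
    }
    match 0 < flags.len() {
        false => None,
        true => Some(flags),
    }
}

fn main() {
    let re = Regex::new("(?:http|https)[[:punct:]]+[[:alnum:]]+").unwrap();
    let payload: &[u8] = b"GET http://www.example.com/ HTTP/1.1";
    assert_eq!(flag_carnage(&re, payload).as_deref(), Some("http://www;"));
}
```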
@@ -215,18 +215,15 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str)
     while let Ok(packet) = cap.next() {
         let mut me = QryData::new();
         match linktype {
             Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
             Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
             _ => (),
         };
 
         me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
         me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex overhead is between 4-9% --single threaded-- on complete packet [u8] data
         //v.push(me.clone());
 
-        if me.reg_res.is_some(){
-            println!("{:?}", &me.reg_res);
-        }
         v.push(QryData {
             id: 0,
             time: me.time,
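For reference on the linktype comment: tcpdump's registry defines LINKTYPE_ETHERNET = 1, while raw-IP captures are LINKTYPE_RAW = 101; the platform DLT_RAW constant just happens to be 12 on most systems, which may explain the mismatch noted above. The timestamp line simply combines the libpcap header's seconds and microseconds; a one-line check with made-up header values:

```rust
fn main() {
    // Example libpcap header fields: 1 600 000 000 s and 250 000 µs.
    let (tv_sec, tv_usec): (i64, i64) = (1_600_000_000, 250_000);
    let time = (tv_usec as f64 / 1_000_000.0) + tv_sec as f64;
    assert_eq!(time, 1_600_000_000.25);
}
```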
@@ -33,7 +33,7 @@ pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
         .par_iter()
         .map(|x| serde_json::to_value(x).unwrap())
         .collect();
     // let packets_serialized: Vec<serde_json::Value> = v.par_iter().map(|x| json!(x)).collect();
 
     packets_serialized
 }
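serialize_packets() fans the JSON conversion out over rayon's thread pool via par_iter(). A minimal standalone version of that pattern; the Packet struct is a stand-in for parser::QryData:

```rust
use rayon::prelude::*;
use serde::Serialize;

// Stand-in for parser::QryData.
#[derive(Serialize)]
struct Packet {
    id: u32,
    time: f64,
}

fn main() {
    let v: Vec<Packet> = (0..4).map(|i| Packet { id: i, time: f64::from(i) * 0.5 }).collect();

    // Same shape as serialize_packets(): each element is converted to a
    // serde_json::Value on rayon's worker threads, results keep their order.
    let packets_serialized: Vec<serde_json::Value> =
        v.par_iter().map(|x| serde_json::to_value(x).unwrap()).collect();

    println!("{}", serde_json::to_string(&packets_serialized).unwrap());
}
```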