From d664ac812285756fda72c1a02b792e0f4dbd7e53 Mon Sep 17 00:00:00 2001
From: gurkenhabicht
Date: Sun, 21 Jun 2020 03:39:58 +0200
Subject: [PATCH] switched from INSERT to COPY + implemented multithreading

---
 README.md       |   9 +++-
 src/main.rs     | 106 ++++++++++++++++++++++++++++--------------------
 src/parser.json |   6 +--
 3 files changed, 73 insertions(+), 48 deletions(-)

diff --git a/README.md b/README.md
index ceca250..93e0312 100644
--- a/README.md
+++ b/README.md
@@ -23,12 +23,17 @@ Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are han
 Because of testing purposes, layout of the table is serialized json. Table layout is somewhat "dynamic". Any procotols not recognized in a parsed packet will be marked as NULL inside a resulting table row. A query may look like this `select packet from json_dump where packet->>'ipv4_header' is not null;`
 
+UPDATE: Chunking can be omitted completely when using PostgreSQL's `COPY` to transfer binary data instead of `INSERT`. This is not only somewhat faster -- though not as much as I expected, unfortunately -- but it also leaves quite a few lines of code less in the end. Only parsing from a network device still needs chunks.
+
 Speaking of serialization: After profiling it turns out that ~20% of cpu time is used for serialization to json. This, of course, could be saved completely.
 
 Another subgoal was the ability to compile a static binary, which --last time I tested-- works without dependencies, but the need for libpcap itself. It even executes on oracle linux, after linking against the elf64 interpreter in a direct manner. If you ever had the pleasure using this derivate it may come as a suprise to you. The key is to compile via `x86_64-unknown-linux-musl` target. See: https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html
 
 Caveats: Regex Syntax is limited at the moment, because it is not compiled from a Rawstring, but a common one. Escaping does not work properly, character classes do. I have to fiddle the correct synctactical way to get it out of the json file and into a raw. For already supported regular expression syntax see: https://docs.rs/regex/1.3.9/regex/#syntax , also see the example in `parser.json`.
 
-Transmitting the data of the formerly described testing table layout results in a rather big table size. HDD space was no issue so far.
+Transmitting all the data of the formerly described testing table layout results in a rather big table size. HDD space was no issue so far. Ingesting 30808676 TCP/IP packets taken from iCTF 2020 PCAPs results in 99.4GB of JSON data.
+
+Gotchas: My test setup consists of a PostgreSQL DB inside a Docker container. Main memory usage of said container is low (~300MB), but I had to set `--oom-score-adj=999` so that the container does not get killed automatically. `--oom-kill-disable=true` would turn the OOM killer off completely, I guess. See https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources for more details.
+
 
 If this whole thing turns out to be viable, some future features may be:
 
@@ -41,6 +46,6 @@ If this whole thing turns out to be viable, some future features may be:
 
 There are many other things left to be desired.
 
-The file used for testing was identical to the one used in the previous C implementation. Inserting none chunked data resulted in ~20 minutes of querying to database. Now, chunked data is below 20 seconds after compiler optimization.
+Benchmarking was done with the identical file that was used in the previous C implementation. Inserting non-chunked data resulted in ~20 minutes of insertion time into the database. Now, chunked data is inserted in under 20 seconds after compiler optimization. Speaking of optimization: do yourself a favor and run release code, not debug code: `cargo run --release`. The compiler does a rather hefty optimization and you will save some time waiting for your precious data to be inserted. I did no further optimization besides trying to enable the compiler to do a better job. Just black-boxing, no assembly tweaking yet.
 
 
diff --git a/src/main.rs b/src/main.rs
index 00a452d..1c663ac 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -5,8 +5,12 @@ extern crate tokio_postgres;
 mod configure;
 mod parser;
 mod serializer;
-use tokio_postgres::types::ToSql;
+use tokio_postgres::types::{Type, ToSql};
 use tokio_postgres::{Error, NoTls};
+use tokio_postgres::binary_copy::{BinaryCopyInWriter};
+use futures::{pin_mut};
+use tokio::task;
+
 
 /* conditionals */
 const FROM_FILE: bool = false;
@@ -67,54 +71,70 @@ async fn main() -> Result<(), Error> {
                     parser::parse(&_pcap_file, &config.filter, &config.regex_filter);
                 let packets_serialized = serializer::serialize_packets(v);
 
+                let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap();
+                let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
+
+                let join = task::spawn( async move {
+                    pin_mut!(writer);
+                    for pack in packets_serialized {
+                        writer.as_mut().write(&[&pack]).await.unwrap();
+                        // Reminder: write_raw() behavior is very strange, so it's write() for now.
+                        // writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap();
+                    }
+                    writer.finish().await.unwrap();
+                });
+
+                let result = join.await.unwrap();
+
+
                 // TODO: Tuning vector capacity according to mean average & std dev of packet sizes
                 // let mut v = Vec::::with_capacity(100000);
                 // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
                 // let mut packets_serialized = Vec::::with_capacity(100000);
                 // packets_serialized.extend(serializer::serialize_packets(v));
 
-                /* Do chunks and query data */
-                let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
-                match chunker {
-                    NON_CHUNKED => {
-                        let insert_str = query_string(&packets_serialized.len(), &config.tablename);
-                        let statement = client.prepare(&insert_str).await?;
-                        client
-                            .query_raw(
-                                &statement,
-                                packets_serialized.iter().map(|p| p as &dyn ToSql),
-                            )
-                            .await?;
-                    }
-                    CHUNKED => {
-                        let insert_str = query_string(&config.insert_max, &config.tablename);
-                        let statement = client.prepare(&insert_str).await?;
-
-                        for chunk in packets_serialized.chunks_exact(config.insert_max) {
-                            client
-                                .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
-                                .await?;
-                        }
-                        let remainder_len = packets_serialized
-                            .chunks_exact(config.insert_max)
-                            .remainder()
-                            .len();
-                        if 0 < remainder_len {
-                            let rem_str = query_string(&remainder_len, &config.tablename);
-                            let statement = client.prepare(&rem_str).await?;
-                            client
-                                .query_raw(
-                                    &statement,
-                                    packets_serialized
-                                        .chunks_exact(config.insert_max)
-                                        .remainder()
-                                        .iter()
-                                        .map(|p| p as &dyn ToSql),
-                                )
-                                .await?;
-                        }
-                    }
-                }
+//                /* Do chunks and query data */
+//                let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
+//                match chunker {
+//                    NON_CHUNKED => {
+//                        let insert_str = query_string(&packets_serialized.len(), &config.tablename);
+//                        let statement = client.prepare(&insert_str).await?;
+//                        client
+//                            .query_raw(
+//                                &statement,
+//                                packets_serialized.iter().map(|p| p as &dyn ToSql),
+//                            )
+//                            .await?;
+//                    }
+//                    CHUNKED => {
+//                        let insert_str = query_string(&config.insert_max, &config.tablename);
+//                        let statement = client.prepare(&insert_str).await?;
+//
+//                        for chunk in packets_serialized.chunks_exact(config.insert_max) {
+//                            client
+//                                .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
+//                                .await?;
+//                        }
+//                        let remainder_len = packets_serialized
+//                            .chunks_exact(config.insert_max)
+//                            .remainder()
+//                            .len();
+//                        if 0 < remainder_len {
+//                            let rem_str = query_string(&remainder_len, &config.tablename);
+//                            let statement = client.prepare(&rem_str).await?;
+//                            client
+//                                .query_raw(
+//                                    &statement,
+//                                    packets_serialized
+//                                        .chunks_exact(config.insert_max)
+//                                        .remainder()
+//                                        .iter()
+//                                        .map(|p| p as &dyn ToSql),
+//                                )
+//                                .await?;
+//                        }
+//                    }
+//                }
             }
         }
         FROM_DEVICE => {
diff --git a/src/parser.json b/src/parser.json
index caa7be3..bfe340b 100644
--- a/src/parser.json
+++ b/src/parser.json
@@ -1,11 +1,11 @@
 {
     "insert_max": 20000,
     "filter": "tcp",
-    "regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org)",
+    "regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org|net)",
     "from_device": false,
     "parse_device": "enp7s0",
-    "pcap_file": "not in use right now",
-    "pcap_dir": "../target",
+    "pcap_file": "",
+    "pcap_dir": "../target/files",
     "database_tablename": "json_dump",
     "database_user": "postgres",
     "database_host": "localhost",
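
A note on the new ingestion path in src/main.rs: the core of the change is tokio-postgres's binary `COPY` support via `BinaryCopyInWriter`. The following is a minimal, self-contained sketch of that pattern, not the project's exact code. It assumes a `json_dump(packet JSON)` table as described in the README, a local PostgreSQL instance with placeholder connection parameters, sample rows standing in for the serializer's output, and a Cargo setup with `tokio` (features `macros`, `rt-multi-thread`), `futures`, `serde_json`, and `tokio-postgres` with the `with-serde_json-1` feature so that `serde_json::Value` implements `ToSql`.

```rust
use futures::pin_mut;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::Type;
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Placeholder connection string; adjust host/user/password to your setup.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;

    // The connection object drives the socket I/O on a background task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // Start a binary COPY into the target column; the sink accepts rows in
    // PostgreSQL's binary COPY format.
    let sink = client
        .copy_in("COPY json_dump(packet) FROM STDIN BINARY")
        .await?;
    let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
    pin_mut!(writer);

    // Stand-in rows; the real code streams the serde_json::Value rows
    // produced by serializer::serialize_packets().
    let rows = vec![
        serde_json::json!({ "ipv4_header": { "ttl": 64 } }),
        serde_json::json!({ "arp_header": null }),
    ];

    for row in rows {
        // One call per row; each slice element maps to one COPY column.
        writer.as_mut().write(&[&row]).await?;
    }

    // finish() flushes the stream and returns the number of rows copied.
    let copied = writer.finish().await?;
    println!("copied {} rows", copied);
    Ok(())
}
```

Compared to the prepared multi-row `INSERT` that the now commented-out block built per chunk, the binary `COPY` stream needs no per-chunk SQL string generation, which is why the chunking logic could be dropped entirely for the file-based path.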
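On the parser.json change: the `regex_filter` update only adds `net` to the TLD alternation. As the README caveat notes, the pattern is read from the JSON file and compiled from a plain string rather than a Rust raw string, which limits escaping but leaves POSIX character classes working. A small sketch of how that pattern behaves with the `regex` crate the project links to; the payload strings below are arbitrary examples, not test data from the project.

```rust
use regex::Regex;

fn main() {
    // Pattern as it appears in parser.json after this change.
    let pattern = "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org|net)";
    let re = Regex::new(pattern).expect("invalid regex");

    // Matches: scheme, punctuation, two alphanumeric labels separated by
    // punctuation, then one of the listed TLDs.
    assert!(re.is_match("GET http://www.example.net/index.html"));

    // No match: only a single label after the scheme, so the second
    // [[:alnum:]]+[[:punct:]] step cannot be satisfied.
    assert!(!re.is_match("https://localhost"));
}
```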