switched from INSERT to COPY + implemented multithreading
This commit is contained in:
parent 2e2c8c5e5b
commit d664ac8122
@@ -23,12 +23,17 @@ Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are han
For testing purposes, the table layout is serialized JSON. The table layout is somewhat "dynamic": any protocols not recognized in a parsed packet will be marked as NULL inside the resulting table row.
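For illustration, a single row's `packet` value might look roughly like the following sketch (built with `serde_json::json!`; apart from `ipv4_header`, the key names are made up for illustration and are not necessarily the exact keys the serializer emits):

```rust
fn main() {
    // Apart from "ipv4_header", the keys below are illustrative only.
    let packet = serde_json::json!({
        "ether_header": { "ether_src": "aa:bb:cc:dd:ee:ff", "ether_dst": "11:22:33:44:55:66" },
        "ipv4_header": { "ip_src": "10.0.0.1", "ip_dst": "10.0.0.2" },
        "ipv6_header": null, // protocol not recognized in this packet
        "tcp_header": { "src_port": 443, "dst_port": 51234 },
        "udp_header": null   // protocol not recognized in this packet
    });
    println!("{}", packet);
}
```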
A query may look like this: `select packet from json_dump where packet->>'ipv4_header' is not null;`
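Reading that column back from Rust works the same way. A minimal sketch using `tokio_postgres` (the connection parameters are placeholders, and the `with-serde_json-1` feature is assumed so that `serde_json::Value` maps to the json column):

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Placeholder connection parameters; the real ones come from the config file.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    let rows = client
        .query(
            "select packet from json_dump where packet->>'ipv4_header' is not null",
            &[],
        )
        .await?;
    for row in rows {
        // Requires tokio-postgres built with the `with-serde_json-1` feature.
        let packet: serde_json::Value = row.get(0);
        println!("{}", packet);
    }
    Ok(())
}
```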
UPDATE: Chunking can be omitted completely when using PostgreSQL's `COPY` to transfer binary data instead of using `INSERT`. This is not only somewhat faster -- not as much as I expected, unfortunately -- but it also removes quite a few lines of code in the end. Only parsing from a network device still needs chunks.
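A condensed sketch of that `COPY` path, mirroring the change in `src/main.rs` further down (`client` is an open `tokio_postgres::Client`, and the packets are already serialized to `serde_json::Value`):

```rust
use futures::pin_mut;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::Type;

async fn copy_packets(
    client: &tokio_postgres::Client,
    packets_serialized: Vec<serde_json::Value>,
) -> Result<(), tokio_postgres::Error> {
    // One binary COPY stream replaces thousands of prepared INSERT statements.
    let sink = client
        .copy_in("COPY json_dump (packet) FROM STDIN BINARY")
        .await?;
    let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
    pin_mut!(writer);
    for pack in packets_serialized {
        writer.as_mut().write(&[&pack]).await?;
    }
    // finish() flushes the stream and returns the number of rows written.
    writer.finish().await?;
    Ok(())
}
```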
Speaking of serialization: after profiling, it turns out that ~20% of CPU time is spent on serialization to JSON. This could, of course, be avoided entirely.
Another subgoal was the ability to compile a static binary, which -- last time I tested -- works without dependencies, except for libpcap itself. It even executes on Oracle Linux after linking directly against the elf64 interpreter. If you have ever had the pleasure of using this derivative, that may come as a surprise to you. The key is to compile for the `x86_64-unknown-linux-musl` target, e.g. `cargo build --release --target x86_64-unknown-linux-musl`. See: https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html
Caveats: Regex syntax is limited at the moment, because the pattern is not compiled from a raw string but from a common one. Escaping does not work properly yet; character classes do. I still have to figure out the correct syntactical way to get the pattern out of the JSON file and into a raw string. For the regular expression syntax that is already supported, see https://docs.rs/regex/1.3.9/regex/#syntax and the example in `parser.json`.
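A minimal sketch of the escaping issue, assuming the pattern comes from the `regex_filter` field: backslashes have to be doubled for JSON itself, while a raw string literal (`r"..."`) is only a source-code convenience, so a pattern read from a file behaves exactly like one written as a raw literal:

```rust
use regex::Regex;

fn main() {
    // In the JSON file the pattern must escape backslashes for JSON itself:
    // "regex_filter": "foo\\.bar"  arrives in Rust as the string  foo\.bar
    let json = r#"{ "regex_filter": "foo\\.bar" }"#;
    let config: serde_json::Value = serde_json::from_str(json).unwrap();
    let pattern = config["regex_filter"].as_str().unwrap();

    // No raw string needed at runtime: the pattern is already plain characters.
    let re = Regex::new(pattern).unwrap();
    assert!(re.is_match("foo.bar"));
    assert!(!re.is_match("fooXbar"));
}
```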
Transmitting all the data of the previously described testing table layout results in a rather big table. HDD space has been no issue so far: ingesting 30808676 TCP/IP packets taken from iCTF 2020 PCAPs results in 99.4GB of JSON data.
Gotchas: My test setup consists of a PostgreSQL DB inside a docker container. Main memory usage of said container is low (~300MB), but I had to set `--oom-score-adj=999` in order to keep the container from being killed automatically. `--oom-kill-disable=true` would turn it off completely, I guess. See https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources for more details.
If this whole thing turns out to be viable, some future features may be:
@@ -41,6 +46,6 @@ If this whole thing turns out to be viable, some future features may be:
There are many other things left to be desired.
Benchmarking was done with the same file that was used in the previous C implementation. Inserting non-chunked data resulted in ~20 minutes of query time against the database. Now, chunked data takes below 20 seconds, after compiler optimization.
Speaking of optimization: Do yourself a favor and run release code, not debug code: `cargo run --release`. The compiler does a rather hefty optimization and you will save some time waiting for your precious data to be inserted. I did no further optimization besides trying to enable the compiler to do a better job. Just blackboxing, no assembly tweaking yet.
src/main.rs
@@ -5,8 +5,12 @@ extern crate tokio_postgres;
mod configure;
mod parser;
mod serializer;
use tokio_postgres::types::{Type, ToSql};
use tokio_postgres::{Error, NoTls};
use tokio_postgres::binary_copy::{BinaryCopyInWriter};
use futures::{pin_mut};
use tokio::task;

/* conditionals */
const FROM_FILE: bool = false;
@@ -67,54 +71,70 @@ async fn main() -> Result<(), Error> {
        parser::parse(&_pcap_file, &config.filter, &config.regex_filter);
        let packets_serialized = serializer::serialize_packets(v);

        let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap();
        let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);

        let join = task::spawn( async move {
            pin_mut!(writer);
            for pack in packets_serialized {
                writer.as_mut().write(&[&pack]).await.unwrap();
                // Reminder: write_raw() behavior is very strange, so it's write() for now.
                // writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap();
            }
            writer.finish().await.unwrap();
        });

        let result = join.await.unwrap();

        // TODO: Tuning vector capacity according to mean average & std dev of packet sizes
        // let mut v = Vec::<parser::QryData>::with_capacity(100000);
        // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
        // let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
        // packets_serialized.extend(serializer::serialize_packets(v));

        // /* Do chunks and query data */
        // let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
        // match chunker {
        // NON_CHUNKED => {
        // let insert_str = query_string(&packets_serialized.len(), &config.tablename);
        // let statement = client.prepare(&insert_str).await?;
        // client
        // .query_raw(
        // &statement,
        // packets_serialized.iter().map(|p| p as &dyn ToSql),
        // )
        // .await?;
        // }
        // CHUNKED => {
        // let insert_str = query_string(&config.insert_max, &config.tablename);
        // let statement = client.prepare(&insert_str).await?;
        //
        // for chunk in packets_serialized.chunks_exact(config.insert_max) {
        // client
        // .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
        // .await?;
        // }
        // let remainder_len = packets_serialized
        // .chunks_exact(config.insert_max)
        // .remainder()
        // .len();
        // if 0 < remainder_len {
        // let rem_str = query_string(&remainder_len, &config.tablename);
        // let statement = client.prepare(&rem_str).await?;
        // client
        // .query_raw(
        // &statement,
        // packets_serialized
        // .chunks_exact(config.insert_max)
        // .remainder()
        // .iter()
        // .map(|p| p as &dyn ToSql),
        // )
        // .await?;
        // }
        // }
        // }
        }
    }
    FROM_DEVICE => {
@@ -1,11 +1,11 @@
{
    "insert_max": 20000,
    "filter": "tcp",
    "regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org|net)",
    "from_device": false,
    "parse_device": "enp7s0",
    "pcap_file": "<not in use right now>",
    "pcap_dir": "../target/files",
    "database_tablename": "json_dump",
    "database_user": "postgres",
    "database_host": "localhost",
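For reference, a minimal sketch of deserializing a config like the one above with serde; the struct is hypothetical and only mirrors the fields shown (the project's own `configure` module may differ):

```rust
use serde::Deserialize;

// Hypothetical mirror of the JSON config shown above; the real definition
// lives in the project's configure module.
#[derive(Deserialize, Debug)]
struct Config {
    insert_max: usize,
    filter: String,
    regex_filter: String,
    from_device: bool,
    parse_device: String,
    pcap_file: String,
    pcap_dir: String,
    database_tablename: String,
    database_user: String,
    database_host: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "parser.json" is the example config referenced in the README above.
    let raw = std::fs::read_to_string("parser.json")?;
    let config: Config = serde_json::from_str(&raw)?;
    println!("{:?}", config);
    Ok(())
}
```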