reworked qrydata

This commit is contained in:
gurkenhabicht 2020-06-21 20:08:35 +02:00
parent d664ac8122
commit e963232e82
5 changed files with 51 additions and 59 deletions

View File

@ -23,7 +23,8 @@ Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are han
For testing purposes, the table layout is serialized JSON. The table layout is somewhat "dynamic": any protocols not recognized in a parsed packet will be marked as NULL inside the resulting table row.
A query may look like this: `select packet from json_dump where packet->>'ipv4_header' is not null;`
UPDATE: Chunking can be omitted completely when using PostgreSQL's `COPY` to transfer binary data instead of using `Insert`. This is not only somewhat faster -- not as much as I expected, unfortunately -- but there are quite a few lines of code less in the end. Only parsing from a network device still needs chunks.
**UPDATE 0.2.0**: Chunking can be omitted completely when using PostgreSQL's `COPY` to transfer binary data instead of using `Insert`. This is not only somewhat faster -- not as much as I expected, unfortunately -- but there are quite a few lines of code less in the end. Only parsing from a network device still needs chunks.
The other recent change is that only non-NULL protocol data of a packet is serialized to JSON. Table insertions should be smaller this way.
Speaking of serialization: profiling shows that ~20% of CPU time is spent on serialization to JSON. This, of course, could be saved completely.
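A minimal sketch of the `COPY ... FROM STDIN BINARY` path described above, assuming the `json_dump` table exposes a single `jsonb` column named `packet` and that tokio-postgres is built with the `with-serde_json-1` feature; the connection string and function name are illustrative, the actual code lives in `src/main.rs`:

```rust
use futures::pin_mut;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::{types::Type, Error, NoTls};

async fn copy_packets(packets: Vec<serde_json::Value>) -> Result<u64, Error> {
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // One binary COPY replaces thousands of chunked INSERT statements.
    let sink = client
        .copy_in("COPY json_dump (packet) FROM STDIN BINARY")
        .await?;
    let writer = BinaryCopyInWriter::new(sink, &[Type::JSONB]);
    pin_mut!(writer);

    for packet in &packets {
        // serde_json::Value maps to jsonb via the with-serde_json-1 feature.
        writer.as_mut().write(&[packet]).await?;
    }
    writer.finish().await // returns the number of rows copied
}
```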
@ -32,13 +33,12 @@ Another subgoal was the ability to compile a static binary, which --last time I
Caveats: Regex syntax is limited at the moment, because the pattern is not compiled from a raw string but from a common one. Escaping does not work properly; character classes do. I still have to figure out the correct syntactical way to get the pattern out of the JSON file and into a raw string. For the already supported regular expression syntax see: https://docs.rs/regex/1.3.9/regex/#syntax , also see the example in `parser.json`.
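For illustration, a small hedged sketch of the caveat above, assuming the pattern arrives from `parser.json` as a plain (non-raw) Rust string; the function and variable names are made up for the example:

```rust
use regex::bytes::Regex;

fn compile_filter(pattern_from_json: &str) -> Regex {
    // The pattern is compiled from a common string, not a raw string literal,
    // so backslash escapes may not survive the JSON layer intact. POSIX
    // character classes like [[:alnum:]] need no escaping and work fine.
    Regex::new(pattern_from_json).expect("invalid regex_filter in parser.json")
}

fn main() {
    let re = compile_filter("(?:http|https)[[:punct:]]+[[:alnum:]]+");
    assert!(re.is_match(b"http://example"));
}
```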
Transmitting all the data of the previously described testing table layout results in a rather big table size. HDD space was no issue so far. Ingesting 30808676 TCP/IP packets taken from iCTF 2020 PCAPs results in 99.4 GB of JSON data. See: https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources for more details.
Gotchas: My test setup consists of a PostgreSQL DB inside a Docker container. Main memory usage of said container is low (~300 MB), but I had to set `--oom-score-adj=999` so that the container does not get killed automatically. `--oom-kill-disable=false` would turn it off completely, I guess.
Gotchas: My test setup consists of a PostgreSQL DB inside a Docker container. Main memory usage of said container is low (~300 MB), but I had to set `--oom-score-adj=999` so that the container does not get killed automatically. `--oom-kill-disable=false` would turn it off completely, I guess. I have not fine-tuned this value yet.
If this whole thing turns out to be viable, some future features may be:
- A database containing a file hash map, to compare file status/sizes after the parser may have crashed, or to build a complete overview of all existing PCAP files.
- Concurrency. There are some interesting ways of parallelization I am working on to find a model that really benefits the use case. MPSC looks promising at the moment. That's why the tokio crate is already implemented for DB queries, but it has no performance benefit at the moment. Implementing an MPSC pipe has the nice side effect of lower memory usage: parsed packets will be piped directly to the JSON serialization function without being stored in a separate vector.
- Concurrency. There are some interesting ways of parallelization I am working on to find a model that really benefits the use case. MPSC looks promising at the moment. Implementing an MPSC pipe has the nice side effect of lower memory usage: parsed packets will be piped directly to the JSON serialization function without being stored in a separate vector, in the sense of pcap from config -> parser (without vec usage) -> serializer -> insertion (see the sketch below this list).
- Updating the file hash map through the inotify crate during runtime.
- Reassembly of fragmented IPv4 packets.
- SIMD (via autovectorization), which is easy enough to do in Rust.
@ -46,6 +46,6 @@ If this whole thing turns out to be viable, some future features may be:
There are many other things left to be desired.
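A minimal sketch of the MPSC pipe idea from the feature list above, using the standard library channel; `Packet` and the stage bodies are placeholders, not the crate's actual types:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Placeholder stand-in for parser::QryData.
struct Packet {
    time: f64,
}

fn main() {
    let (tx, rx) = channel::<Packet>();

    // Parser stage: sends packets into the pipe instead of collecting them
    // into an intermediate Vec.
    let parser = thread::spawn(move || {
        for i in 0..3 {
            tx.send(Packet { time: i as f64 }).unwrap();
        }
        // Dropping tx closes the channel and ends the consumer loop below.
    });

    // Serializer/insertion stage: consumes packets as they arrive, so peak
    // memory stays roughly constant instead of growing with the capture.
    for packet in rx {
        println!("serialize + insert packet at t={}", packet.time);
    }

    parser.join().unwrap();
}
```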
Benchmarking was done with the identical file that was used in the previous C implementation. Inserting non-chunked data resulted in ~20 minutes of querying to the database. Now, chunked data is below 20 seconds after compiler optimization.
Benchmarking was done with the identical file that was used in the previous C implementation. Inserting non-chunked data resulted in ~20 minutes of querying to the database. Now, chunked data is below 12 seconds after compiler optimization.
Speaking of optimization: do yourself a favor and run release code, not debug code: `cargo run --release`. The compiler does rather hefty optimizations and you will save some time waiting for your precious data to be inserted. I did no further optimization besides trying to enable the compiler to do a better job. Just black-boxing, no assembly tweaking yet.

View File

@ -11,12 +11,11 @@ use tokio_postgres::binary_copy::{BinaryCopyInWriter};
use futures::{pin_mut};
use tokio::task;
/* conditionals */
const FROM_FILE: bool = false;
const FROM_DEVICE: bool = true;
const NON_CHUNKED: bool = true;
const CHUNKED: bool = false;
//const NON_CHUNKED: bool = true;
//const CHUNKED: bool = false;
fn query_string(insert_max: &usize, table_name: &str) -> String {
let mut insert_template = String::with_capacity(insert_max * 8 + 96);
@ -30,7 +29,7 @@ fn query_string(insert_max: &usize, table_name: &str) -> String {
insert_template
}
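The body of `query_string` is cut off by this hunk; purely as a sketch, a multi-row `INSERT` template of the kind the capacity hint suggests could be built like this (the single `packet` column and the placeholder layout are assumptions, not necessarily the committed implementation):

```rust
// Sketch only: builds "INSERT INTO <table> (packet) VALUES ($1), ($2), ..."
// for insert_max rows; the real query_string in main.rs may differ.
fn query_string_sketch(insert_max: &usize, table_name: &str) -> String {
    let mut insert_template = String::with_capacity(insert_max * 8 + 96);
    insert_template.push_str("INSERT INTO ");
    insert_template.push_str(table_name);
    insert_template.push_str(" (packet) VALUES ");
    for i in 1..=*insert_max {
        insert_template.push_str(&format!("(${})", i));
        if i < *insert_max {
            insert_template.push_str(", ");
        }
    }
    insert_template
}
```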
#[tokio::main(core_threads = 4)] // Tokio is implemented for possible future use.
#[tokio::main(core_threads = 4)]
async fn main() -> Result<(), Error> {
/* Init values from file */
let config: configure::Config = configure::from_json_file().unwrap();
@ -83,16 +82,14 @@ async fn main() -> Result<(), Error> {
}
writer.finish().await.unwrap();
});
assert!(join.await.is_ok());
let result = join.await.unwrap();
// TODO: Tuning vector capacity according to mean average & std dev of packet sizes
// TODO: MPSC channel
// let mut v = Vec::<parser::QryData>::with_capacity(100000);
// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
// packets_serialized.extend(serializer::serialize_packets(v));
// Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE
// /* Do chunks and query data */
// let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
// match chunker {
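If the INSERT path ever has to be revived, a hedged sketch of the chunked fallback hinted at by the commented-out `chunker` logic could look like this; it assumes `packets_serialized: Vec<serde_json::Value>`, a single JSON parameter per row, and the `query_string` template from above:

```rust
use tokio_postgres::{types::ToSql, Client, Error};

// Sketch: send full insert_max-sized chunks with one multi-row INSERT each,
// and fall back to per-row INSERTs for the non-full remainder chunk.
async fn insert_chunked(
    client: &Client,
    table_name: &str,
    packets_serialized: &[serde_json::Value],
    insert_max: usize,
) -> Result<(), Error> {
    let full_query = query_string(&insert_max, table_name);
    let single_query = query_string(&1, table_name);

    for chunk in packets_serialized.chunks(insert_max) {
        if chunk.len() == insert_max {
            let params: Vec<&(dyn ToSql + Sync)> =
                chunk.iter().map(|p| p as &(dyn ToSql + Sync)).collect();
            client.execute(full_query.as_str(), &params).await?;
        } else {
            for packet in chunk {
                client.execute(single_query.as_str(), &[packet]).await?;
            }
        }
    }
    Ok(())
}
```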

View File

@ -1,11 +1,11 @@
{
"insert_max": 20000,
"filter": "tcp",
"filter": "ip6 && tcp",
"regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org|net)",
"from_device": false,
"parse_device": "enp7s0",
"pcap_file": "<not in use right now>",
"pcap_dir": "../target/files",
"pcap_dir": "../target",
"database_tablename": "json_dump",
"database_user": "postgres",
"database_host": "localhost",

View File

@ -6,6 +6,7 @@ use pcap::{Capture, Linktype};
use regex::bytes::Regex;
use std::convert::TryInto;
use std::str;
use serde::Serialize;
//use std::thread::{spawn, JoinHandle};
//use std::sync::mpsc::{channel, Receiver};
@ -26,33 +27,31 @@ const IPV6_HDRLEN: u32 = 0xA; // I know, this will get changed. It works for now
const IPV4: usize = 0x4;
const IPV6: usize = 0x6;
/*
QryData could be written in the sense of QryData{ ... frame: .., packet: .., segment: .. }
On the one hand, only the actual type of frame/packet/segment would be contained in the resulting struct.
So, the benefit in serialization/CPU time could be less data to serialize, depending on the layout.
On the other hand, each datagram type would need to implement traits which would have to be dynamically dispatched by returning any of these types per ISO/OSI level from a single function each. The result would be a performance decrease.
See: https://doc.rust-lang.org/book/ch10-02-traits.html#returning-types-that-implement-traits
See: https://doc.rust-lang.org/book/ch17-02-trait-objects.html#trait-objects-perform-dynamic-dispatch
Then again, the parser logic would be fewer lines and more unified using the latter method. Maybe better optimizable as well? Maybe this is a nice tradeoff?
TODO: Implement and benchmark dynamically dispatched packet data in conjunction with a restructured QryData.
*/
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct QryData {
pub id: i32,
pub time: f64,
pub data: Option<Vec<u8>>,
#[serde(skip_serializing_if = "Option::is_none")] // I came to the conclusion that importing the serde_with crate just for a single struct decorator
pub data: Option<Vec<u8>>, // is not worth it. So this looks a bit ugly, having eight decorators. Deal with it.
#[serde(skip_serializing_if = "Option::is_none")]
pub ether_header: Option<packet_handler::EtherHeader>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ipv4_header: Option<packet_handler::IpV4Header>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ipv6_header: Option<packet_handler::IpV6Header>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tcp_header: Option<packet_handler::TcpHeader>,
#[serde(skip_serializing_if = "Option::is_none")]
pub udp_header: Option<packet_handler::UdpHeader>,
#[serde(skip_serializing_if = "Option::is_none")]
pub arp_header: Option<packet_handler::ArpHeader>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reg_res: Option<String>,
}
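To illustrate what the eight `skip_serializing_if` decorators buy, a self-contained miniature of the same pattern (not the actual `QryData`, which depends on the `packet_handler` types):

```rust
use serde::Serialize;

// Miniature stand-in for QryData: None fields vanish from the JSON output,
// so unrecognized protocols never show up as explicit nulls in the packet.
#[derive(Serialize)]
struct MiniQryData {
    time: f64,
    #[serde(skip_serializing_if = "Option::is_none")]
    ipv4_header: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    ipv6_header: Option<String>,
}

fn main() {
    let packet = MiniQryData {
        time: 1.0,
        ipv4_header: Some("header fields".to_string()),
        ipv6_header: None,
    };
    // Prints {"time":1.0,"ipv4_header":"header fields"} -- no ipv6_header key.
    println!("{}", serde_json::to_string(&packet).unwrap());
}
```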
#[allow(dead_code)]
enum EncapsulationType {
// pcap::Linktype::get_name() is unsafe. That's why this data structure would be an alternative.
// pcap::Linktype::get_name() is unsafe. That's why this safe data structure would be an alternative.
EN10MB = 1, // See: https://docs.rs/pcap/0.7.0/src/pcap/lib.rs.html#247-261
RAW = 101, // Would this be an issue?
}

View File

@ -1,30 +1,26 @@
extern crate serde_json;
use crate::parser;
use rayon::prelude::*;
use serde::ser::{Serialize, SerializeStruct, Serializer};
//use std::thread::{spawn, JoinHandle};
//use std::thread;
//use std::sync::mpsc::{channel, Receiver};
//use serde_json::json;
//use serde::ser::{Serialize, SerializeStruct, Serializer};
impl Serialize for parser::QryData {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("parser::QryData", 9)?;
state.serialize_field("time", &self.time)?;
state.serialize_field("ether_header", &self.ether_header)?;
state.serialize_field("ipv4_header", &self.ipv4_header)?;
state.serialize_field("ipv6_header", &self.ipv6_header)?;
state.serialize_field("tcp_header", &self.tcp_header)?;
state.serialize_field("udp_header", &self.udp_header)?;
state.serialize_field("arp_header", &self.arp_header)?;
state.serialize_field("data", &self.data)?;
state.serialize_field("reg_res", &self.reg_res)?;
state.end()
}
}
//impl Serialize for parser::QryData {
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
// where
// S: Serializer,
// {
// let mut state = serializer.serialize_struct("parser::QryData", 9)?;
// state.serialize_field("time", &self.time)?;
// state.serialize_field("ether_header", &self.ether_header)?;
// state.serialize_field("ipv4_header", &self.ipv4_header)?;
// state.serialize_field("ipv6_header", &self.ipv6_header)?;
// state.serialize_field("tcp_header", &self.tcp_header)?;
// state.serialize_field("udp_header", &self.udp_header)?;
// state.serialize_field("arp_header", &self.arp_header)?;
// state.serialize_field("data", &self.data)?;
// state.serialize_field("reg_res", &self.reg_res)?;
// state.end()
// }
//}
pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
/* rayon parallelized */
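The body of `serialize_packets` is cut off here; with the derived `Serialize` impl in place, a rayon-parallel version could plausibly look like this sketch (not necessarily the committed implementation):

```rust
// Sketch: serialize each parsed packet to serde_json::Value in parallel and
// collect the results for the COPY writer (rayon::prelude is already imported
// at the top of this file). Assumes QryData: Serialize + Sync.
pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
    v.par_iter()
        .map(|packet| serde_json::to_value(packet).unwrap())
        .collect()
}
```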