added database table to config file

This commit is contained in:
gurkenhabicht 2020-06-18 01:45:12 +02:00
parent 6a2dc9cff6
commit 668a22b4d0
5 changed files with 52 additions and 37 deletions

View File

@@ -1,37 +1,36 @@
# This is experimental
This version is a successor of the _POSIX_C_SOURCE 200809L implementation, in which all data parsed from pcap/pcapng files is written as a single and simple query. The ingestion time is rather fast (tested writes: 100*10^3 TCP packets in ~1.8 sec) but may be insecure. See the other repository.
The idea of this iteration is to use a prepared statement and chunk the data according to a maximum input size. Postgres databases have a maximum limit on each insert query of prepared statements. Said chunk size is initialized through the config/interface file called parser.json as `insert_max`. Data can be read from PCAP/PCAPNG files, as well as from network devices.
The software is written in Rust (no unsafe code). At the current stage I am testing language features. The code should be modular enough to change any awfully written function.
Error handling is subpar at the moment. There is no real unit testing to speak of since switching to asynchronous functionality.
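As a rough sketch of that idea (the actual implementation is the `query_string` function in the main source file further down in this commit; the helper name here is made up), the prepared statement for a given `insert_max` is simply a multi-row insert with one parameter per serialized packet:

```rust
// Sketch only: build the multi-row prepared INSERT used for one chunk.
// `table` and `insert_max` correspond to the values read from parser.json;
// the function name is illustrative, not the one used in the code below.
fn build_insert_statement(table: &str, insert_max: usize) -> String {
    let placeholders: Vec<String> = (1..=insert_max).map(|i| format!("(${})", i)).collect();
    format!("INSERT INTO {} (packet) VALUES {}", table, placeholders.join(", "))
}

fn main() {
    // With insert_max = 3 this prints:
    // INSERT INTO json_dump (packet) VALUES ($1), ($2), ($3)
    println!("{}", build_insert_statement("json_dump", 3));
}
```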
The process is as follows:
- Choose between network device input (specify the device as well) or file input
- Choosing a device is straightforward -> data gets parsed, chunked, and queries prepared according to the `insert_max` size
- The encapsulation type / linktype is chosen beforehand. Currently Ethernet and RawIP are supported.
- Choosing file input means selecting a directory where your PCAP/PCAPNG files reside.
- A hash map of key (path) : value (metadata) is created from the pcap files found in the specified directory.
- The parser gets invoked and calls the appropriate protocol handler on the byte data of the packets yielded by pcap. A vector of type QryData is returned after EOF has been hit.
- The QryData vector is serialized.
- The serialized data gets chunked.
- Prepared statements are built according to the chunk size.
- The chunks are then written to the database through those prepared queries (see the sketch after this list).
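A minimal sketch of the chunking step referenced above, assuming `packets_serialized` is the vector of serialized packets (it mirrors the `chunk_count`/`remainder` arithmetic in the main source file below):

```rust
// Sketch only: split the serialized packets into full chunks of `insert_max`
// plus one shorter remainder chunk; each chunk is written with a prepared
// statement that has exactly as many parameters as the chunk has rows.
fn main() {
    let insert_max = 4;
    let packets_serialized: Vec<String> = (0..10).map(|i| format!("packet {}", i)).collect();

    let chunk_count = packets_serialized.len() / insert_max; // 2 full chunks
    let remainder = packets_serialized.len() % insert_max;   // 2 rows left over

    for chunk in packets_serialized.chunks(insert_max) {
        println!("would insert {} rows in one query", chunk.len());
    }
    println!("{} full chunks, remainder {}", chunk_count, remainder);
}
```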
Currently, Ethernet, IPv4, IPv6, TCP, UDP and ARP/RARP network protocols are handled.
For testing purposes, the table layout is serialized JSON, which makes it somewhat "dynamic": any protocol not recognized in a parsed packet is marked as NULL inside the resulting table row.
A query may look like this: `select packet from json_dump where packet->>'ipv4_header' is not null;`
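For illustration, the `packet` column of one row might look roughly like the following; only the `ipv4_header` key is taken from the query above, the other key names and field values are assumptions about how QryData serializes:

```rust
// Hypothetical serialized packet: protocols that were parsed become objects,
// all other protocol keys stay null, which is what `is not null` filters on.
use serde_json::json;

fn main() {
    let packet = json!({
        "time": 1592434712.015,
        "ipv4_header": { "ttl": 64 },
        "tcp_header": { "source_port": 443 },
        "udp_header": null,
        "arp_header": null
    });
    println!("{}", packet);
}
```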
Speaking of serialization: profiling shows that ~20% of CPU time is spent on serialization to JSON. This could, of course, be avoided completely.
Another subgoal was the ability to compile a static binary, which --last time I tested-- works without dependencies except for libpcap itself. It even executes on Oracle Linux after linking directly against the elf64 interpreter. If you have ever had the pleasure of using this derivative, that may come as a surprise to you. The key is to compile via the `x86_64-unknown-linux-musl` target. See: https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html
If this whole thing turns out to be viable, some future features may be:
- A database containing the file hash map, to compare file status/sizes after the parser may have crashed, or to join into a complete overview of all existing PCAP files.
- Concurrency. There are some interesting ways of parallelization I am working on to find a model that really benefits the use case. MPSC looks promising at the moment; that is why the tokio crate is already implemented for db queries, although it has no performance benefit yet. Implementing an MPSC pipe has the nice side effect of lower memory usage: parsed packets are piped directly to the JSON serialization function without being stored in a separate vector (see the sketch after this list).
- Update the file hash map during runtime through the inotify crate.
- Restoration of fragmented IPv4 packets.
- SIMD (via autovectorization), which is easy enough to do in Rust.
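A rough sketch of that MPSC idea (std::sync::mpsc is used here for brevity, the project mentions tokio; all names are illustrative): the parsing side sends each parsed packet down a channel and the receiver serializes it immediately, so no intermediate vector of parsed packets is kept.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for a parsed packet; the real type in this project is parser::QryData.
struct ParsedPacket {
    time: f64,
}

fn main() {
    let (tx, rx) = mpsc::channel::<ParsedPacket>();

    // Producer: stands in for the pcap parsing loop.
    let parser = thread::spawn(move || {
        for i in 0..5 {
            tx.send(ParsedPacket { time: i as f64 }).unwrap();
        }
        // Dropping tx closes the channel so the receiver loop ends.
    });

    // Consumer: serializes each packet as soon as it arrives,
    // instead of collecting everything into a vector first.
    for packet in rx {
        let serialized = format!("{{\"time\": {}}}", packet.time);
        println!("{}", serialized);
    }

    parser.join().unwrap();
}
```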

View File

@@ -1,5 +1,5 @@
// Init of configuration files could also be done via extern Config crate.
// But at this point of development it seems like this overhead would be unjust.
extern crate serde_json;
use byteorder::{ByteOrder, LittleEndian};
@@ -19,6 +19,7 @@ pub struct Config {
pub regex_filter: String,
pub insert_max: usize,
pub pcap_file: String,
pub tablename: String,
pub connection: String,
pub device: String,
pub is_device: bool,
@@ -30,7 +31,7 @@ pub struct FileInfo {
pub encapsulation_type: u16,
pub file_size: u64,
pub metadata: std::fs::Metadata,
// std::fs::Metadata::FileType + FilePermission return identical values?! Am I missing something?
}
impl FileInfo {
@@ -55,7 +56,8 @@ pub fn from_json_file() -> Option<Config> {
.unwrap()
.to_owned(),
insert_max: json.get("insert_max").unwrap().as_u64().unwrap() as usize,
pcap_file: json.get("pcap_file").unwrap().as_str().unwrap().to_owned(), // Not in use atm
tablename: json.get("database_tablename").unwrap().as_str().unwrap().to_owned(),
connection: format!(
"host={} user={} password={}",
json.get("database_host").unwrap().as_str().unwrap(),

View File

@@ -14,31 +14,30 @@ const FROM_DEVICE: bool = true;
const NON_CHUNKED: bool = true;
const CHUNKED: bool = false;
fn query_string(insert_max: &usize, table_name: &str) -> String {
let mut insert_template = String::with_capacity(insert_max * 8 + 96);
insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name));
for insert in 0..insert_max - 1 {
insert_template.push_str(&*format!("(${}), ", insert + 1));
}
insert_template.push_str(&*format!("(${})", insert_max));
insert_template
}
#[tokio::main(core_threads = 4)] // Tokio is implemented for possible future use.
async fn main() -> Result<(), Error> {
/* Init values from file */
let config: configure::Config = configure::from_json_file().unwrap();
let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
println!("{:?}", pcap_map.iter());
// TODO: Create db table with pcap file hashes
// TODO: hash file metadata, so its state is comparable at times and can be written to a db table (and read e.g. after system crash)
// This db table should include UUIDs so it can be joined effectively with former runs
// TODO: Use inotify crate to update pcap_map according to files created while parser is running
/* db connection */
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
tokio::spawn(async move {
@@ -47,11 +46,11 @@ async fn main() -> Result<(), Error> {
}
});
client
.execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
.await?;
client
.execute(
&*format!("CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &config.tablename),
&[],
)
.await?;
@@ -60,12 +59,13 @@ async fn main() -> Result<(), Error> {
match config.is_device {
FROM_FILE => {
for (_pcap_file, _pcap_info) in pcap_map.iter() {
println!("{:?}: {:?}", &_pcap_file, &_pcap_info);
// TODO: Tuning vector capacity according to mean average & std dev of packet sizes
let v: Vec<parser::QryData> =
parser::parse(&_pcap_file, &config.filter, &config.regex_filter);
let packets_serialized = serializer::serialize_packets(v);
// TODO: Tuning vector capacity according to mean average & std dev of packet sizes
// let mut v = Vec::<parser::QryData>::with_capacity(100000);
// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
@@ -74,10 +74,14 @@ async fn main() -> Result<(), Error> {
/* Do chunks and query data */
let chunk_count = packets_serialized.len() / config.insert_max;
let remainder: usize = packets_serialized.len() % config.insert_max;
println!("chunks: {:?}", &chunk_count);
println!("remainder: {:?}", &remainder);
let chunker = &packets_serialized.len() < &config.insert_max;
match chunker {
NON_CHUNKED => {
let insert_str = query_string(&packets_serialized.len(), &config.tablename);
let statement = client.prepare(&insert_str).await?;
client
.query_raw(
@@ -87,7 +91,7 @@ async fn main() -> Result<(), Error> {
.await?;
}
CHUNKED => {
let insert_str = query_string(&config.insert_max, &config.tablename);
let statement = client.prepare(&insert_str).await?;
for _i in 0..chunk_count {
@@ -95,19 +99,19 @@ async fn main() -> Result<(), Error> {
client
.query_raw(
&statement,
_input.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
if 0 < remainder {
let rem_str = query_string(&remainder, &config.tablename);
let statement = client.prepare(&rem_str).await?;
let (_garbage, _input) =
packets_serialized.split_at(packets_serialized.len() - remainder);
client
.query_raw(
&statement,
_input.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
@@ -116,7 +120,7 @@ async fn main() -> Result<(), Error> {
}
}
FROM_DEVICE => {
let insert_str = query_string(&config.insert_max, &config.tablename);
let statement = client.prepare(&insert_str).await?;
loop {
let v: Vec<parser::QryData> = parser::parse_device(

View File

@@ -6,6 +6,7 @@
"parse_device": "enp7s0",
"pcap_file": "",
"pcap_dir": "../target",
"database_tablename": "json_dump",
"database_user": "postgres",
"database_host": "localhost",
"database_password": "password"

View File

@@ -52,14 +52,14 @@ pub struct QryData {
#[allow(dead_code)]
enum EncapsulationType {
// pcap::Linktype::get_name() is unsafe. That's why this data structure would be an alternative.
EN10MB = 1, // See: https://docs.rs/pcap/0.7.0/src/pcap/lib.rs.html#247-261
RAW = 101, // Would this be an issue?
}
impl QryData {
// This is not cool!
// I don't know if object oriented is the way to go here. It's awful but modular!
// Maybe I'll do a rollback and do a different design
fn new() -> QryData {
@@ -141,7 +141,7 @@ impl QryData {
Ok(())
}
// TODO: impl correct Err type and use as Result
fn transport_layer(
&mut self,
packet_data: &[u8],
@@ -196,6 +196,9 @@ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
flags.push_str(";");
}
//if flags.len() > 0{
//println!("{:?}", flags);
//}
match 0 < flags.len() {
false => None,
true => Some(flags),
@@ -218,8 +221,12 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str)
};
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex overhead is between 4-9% --single threaded-- on complete packet [u8] data
//v.push(me.clone());
if me.reg_res.is_some(){
println!("{:?}", &me.reg_res);
}
v.push(QryData {
id: 0,
time: me.time,
@@ -238,6 +245,8 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str)
/* This could need some love */
pub fn parse_device(
// Pcap file data parsing will result in less cpu cycles if device parsing is handled in a separate function, I guess.
// It would result in less conditional overhead?!
parse_device: &str,
filter_str: &str,
insert_max: &usize,