formatting, cleanup, readme overhaul
parent
5c36509827
commit
636c383e64
46
README.md
|
@ -4,23 +4,10 @@ The software is written in Rust (2018, safe mode only). At the current state I a
|
||||||
Error handling is subpar at the moment. There is no real unit testing to speak of since switching to asynchronous functionality. Testing will come back.
|
Error handling is subpar at the moment. There is no real unit testing to speak of since switching to asynchronous functionality. Testing will come back.
|
||||||
|
|
||||||
This version is a successor of the _POSIX_C_SOURCE 200809L implementation in which all data parsed from a pcap/pcapng files is written as a single and simple query. The ingestion time is rather fast (tested writes: 100*10^3 tcp packets in ~1.8 sec) but the procedure may be insecure. See the other repository for more information.
|
This version is a successor of the _POSIX_C_SOURCE 200809L implementation in which all data parsed from a pcap/pcapng files is written as a single and simple query. The ingestion time is rather fast (tested writes: 100*10^3 tcp packets in ~1.8 sec) but the procedure may be insecure. See the other repository for more information.
|
||||||
The idea of this iteration is to use a prepared statement and chunk the data according to maximum input. Postgres databases have a custom maximum limit on each insert query of prepared statements. Said chunk size is initialized through the config/interface file called parser.json as `insert_max`. Data can be read from PCAP/PCANG files, as well as network devices.
|
~~The idea of this iteration is to use a prepared statement and chunk the data according to maximum input. Postgres databases have a custom maximum limit on each insert query of prepared statements. Said chunk size is initialized through the config/interface file called parser.json as `insert_max`. Data can be read from PCAP/PCANG files, as well as network devices.~~
|
||||||
|
|
||||||
**UPDATE 0.2.0**: Chunking (more on this in the next paragraph) can be omitted completely when using PostgreSQL's `COPY` transferring binary data instead of using `Insert`. This is not only somewhat faster, but there are quite a few lines of code less in the end. Only parsing from network device still needs chunks.
|
**UPDATE 0.2.0**: Chunking can be omitted completely when using PostgreSQL's `COPY` to transfer binary data instead of using `INSERT`. This is not only somewhat faster, it also takes quite a few lines of code less in the end. Only parsing from a network device still needs chunks at the moment.
|
||||||
The other recent change is that only none NULL protocol data of a packet is serialized to json. Table insertion should be smaller this way.
|
The other recent change is that only non-NULL protocol data of a packet is serialized to json. Table insertion should be smaller this way. Furthermore, an mpsc `sync_channel` is implemented, which reduces main memory pressure.
|
||||||
|
|
||||||
Process is as follows:
|
|
||||||
|
|
||||||
- Choose between network device (specify it as well) or file input
|
|
||||||
- Choosing device is straight forward -> data gets parsed, chunked and queries prepared according to `insert_max` size
|
|
||||||
- Encapsulation type / Linktype is chosen in beforehand. Currently Ethernet and RawIp is supported.
|
|
||||||
- Choosing file input means selecting a directory where your PCAP/PCAPNG files reside.
|
|
||||||
- A hash map is created out of key(paths):value(metadata) out of pcap files found in the specified directory.
|
|
||||||
- The parser gets invoked, which itself calls the appropriate protocol handler on to the byte data of packetsi yielded by pcap. A vector of type QryData is returned after EOF has been hit.
|
|
||||||
- QryData vector is serialized.
|
|
||||||
- Serialized data gets chunked.
|
|
||||||
- Prepared statements are prepared according to chunksize
|
|
||||||
- Queried data gets queried in chunks afterwards
|
|
||||||
|
|
||||||
Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are handled any additional session layer/wrapped data can be found in packet->data[u8] -- for now.
|
Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are handled any additional session layer/wrapped data can be found in packet->data[u8] -- for now.
|
||||||
Because of testing purposes, layout of the table is serialized json. Table layout is somewhat "dynamic". Any procotols not recognized in a parsed packet will be marked as NULL inside a resulting table row.
|
Because of testing purposes, layout of the table is serialized json. Table layout is somewhat "dynamic". Any procotols not recognized in a parsed packet will be marked as NULL inside a resulting table row.
|
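The `sync_channel` change mentioned in this README hunk bounds how many parsed packets can be in flight between stages. Below is a minimal sketch of that pattern using only the standard library. `Record`, `produce`, and `serialize` are hypothetical stand-ins, not the `QryData`, `mpsc_parser`, or `mpsc_serialize` items from this commit, and the JSON string is hand-formatted purely for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{spawn, JoinHandle};

/// Hypothetical stand-in for a parsed packet (the commit's real type is `QryData`).
#[derive(Debug, PartialEq)]
pub struct Record {
    pub id: u32,
}

/// Stage 1: produce records on a worker thread.
/// `bound` caps how many records may wait in the channel before `send` blocks.
pub fn produce(count: u32, bound: usize) -> (Receiver<Record>, JoinHandle<()>) {
    let (sender, receiver) = sync_channel(bound);
    let handle = spawn(move || {
        for id in 0..count {
            // `send` only fails if the receiving side was dropped; stop quietly then.
            if sender.send(Record { id }).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}

/// Stage 2: turn each record into a string on a second worker thread.
pub fn serialize(input: Receiver<Record>, bound: usize) -> (Receiver<String>, JoinHandle<()>) {
    let (sender, receiver) = sync_channel(bound);
    let handle = spawn(move || {
        // Iterating a `Receiver` ends once the upstream sender is dropped.
        for record in input {
            if sender.send(format!("{{\"id\":{}}}", record.id)).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}
```

Chaining `produce(1000, 100)` into `serialize(records, 100)` and iterating the final receiver drives both stages. Because each channel buffers at most `bound` items, memory use stays roughly constant regardless of how many records flow through, which matches the drop in client memory reported for the later test runs.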
||||||
|
@ -28,32 +15,33 @@ A query may look like this `select packet->>'ipv4_header' from json_dump;` or th
|
||||||
|
|
||||||
Another subgoal was the ability to compile a static binary, which --last time I tested-- works without dependencies, but the need for libpcap itself. It even executes on oracle linux, after linking against the elf64 interpreter in a direct manner. If you ever had the pleasure using this derivate it may come as a suprise to you. The key is to compile via `x86_64-unknown-linux-musl` target. See: https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html
|
Another subgoal was the ability to compile a static binary, which --last time I tested-- works without dependencies, but the need for libpcap itself. It even executes on oracle linux, after linking against the elf64 interpreter in a direct manner. If you ever had the pleasure using this derivate it may come as a suprise to you. The key is to compile via `x86_64-unknown-linux-musl` target. See: https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html
|
||||||
|
|
||||||
Caveats: Regex Syntax is limited at the moment, because it is not compiled from a Rawstring, but a common one. Escaping does not work properly, character classes do. I have to fiddle the correct synctactical way to get it out of the json file and into a raw. For already supported regular expression syntax see: https://docs.rs/regex/1.3.9/regex/#syntax , also see the example in `parser.json`.
|
Caveats: Regex syntax is limited and needs some love. Escaping common regular expression syntax does not work properly, but character classes do. I still have to figure out the syntactically correct way to get the pattern out of the json file and into a raw string. For already supported regular expression syntax see: https://docs.rs/regex/1.3.9/regex/#syntax , also see the example in `parser.json` which parses some toplevel domains.
|
||||||
|
|
||||||
If this whole thing turns out to be viable, some future features may be:
|
If this whole thing turns out to be viable, some future features may be:
|
||||||
|
|
||||||
- Database containing file hash map to compare file status/sizes after the parser may have crashed, or to join a complete overview of any existing PCAP files.
|
- Database containing the already implemented file hash map to compare file status/sizes after the parser may have crashed, or to join a complete overview of any existing PCAP files inserted at previous CTFs.
|
||||||
- Concurrency. There are some interresting ways of parallelization I am working on to find a model that really benefits the use case. MPSC looks promising at the moment. Inplementing a MPSC pipe has the nice side effect of lower memory usage, parsed packages will directly be piped to json serialization function without beeing stored in a separate vector. In the sense of pcap from config -> parser (without vec usage) -> serializer -> insertion.
|
|
||||||
- Update file hashmap through inotify crate, during runtime.
|
- Update file hashmap through inotify crate, during runtime.
|
||||||
- Restoration of fragmented ipv4 packages.
|
- Restoration of fragmented ipv4 packages.
|
||||||
- SIMD (via autovectorization). Which is easy enough to do in Rust.
|
- SIMD (via autovectorization). Which is easy enough to do in Rust.
|
||||||
- Support of more protocols
|
- Support more network protocols
|
||||||
|
|
||||||
There are many other things left to be desired.
|
There are many other things left to be desired.
|
||||||
|
|
||||||
Bechmarking was done with the identical file that was used in the previous C implementation. Inserting none chunked data resulted in ~20 minutes of querying to database. Now, chunked data is below 12 seconds after compiler optimization.
|
Benchmarking was done with the identical file that was used in the previous C implementation, at first. Inserting none chunked data resulted in ~20 minutes of querying to database. Now, chunked data is below 12 seconds after compiler optimization.
|
||||||
|
|
||||||
Speaking of optimization: Do yourself a favor an run release code not debug code: `cargo run --release`. The compiler does a rather hefty optimization and you will save some time waiting for your precious data do be inserted. I did no further optimization besides trying to enable the compiler to do a better job. Just blackboxing, no assembly tweaking yet.
|
Speaking of optimization: Do yourself a favor an run release code not debug code: `cargo run --release`. The compiler does a rather hefty optimization and you will save some time waiting for your precious data do be inserted. I did no further optimization besides trying to enable the compiler to do a better job. Just blackboxing, no assembly tweaking yet.
|
||||||
|
|
||||||
|
|
||||||
** TESTRUNS **:
|
**TESTRUNS**:
|
||||||
|
|
||||||
- Run 001 at 24.06.2020 of iCTF2020 PCAP (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows, `cargo run --release 3627,41s user 156,47s system 23% cpu 4:29:19,27 total` . PostgreSQL12 server used was a vanilla `docker pull postgres` container on a 2008 Macbook, 2,4GHz dual core, 6GB RAM connected via wifi.
|
- Run 001 at 24.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows, `cargo run --release 3627,41s user 156,47s system 23% cpu 4:29:19,27 total` . PostgreSQL12 server used was a vanilla `docker pull postgres` container on a 2008 Macbook, 2,4GHz dual core, 6GB RAM connected via wifi.
|
||||||
Memory usage of the Client was at about 11.5GB out of 14.7GB which results in 0.78 utilization. (An tokio mpsc pipe will be the next improvement. Thus, memory usage may be less afterwards)
|
Memory usage of the Client was at about 11.5GB out of 14.7GB which results in 0.78 utilization. (An tokio mpsc pipe will be the next improvement. Thus, memory usage may be less afterwards)
|
||||||
- Run 002 at 25.06.2020 of iCTF2020 PCAP (bpf filter: 'tcp') files resulted in a table size of roundabout 74GB size ('&du -hs')m 30808676 rows,`cargo run --release 3669,68s user 163,23s system 23% cpu 4:27:19,14 total`. PostgreSQL12 server used was a vanilla `docker pull postgres` container on a 2008 Macbook, 2,4GHz dual core, 6GB RAM connected via wifi. Memory usage of the Client was at about 11.5GB out of 14.7GB which results in 0.78 utilization. (An tokio mpsc pipe will be the next improvement. Thus, memory usage may be less afterwards)
|
- Run 002 at 25.06.2020 of iCTF2020 PCAP (bpf filter: 'tcp') files resulted in a table size of roundabout 74GB size ('&du -hs')m 30808676 rows,`cargo run --release 3669,68s user 163,23s system 23% cpu 4:27:19,14 total`. PostgreSQL12 server used was a vanilla `docker pull postgres` container on a 2008 Macbook, 2,4GHz dual core, 6GB RAM connected via wifi. Memory usage of the Client was at about 11.5GB out of 14.7GB which results in 0.78 utilization. (An tokio mpsc pipe will be the next improvement. Thus, memory usage may be less afterwards)
|
||||||
- Run 003 cargo run --release 3847,69s user 236,93s system 25% cpu 4:22:45,90 total
|
- Run 003 at 26.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows,cargo run --release 3847,69s user 236,93s system 25% cpu 4:22:45,90 total
|
||||||
- Run 004 cargo run --release 1176,24s user 146,11s system 30% cpu 1:12:49,93 total on localhost docker
|
- Run 004 at 26.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows, cargo run --release 1176,24s user 146,11s system 30% cpu 1:12:49,93 total on localhost docker
|
||||||
- Run 005 cargo run --release 1181,33s user 139,35s system 29% cpu 1:15:40,24 total on localhost docker
|
- Run 005 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows,cargo run --release 1181,33s user 139,35s system 29% cpu 1:15:40,24 total on localhost docker
|
||||||
- Run 006 at 29.06.2020 cargo run --release 1640,72s user 224,14s system 44% cpu 1:09:49,42 total on localhost docker, std::mpsc::sync_channel, memory usage ca 15MB
|
- Run 006 at 29.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows, cargo run --release 1640,72s user 224,14s system 44% cpu 1:09:49,42 total on localhost docker, std::mpsc::sync_channel
|
||||||
- Run 007 at 29.06.2020 cargo run --release 1243,53s user 166,47s system 33% cpu 1:09:24,14 total on localhost docker, std::mpsc::sync_channel, memory usage ca 17MB
|
- Run 007 at 29.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows,cargo run --release 1243,53s user 166,47s system 33% cpu 1:09:24,14 total on localhost docker, std::mpsc::sync_channel
|
||||||
- Run 008 at 29.06.2020 cargo run --release 1518,17s user 162,07s system 37% cpu 1:13:42,22 total
|
- Run 008 at 29.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows,cargo run --release 1518,17s user 162,07s system 37% cpu 1:13:42,22 total on localhost docker, std::mpsc::sync_channel
|
||||||
|
- Run 009 at 30.06.2020 of complete iCTF2020 PCAPs (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows,cargo run --release 1359,90s user 148,15s system 36% cpu 1:09:03,58 total on localhost docker, std::mpsc::sync_channel
|
||||||
|
|
||||||
|
|
210
src/main.rs
|
@ -1,11 +1,11 @@
|
||||||
mod configure;
|
mod configure;
|
||||||
mod parser;
|
mod parser;
|
||||||
mod serializer;
|
mod serializer;
|
||||||
use tokio_postgres::types::{Type, ToSql};
|
use futures::pin_mut;
|
||||||
use tokio_postgres::{Error, NoTls};
|
|
||||||
use tokio_postgres::binary_copy::{BinaryCopyInWriter};
|
|
||||||
use futures::{pin_mut};
|
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
use tokio_postgres::binary_copy::BinaryCopyInWriter;
|
||||||
|
use tokio_postgres::types::{ToSql, Type};
|
||||||
|
use tokio_postgres::{Error, NoTls};
|
||||||
|
|
||||||
extern crate jemallocator;
|
extern crate jemallocator;
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
|
@ -17,7 +17,7 @@ const FROM_DEVICE: bool = true;
|
||||||
//const NON_CHUNKED: bool = true;
|
//const NON_CHUNKED: bool = true;
|
||||||
//const CHUNKED: bool = false;
|
//const CHUNKED: bool = false;
|
||||||
|
|
||||||
// Used for parsing from device
|
// Used for parsing from device
|
||||||
fn query_string(insert_max: &usize, table_name: &str) -> String {
|
fn query_string(insert_max: &usize, table_name: &str) -> String {
|
||||||
let mut insert_template = String::with_capacity(insert_max * 8 + 96);
|
let mut insert_template = String::with_capacity(insert_max * 8 + 96);
|
||||||
insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name));
|
insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name));
|
||||||
|
@ -30,7 +30,7 @@ fn query_string(insert_max: &usize, table_name: &str) -> String {
|
||||||
insert_template
|
insert_template
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main(core_threads = 4)]
|
#[tokio::main(core_threads = 4)]
|
||||||
async fn main() -> Result<(), Error> {
|
async fn main() -> Result<(), Error> {
|
||||||
/* Init values from file */
|
/* Init values from file */
|
||||||
let config: configure::Config = configure::from_json_file().unwrap();
|
let config: configure::Config = configure::from_json_file().unwrap();
|
||||||
|
@ -67,111 +67,109 @@ async fn main() -> Result<(), Error> {
|
||||||
for (_pcap_file, _pcap_info) in pcap_map.iter() {
|
for (_pcap_file, _pcap_info) in pcap_map.iter() {
|
||||||
//println!("{:?}: {:?}", &_pcap_file, &_pcap_info);
|
//println!("{:?}: {:?}", &_pcap_file, &_pcap_info);
|
||||||
|
|
||||||
|
/* MPSC serialization */
|
||||||
|
// This is just patched up atm
|
||||||
|
let (qry_data, h1) = parser::mpsc_parser(
|
||||||
|
_pcap_file.to_owned(),
|
||||||
|
config.filter.to_owned(),
|
||||||
|
config.regex_filter.to_owned(),
|
||||||
|
);
|
||||||
|
let (data_serialized, h2) = serializer::mpsc_serialize(qry_data);
|
||||||
|
|
||||||
/* MPSC channeled serialization */
|
/* Copy data to db */
|
||||||
// This is just patched up atm, mix between std::sync::mpsc and tokio::sync::mpsc
|
let sink = client
|
||||||
let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned());
|
.copy_in("COPY json_dump(packet) from STDIN BINARY")
|
||||||
let (data_serialized, h2) = serializer::mpsc_serialize(qry_data);
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
|
||||||
|
|
||||||
let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap();
|
let join = task::spawn(async move {
|
||||||
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
|
pin_mut!(writer);
|
||||||
|
for pack in data_serialized {
|
||||||
/* Copy data to db */
|
writer.as_mut().write(&[&pack]).await.unwrap();
|
||||||
let join = task::spawn( async move {
|
}
|
||||||
pin_mut!(writer);
|
writer.finish().await.unwrap();
|
||||||
for pack in data_serialized {
|
});
|
||||||
//while let Some(res) = rx.recv().await {
|
assert!(join.await.is_ok());
|
||||||
writer.as_mut().write(&[&pack]).await.unwrap();
|
assert!(h1.join().is_ok());
|
||||||
//drop(res);
|
assert!(h2.join().is_ok());
|
||||||
// Reminder: write_raw() behavior is very strange, so it's write() for now.
|
|
||||||
// writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
//thread::sleep(time::Duration::from_millis(3000));
|
|
||||||
writer.finish().await.unwrap();
|
|
||||||
});
|
|
||||||
assert!(join.await.is_ok());
|
|
||||||
assert!(h1.join().is_ok());
|
|
||||||
assert!(h2.join().is_ok());
|
|
||||||
|
|
||||||
/* Tokio mpsc channel*/
|
|
||||||
// 1. Reminder: Wonder why this is not in use? See : https://stackoverflow.com/questions/59674660/cannot-free-dynamic-memory-in-async-rust-task
|
|
||||||
// Using pt2malloc in addition is even worse: memory is not leaked but munmalloced() and reserved. It is not returned to the OS! This results in thrashing in some cases.
|
|
||||||
// Reducing MALLOC_ARENA_MAX does not do anything in this regard.
|
|
||||||
|
|
||||||
// let (tx, mut rx) = mpsc::channel(100);
|
/* Tokio mpsc channel*/
|
||||||
// let pcap_file = _pcap_file.clone();
|
// 1. Reminder: Wonder why this is not in use? See : https://stackoverflow.com/questions/59674660/cannot-free-dynamic-memory-in-async-rust-task
|
||||||
// let filter = config.filter.clone();
|
// Using pt2malloc in addition is even worse: memory is not leaked but munmalloced() and reserved. It is not returned to the OS! This results in thrashing in some cases.
|
||||||
// let regex_filter = config.regex_filter.clone();
|
// Reducing MALLOC_ARENA_MAX does not do anything in this regard.
|
||||||
// let join_handle: task::JoinHandle<Vec<parser::QryData>> = task::spawn( async move {
|
|
||||||
// parser::tokio_parse(pcap_file, &filter, &regex_filter).await
|
|
||||||
// });
|
|
||||||
// let v = join_handle.await.unwrap();
|
|
||||||
// for packet in v.into_iter(){
|
|
||||||
|
|
||||||
// for packet in qry_data {
|
|
||||||
// let mut tx = tx.clone();
|
|
||||||
//
|
|
||||||
// tokio::spawn( async move {
|
|
||||||
// //println!("serializing!number {:?}", i);
|
|
||||||
//
|
|
||||||
// let packet_serialized = serializer::tokio_serialize(packet).await;
|
|
||||||
// tx.send(packet_serialized).await.unwrap();
|
|
||||||
// });
|
|
||||||
|
|
||||||
// }
|
// let (tx, mut rx) = mpsc::channel(100);
|
||||||
// drop(tx);
|
// let pcap_file = _pcap_file.clone();
|
||||||
///////////////////////////////////////////////////////////////////////////////////
|
// let filter = config.filter.clone();
|
||||||
|
// let regex_filter = config.regex_filter.clone();
|
||||||
|
// let join_handle: task::JoinHandle<Vec<parser::QryData>> = task::spawn( async move {
|
||||||
|
// parser::tokio_parse(pcap_file, &filter, &regex_filter).await
|
||||||
|
// });
|
||||||
|
// let v = join_handle.await.unwrap();
|
||||||
|
// for packet in v.into_iter(){
|
||||||
|
|
||||||
// 2. Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE
|
// for packet in qry_data {
|
||||||
// let mut v = Vec::<parser::QryData>::with_capacity(100000);
|
// let mut tx = tx.clone();
|
||||||
// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
|
//
|
||||||
// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
|
// tokio::spawn( async move {
|
||||||
// packets_serialized.extend(serializer::serialize_packets(v));
|
// //println!("serializing!number {:?}", i);
|
||||||
// /* Do chunks and query data */
|
//
|
||||||
// let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
|
// let packet_serialized = serializer::tokio_serialize(packet).await;
|
||||||
// match chunker {
|
// tx.send(packet_serialized).await.unwrap();
|
||||||
// NON_CHUNKED => {
|
// });
|
||||||
// let insert_str = query_string(&packets_serialized.len(), &config.tablename);
|
|
||||||
// let statement = client.prepare(&insert_str).await?;
|
// }
|
||||||
// client
|
// drop(tx);
|
||||||
// .query_raw(
|
///////////////////////////////////////////////////////////////////////////////////
|
||||||
// &statement,
|
|
||||||
// packets_serialized.iter().map(|p| p as &dyn ToSql),
|
// 2. Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE
|
||||||
// )
|
// let mut v = Vec::<parser::QryData>::with_capacity(100000);
|
||||||
// .await?;
|
// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
|
||||||
// }
|
// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
|
||||||
// CHUNKED => {
|
// packets_serialized.extend(serializer::serialize_packets(v));
|
||||||
// let insert_str = query_string(&config.insert_max, &config.tablename);
|
// /* Do chunks and query data */
|
||||||
// let statement = client.prepare(&insert_str).await?;
|
// let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
|
||||||
//
|
// match chunker {
|
||||||
// for chunk in packets_serialized.chunks_exact(config.insert_max) {
|
// NON_CHUNKED => {
|
||||||
// client
|
// let insert_str = query_string(&packets_serialized.len(), &config.tablename);
|
||||||
// .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
|
// let statement = client.prepare(&insert_str).await?;
|
||||||
// .await?;
|
// client
|
||||||
// }
|
// .query_raw(
|
||||||
// let remainder_len = packets_serialized
|
// &statement,
|
||||||
// .chunks_exact(config.insert_max)
|
// packets_serialized.iter().map(|p| p as &dyn ToSql),
|
||||||
// .remainder()
|
// )
|
||||||
// .len();
|
// .await?;
|
||||||
// if 0 < remainder_len {
|
// }
|
||||||
// let rem_str = query_string(&remainder_len, &config.tablename);
|
// CHUNKED => {
|
||||||
// let statement = client.prepare(&rem_str).await?;
|
// let insert_str = query_string(&config.insert_max, &config.tablename);
|
||||||
// client
|
// let statement = client.prepare(&insert_str).await?;
|
||||||
// .query_raw(
|
//
|
||||||
// &statement,
|
// for chunk in packets_serialized.chunks_exact(config.insert_max) {
|
||||||
// packets_serialized
|
// client
|
||||||
// .chunks_exact(config.insert_max)
|
// .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
|
||||||
// .remainder()
|
// .await?;
|
||||||
// .iter()
|
// }
|
||||||
// .map(|p| p as &dyn ToSql),
|
// let remainder_len = packets_serialized
|
||||||
// )
|
// .chunks_exact(config.insert_max)
|
||||||
// .await?;
|
// .remainder()
|
||||||
// }
|
// .len();
|
||||||
// }
|
// if 0 < remainder_len {
|
||||||
// }
|
// let rem_str = query_string(&remainder_len, &config.tablename);
|
||||||
|
// let statement = client.prepare(&rem_str).await?;
|
||||||
|
// client
|
||||||
|
// .query_raw(
|
||||||
|
// &statement,
|
||||||
|
// packets_serialized
|
||||||
|
// .chunks_exact(config.insert_max)
|
||||||
|
// .remainder()
|
||||||
|
// .iter()
|
||||||
|
// .map(|p| p as &dyn ToSql),
|
||||||
|
// )
|
||||||
|
// .await?;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FROM_DEVICE => {
|
FROM_DEVICE => {
|
||||||
|
|
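The commented-out INSERT fallback in this hunk splits the serialized packets into full chunks of `insert_max` rows plus one shorter remainder, preparing a statement with a matching number of placeholders for each. The sketch below isolates that batching logic without a database connection. `insert_statement` and `plan_batches` are hypothetical helpers written for illustration; the body of the commit's own `query_string` is not fully visible in this diff, so the placeholder layout here is an assumption based on PostgreSQL's `$n` parameter syntax.

```rust
/// Hypothetical placeholder builder:
/// "INSERT INTO <table> (packet) VALUES ($1), ($2), ..." with `rows` placeholders.
pub fn insert_statement(table: &str, rows: usize) -> String {
    let placeholders: Vec<String> = (1..=rows).map(|i| format!("(${})", i)).collect();
    format!(
        "INSERT INTO {} (packet) VALUES {}",
        table,
        placeholders.join(", ")
    )
}

/// Pair every batch of rows with a statement that has the same number of
/// placeholders: full chunks of `insert_max` first, then the remainder, if any.
pub fn plan_batches<'a, T>(
    table: &str,
    rows: &'a [T],
    insert_max: usize,
) -> Vec<(String, &'a [T])> {
    assert!(insert_max > 0, "insert_max must be positive");
    let chunks = rows.chunks_exact(insert_max);
    // Rows left over after the last full chunk.
    let remainder = chunks.remainder();
    // All full chunks share one statement text, so it is built once.
    let full_statement = insert_statement(table, insert_max);
    let mut batches: Vec<(String, &'a [T])> = chunks
        .map(|chunk| (full_statement.clone(), chunk))
        .collect();
    if !remainder.is_empty() {
        batches.push((insert_statement(table, remainder.len()), remainder));
    }
    batches
}
```

For seven rows and an `insert_max` of 3 this yields two three-row batches and a single one-row batch. In a real client each distinct statement text would be prepared once and reused for every chunk of that size, which is why the full-chunk statement is built outside the loop.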
|
@ -5,9 +5,9 @@
|
||||||
"from_device": false,
|
"from_device": false,
|
||||||
"parse_device": "enp7s0",
|
"parse_device": "enp7s0",
|
||||||
"pcap_file": "<not in use right now>",
|
"pcap_file": "<not in use right now>",
|
||||||
"pcap_dir": "../target",
|
"pcap_dir": "../target/files",
|
||||||
"database_tablename": "json_dump",
|
"database_tablename": "json_dump",
|
||||||
"database_user": "postgres",
|
"database_user": "postgres",
|
||||||
"database_host": "192.168.0.11",
|
"database_host": "localhost",
|
||||||
"database_password": "docker"
|
"database_password": "password"
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
mod packet_handler;
|
mod packet_handler;
|
||||||
use pcap::{Capture, Linktype};
|
use pcap::{Capture, Linktype};
|
||||||
use regex::bytes::Regex;
|
use regex::bytes::Regex;
|
||||||
|
use serde::Serialize;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::str;
|
use std::str;
|
||||||
use serde::Serialize;
|
|
||||||
use std::thread::{spawn, JoinHandle};
|
|
||||||
use std::sync::mpsc::{sync_channel, Receiver};
|
use std::sync::mpsc::{sync_channel, Receiver};
|
||||||
|
use std::thread::{spawn, JoinHandle};
|
||||||
//use tokio::sync::mpsc;
|
//use tokio::sync::mpsc;
|
||||||
//use tokio::task;
|
//use tokio::task;
|
||||||
|
|
||||||
|
@ -30,21 +30,22 @@ const IPV6: usize = 0x6;
|
||||||
pub struct QryData {
|
pub struct QryData {
|
||||||
pub id: i32,
|
pub id: i32,
|
||||||
pub time: f64,
|
pub time: f64,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")] // I came to the conclusion, that importing serde_with crate just for a single struct decorator
|
// I came to the conclusion, that importing serde_with crate just for a single struct decorator is not worth it. So, this looks a bit ugly, having eight decorator. Deal with it.
|
||||||
pub data: Option<Vec<u8>>, // is not worth it. So, this looks a bit ugly, having eight decorator. Deal with it.
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
pub data: Option<Vec<u8>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub ether_header: Option<packet_handler::EtherHeader>,
|
pub ether_header: Option<packet_handler::EtherHeader>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub ipv4_header: Option<packet_handler::IpV4Header>,
|
pub ipv4_header: Option<packet_handler::IpV4Header>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub ipv6_header: Option<packet_handler::IpV6Header>,
|
pub ipv6_header: Option<packet_handler::IpV6Header>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub tcp_header: Option<packet_handler::TcpHeader>,
|
pub tcp_header: Option<packet_handler::TcpHeader>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub udp_header: Option<packet_handler::UdpHeader>,
|
pub udp_header: Option<packet_handler::UdpHeader>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub arp_header: Option<packet_handler::ArpHeader>,
|
pub arp_header: Option<packet_handler::ArpHeader>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub reg_res: Option<String>,
|
pub reg_res: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,7 +57,6 @@ enum EncapsulationType {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QryData {
|
impl QryData {
|
||||||
|
|
||||||
fn new() -> QryData {
|
fn new() -> QryData {
|
||||||
QryData {
|
QryData {
|
||||||
id: 0,
|
id: 0,
|
||||||
|
@ -184,17 +184,17 @@ impl QryData {
|
||||||
/* Regex parse _complete_ package */
|
/* Regex parse _complete_ package */
|
||||||
fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
|
fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
|
||||||
let mut flags: String = String::new();
|
let mut flags: String = String::new();
|
||||||
if !re.as_str().is_empty() {
|
if !re.as_str().is_empty() {
|
||||||
for mat in re.find_iter(payload) {
|
for mat in re.find_iter(payload) {
|
||||||
// TODO: Test benchmark format! vs. push_str()
|
// TODO: Test benchmark format! vs. push_str()
|
||||||
// flags.push_str(&format!("{} ",std::str::from_utf8(mat.as_bytes()).unwrap()));
|
// flags.push_str(&format!("{} ",std::str::from_utf8(mat.as_bytes()).unwrap()));
|
||||||
// See: https://github.com/hoodie/concatenation_benchmarks-rs
|
// See: https://github.com/hoodie/concatenation_benchmarks-rs
|
||||||
flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
|
flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
|
||||||
flags.push_str(";");
|
flags.push_str(";");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
if !flags.is_empty() {
|
||||||
if !flags.is_empty(){
|
// println!("{:?}", flags);
|
||||||
// println!("{:?}", flags);
|
|
||||||
}
|
}
|
||||||
match flags.is_empty() {
|
match flags.is_empty() {
|
||||||
true => None,
|
true => None,
|
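`flag_carnage` joins every regex match into one `;`-terminated string and returns `None` when nothing matched. Since it calls `std::str::from_utf8(...).unwrap()` on raw packet bytes, a match containing invalid UTF-8 would panic. The sketch below shows the same joining logic with a lossy conversion instead. `join_matches` is a hypothetical helper that takes plain byte slices so it runs without the `regex` crate; it is not part of this commit.

```rust
/// Join byte-slice matches into one ";"-terminated string.
/// Returns `None` when there are no matches. Bytes that are not valid UTF-8
/// are replaced with U+FFFD instead of causing a panic.
pub fn join_matches<'a, I>(matches: I) -> Option<String>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let mut flags = String::new();
    for m in matches {
        flags.push_str(&String::from_utf8_lossy(m));
        flags.push(';');
    }
    if flags.is_empty() {
        None
    } else {
        Some(flags)
    }
}
```

With the `regex` crate, the input could be supplied as `re.find_iter(payload).map(|m| m.as_bytes())`. Whether replacing invalid bytes is acceptable depends on how the stored matches are queried later, so treat this as one option rather than a drop-in fix.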
||||||
|
@ -219,8 +219,7 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str)
|
||||||
};
|
};
|
||||||
|
|
||||||
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
|
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
|
||||||
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
|
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
|
||||||
//v.push(me.clone());
|
|
||||||
v.push(me.clone());
|
v.push(me.clone());
|
||||||
}
|
}
|
||||||
v
|
v
|
||||||
|
@ -261,42 +260,51 @@ pub fn parse_device(
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn mpsc_parser (parse_file: std::path::PathBuf, filter_str: String, regex_filter: String) -> (Receiver<QryData>, JoinHandle<()>) {
|
pub fn mpsc_parser(
|
||||||
|
parse_file: std::path::PathBuf,
|
||||||
|
filter_str: String,
|
||||||
|
regex_filter: String,
|
||||||
|
) -> (Receiver<QryData>, JoinHandle<()>) {
|
||||||
let (sender, receiver) = sync_channel(100);
|
let (sender, receiver) = sync_channel(100);
|
||||||
let handle = spawn( move || {
|
let handle = spawn(move || {
|
||||||
let mut cap = Capture::from_file(parse_file).unwrap();
|
let mut cap = Capture::from_file(parse_file).unwrap();
|
||||||
Capture::filter(&mut cap, &filter_str).unwrap();
|
Capture::filter(&mut cap, &filter_str).unwrap();
|
||||||
let linktype = cap.get_datalink();
|
let linktype = cap.get_datalink();
|
||||||
//println!("{:?}", &linktype);
|
//println!("{:?}", &linktype);
|
||||||
let re = Regex::new(&regex_filter).unwrap();
|
let re = Regex::new(&regex_filter).unwrap();
|
||||||
while let Ok(packet) = cap.next() {
|
while let Ok(packet) = cap.next() {
|
||||||
let mut me = QryData::new();
|
let mut me = QryData::new();
|
||||||
match linktype {
|
match linktype {
|
||||||
Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
|
Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
|
||||||
Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
|
Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
|
me.time =
|
||||||
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
|
(packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
|
||||||
|
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
|
||||||
if sender.send(me).is_err(){
|
|
||||||
break;
|
if sender.send(me).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
(receiver, handle)
|
(receiver, handle)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub async fn tokio_parse <'a> (parse_file: std::path::PathBuf, filter_str: &'a str, regex_filter: &'a str ) -> Vec<QryData> {
|
pub async fn tokio_parse<'a>(
|
||||||
|
parse_file: std::path::PathBuf,
|
||||||
|
filter_str: &'a str,
|
||||||
|
regex_filter: &'a str,
|
||||||
|
) -> Vec<QryData> {
|
||||||
let mut v: Vec<QryData> = Vec::new();
|
let mut v: Vec<QryData> = Vec::new();
|
||||||
let mut cap = Capture::from_file(parse_file).unwrap();
|
let mut cap = Capture::from_file(parse_file).unwrap();
|
||||||
Capture::filter(&mut cap, &filter_str).unwrap();
|
Capture::filter(&mut cap, &filter_str).unwrap();
|
||||||
let linktype = cap.get_datalink();
|
let linktype = cap.get_datalink();
|
||||||
// println!("{:?}", &linktype);
|
// println!("{:?}", &linktype);
|
||||||
let re = Regex::new(&regex_filter).unwrap();
|
let re = Regex::new(&regex_filter).unwrap();
|
||||||
|
|
||||||
while let Ok(packet) = cap.next() {
|
while let Ok(packet) = cap.next() {
|
||||||
let mut me = QryData::new();
|
let mut me = QryData::new();
|
||||||
match linktype {
|
match linktype {
|
||||||
|
@ -308,9 +316,6 @@ pub async fn tokio_parse <'a> (parse_file: std::path::PathBuf, filter_str: &'a s
|
||||||
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
|
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
|
||||||
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
|
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
|
||||||
v.push(me.clone());
|
v.push(me.clone());
|
||||||
// drop(me);
|
|
||||||
//std::mem::replace(&mut me, QryData::new());
|
|
||||||
}
|
}
|
||||||
v
|
v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
use crate::parser;
|
use crate::parser;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::thread::{spawn, JoinHandle};
|
|
||||||
use std::sync::mpsc::{sync_channel, Receiver};
|
use std::sync::mpsc::{sync_channel, Receiver};
|
||||||
|
use std::thread::{spawn, JoinHandle};
|
||||||
|
|
||||||
// This is not needed atm
|
// This is not needed atm
|
||||||
//use serde::ser::{Serialize, SerializeStruct, Serializer};
|
//use serde::ser::{Serialize, SerializeStruct, Serializer};
|
||||||
//impl Serialize for parser::QryData {
|
//impl Serialize for parser::QryData {
|
||||||
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
@ -50,16 +50,17 @@ pub fn serialize_packets_as_string(v: Vec<parser::QryData>) -> Vec<serde_json::V
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn mpsc_serialize(packet: Receiver<parser::QryData>) -> (Receiver<serde_json::Value>, JoinHandle<()>) {
|
pub fn mpsc_serialize(
|
||||||
|
packet: Receiver<parser::QryData>,
|
||||||
|
) -> (Receiver<serde_json::Value>, JoinHandle<()>) {
|
||||||
let (sender, receiver) = sync_channel(100);
|
let (sender, receiver) = sync_channel(100);
|
||||||
let handle = spawn( move || {
|
let handle = spawn(move || {
|
||||||
for p in packet{
|
for p in packet {
|
||||||
let serialized = serde_json::to_value(p).unwrap();
|
let serialized = serde_json::to_value(p).unwrap();
|
||||||
if sender.send(serialized).is_err(){
|
if sender.send(serialized).is_err() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
(receiver, handle)
|
(receiver, handle)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
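The `mpsc_parser` and `mpsc_serialize` functions in this diff both return a `(Receiver, JoinHandle)` pair built on `sync_channel(100)`, which suggests they are meant to be chained into a two-stage pipeline. A minimal, std-only sketch of that pattern follows. The stage names `produce`, `stringify`, and `run_pipeline` and the `u32`/`String` payloads are hypothetical stand-ins for the pcap parsing and `serde_json` serialization, not code from the repository.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{spawn, JoinHandle};

// Stage 1 (hypothetical stand-in for the parser stage): emit 0..n over a bounded channel.
pub fn produce(n: u32) -> (Receiver<u32>, JoinHandle<()>) {
    // A bounded channel makes the producer block once 100 items are waiting,
    // so a slow consumer applies backpressure instead of growing memory.
    let (sender, receiver) = sync_channel(100);
    let handle = spawn(move || {
        for i in 0..n {
            // send() fails only when the receiving side was dropped; stop then.
            if sender.send(i).is_err() {
                break;
            }
        }
    });
    (receiver, handle)
}

// Stage 2 (hypothetical stand-in for the serializer stage): transform each item.
pub fn stringify(input: Receiver<u32>) -> (Receiver<String>, JoinHandle<()>) {
    let (sender, receiver) = sync_channel(100);
    let handle = spawn(move || {
        // Iterating a Receiver ends once the upstream sender is dropped.
        for item in input {
            if sender.send(format!("packet-{}", item)).is_err() {
                return;
            }
        }
    });
    (receiver, handle)
}

// Wire both stages together and collect the results in order.
pub fn run_pipeline(n: u32) -> Vec<String> {
    let (numbers, producer) = produce(n);
    let (strings, transformer) = stringify(numbers);
    let out: Vec<String> = strings.into_iter().collect();
    producer.join().unwrap();
    transformer.join().unwrap();
    out
}
```

Each stage owns its sender, so dropping it at thread exit closes the channel and lets the downstream loop finish without an explicit shutdown signal. Joining the handles afterwards surfaces any panic from a worker thread.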