gurkenhabicht 2020-06-29 23:54:42 +02:00
parent b571cb06f5
commit 5c36509827
5 changed files with 66 additions and 155 deletions

View File

@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[profile.release]
debug = true
#debug = true
#lto = "fat"
#codegen-units = 1
#panic = "abort"
@ -17,8 +17,6 @@ debug = true
tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
tokio = { version = "0.2", features = ["full"] }
pcap = "~0.7.0"
#postgres = { version = "~0.17.2", features = ["with-eui48-0_4","with-serde_json-1"] }
#libc = "0.2.68"
byteorder = "*"
bitfield = "*"
eui48 = { version = "~0.4.6", features = ["serde"] }

View File

@ -48,9 +48,12 @@ Speaking of optimization: Do yourself a favor and run release code, not debug code.
** TESTRUNS **:
- Run 001 at 24.06.2020 of iCTF2020 PCAP files (bpf filter: 'tcp') resulted in a table of roughly 74 GB (`$ du -hs`), 30808676 rows, `cargo run --release 3627,41s user 156,47s system 23% cpu 4:29:19,27 total`. The PostgreSQL 12 server was a vanilla `docker pull postgres` container on a 2008 MacBook, 2.4 GHz dual core, 6 GB RAM, connected via wifi. Memory usage of the client was about 11.5 GB out of 14.7 GB, i.e. 0.78 utilization. (A tokio mpsc pipe will be the next improvement; memory usage should be lower afterwards.)
- Run 002 at 25.06.2020 of iCTF2020 PCAP files (bpf filter: 'tcp') resulted in a table of roughly 74 GB (`$ du -hs`), 30808676 rows, `cargo run --release 3669,68s user 163,23s system 23% cpu 4:27:19,14 total`. Same setup as run 001; client memory usage was again about 11.5 GB out of 14.7 GB (0.78 utilization).
- Run 003 `cargo run --release 3847,69s user 236,93s system 25% cpu 4:22:45,90 total`
- Run 004 `cargo run --release 1176,24s user 146,11s system 30% cpu 1:12:49,93 total` on localhost docker
- Run 005 `cargo run --release 1181,33s user 139,35s system 29% cpu 1:15:40,24 total` on localhost docker
- Run 006 at 29.06.2020 `cargo run --release 1640,72s user 224,14s system 44% cpu 1:09:49,42 total` on localhost docker, std::sync::mpsc::sync_channel, memory usage ca. 15 MB (see the backpressure sketch below)
- Run 007 at 29.06.2020 `cargo run --release 1243,53s user 166,47s system 33% cpu 1:09:24,14 total` on localhost docker, std::sync::mpsc::sync_channel, memory usage ca. 17 MB
- Run 008 at 29.06.2020 `cargo run --release 1518,17s user 162,07s system 37% cpu 1:13:42,22 total`

View File

@ -1,7 +1,3 @@
extern crate serde_json;
extern crate tokio;
extern crate tokio_postgres;
mod configure;
mod parser;
mod serializer;
@ -10,20 +6,18 @@ use tokio_postgres::{Error, NoTls};
use tokio_postgres::binary_copy::{BinaryCopyInWriter};
use futures::{pin_mut};
use tokio::task;
use tokio::sync::mpsc;
extern crate jemallocator;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
/* conditionals */
const FROM_FILE: bool = false;
const FROM_DEVICE: bool = true;
//const NON_CHUNKED: bool = true;
//const CHUNKED: bool = false;
// Used for parsing from device
fn query_string(insert_max: &usize, table_name: &str) -> String {
let mut insert_template = String::with_capacity(insert_max * 8 + 96);
insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name));
@ -77,19 +71,37 @@ async fn main() -> Result<(), Error> {
/* MPSC channeled serialization */
// This is patched together for now: a mix of std::sync::mpsc and tokio::sync::mpsc
let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned()).await;
// let (data_serialized, h2) = serializer::mpsc_serialize(qry_data);
// let packets_serialized = serializer::mpsc_collect_serialized(data_serialized);
// let _r1 = h1.join().unwrap();
// let _r2 = h2.join().unwrap();
let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned());
let (data_serialized, h2) = serializer::mpsc_serialize(qry_data);
/* Deprecated */
// let v: Vec<parser::QryData> =
// parser::parse(&_pcap_file, &config.filter, &config.regex_filter);
// let len = v.len();
/* tokio mpsc channel */
let (tx, mut rx) = mpsc::channel(100);
let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
/* Copy data to db */
let join = task::spawn( async move {
pin_mut!(writer);
for pack in data_serialized {
//while let Some(res) = rx.recv().await {
writer.as_mut().write(&[&pack]).await.unwrap();
//drop(res);
// Reminder: write_raw() behavior is very strange, so it's write() for now.
// writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap();
}
//thread::sleep(time::Duration::from_millis(3000));
writer.finish().await.unwrap();
});
assert!(join.await.is_ok());
assert!(h1.join().is_ok());
assert!(h2.join().is_ok());
/* Tokio mpsc channel*/
// 1. Reminder: Wondering why this is not in use? See: https://stackoverflow.com/questions/59674660/cannot-free-dynamic-memory-in-async-rust-task
// Using ptmalloc2 in addition is even worse: memory is not leaked, but only free()d and kept reserved by the allocator; it is not munmap()ed back to the OS. This results in thrashing in some cases.
// Reducing MALLOC_ARENA_MAX does not help in this regard either.
// let (tx, mut rx) = mpsc::channel(100);
// let pcap_file = _pcap_file.clone();
// let filter = config.filter.clone();
// let regex_filter = config.regex_filter.clone();
@ -99,44 +111,25 @@ async fn main() -> Result<(), Error> {
// let v = join_handle.await.unwrap();
// for packet in v.into_iter(){
for packet in qry_data {
let mut tx = tx.clone();
// for packet in qry_data {
// let mut tx = tx.clone();
//
// tokio::spawn( async move {
// //println!("serializing!number {:?}", i);
//
// let packet_serialized = serializer::tokio_serialize(packet).await;
// tx.send(packet_serialized).await.unwrap();
// });
tokio::spawn( async move {
//println!("serializing!number {:?}", i);
// }
// drop(tx);
///////////////////////////////////////////////////////////////////////////////////
let packet_serialized = serializer::tokio_serialize(packet).await;
tx.send(packet_serialized).await.unwrap();
});
}
drop(tx);
let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap();
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
let join = task::spawn( async move {
pin_mut!(writer);
//for pack in packets_serialized {
while let Some(res) = rx.recv().await {
writer.as_mut().write(&[&res]).await.unwrap();
drop(res);
// Reminder: write_raw() behavior is very strange, so it's write() for now.
// writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap();
}
//thread::sleep(time::Duration::from_millis(3000));
writer.finish().await.unwrap();
});
assert!(join.await.is_ok());
let _r1 = h1.join().unwrap();
// TODO: MPSC channel
// 2. Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE
// let mut v = Vec::<parser::QryData>::with_capacity(100000);
// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
// packets_serialized.extend(serializer::serialize_packets(v));
// Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE
// /* Do chunks and query data */
// let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
// match chunker {

View File

@ -1,6 +1,3 @@
extern crate bitfield;
extern crate byteorder;
extern crate eui48;
mod packet_handler;
use pcap::{Capture, Linktype};
use regex::bytes::Regex;
@ -8,10 +5,9 @@ use std::convert::TryInto;
use std::str;
use serde::Serialize;
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::{channel, Receiver};
use tokio::sync::mpsc;
use tokio::stream::{self, StreamExt};
use tokio::task;
use std::sync::mpsc::{sync_channel, Receiver};
//use tokio::sync::mpsc;
//use tokio::task;
/* protocol ids, LittleEndian */
const ETH_P_IPV6: usize = 0xDD86;
@ -60,9 +56,6 @@ enum EncapsulationType {
}
impl QryData {
// This is not cool!
// I don't know if object-oriented is the way to go here. It's awful but modular!
// Maybe I'll do a rollback and try a different design
fn new() -> QryData {
QryData {
@ -229,19 +222,6 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str)
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
//v.push(me.clone());
v.push(me.clone());
/* TODO: Does clone() run destructors correctly where the manual struct copy below would not? */
// v.push(QryData {
// id: 0,
// time: me.time,
// data: me.data,
// ether_header: me.ether_header,
// ipv4_header: me.ipv4_header,
// ipv6_header: me.ipv6_header,
// tcp_header: me.tcp_header,
// udp_header: me.udp_header,
// arp_header: me.arp_header,
// reg_res: me.reg_res,
// });
}
v
}
@ -281,8 +261,8 @@ pub fn parse_device(
}
#[allow(dead_code)]
pub async fn mpsc_parser (parse_file: std::path::PathBuf, filter_str: String, regex_filter: String) -> (Receiver<QryData>, JoinHandle<()>) {
let (sender, receiver) = channel();
pub fn mpsc_parser (parse_file: std::path::PathBuf, filter_str: String, regex_filter: String) -> (Receiver<QryData>, JoinHandle<()>) {
let (sender, receiver) = sync_channel(100);
let handle = spawn( move || {
let mut cap = Capture::from_file(parse_file).unwrap();
Capture::filter(&mut cap, &filter_str).unwrap();
@ -299,8 +279,8 @@ pub async fn mpsc_parser (parse_file: std::path::PathBuf, filter_str: String, re
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
//v.push(me.clone());
if sender.send(me.clone()).is_err(){
if sender.send(me).is_err(){
break;
}
}
@ -334,51 +314,3 @@ pub async fn tokio_parse <'a> (parse_file: std::path::PathBuf, filter_str: &'a s
v
}
//pub async fn tokio_parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str) -> tokio::stream::Iter<Result<QryData>> {
// //let mut v: Vec<QryData> = Vec::new();
// let mut cap = Capture::from_file(parse_file).unwrap();
// Capture::filter(&mut cap, &filter_str).unwrap();
// let linktype = cap.get_datalink();
// let re = Regex::new(regex_filter).unwrap();
// while let Ok(packet) = cap.next() {
// let mut me = QryData::new();
// match linktype {
// Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
// Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
// _ => (),
// };
//
// me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
// me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
// //v.push(me.clone());
// tx.send
// }
// let x = stream::iter(&mut v).collect().await
//}
//pub async fn tokio_parse (parse_file: std::path::PathBuf, filter_str: &'static str, regex_filter: &'static str) {
// //let mut v: Vec<QryData> = Vec::new();
// let (mut tx, rx) = mpsc::channel(1000);
//
// tokio::spawn (async move {
// let mut cap = Capture::from_file(parse_file).unwrap();
// Capture::filter(&mut cap, &filter_str).unwrap();
// let linktype = cap.get_datalink();
// let re = Regex::new(regex_filter).unwrap();
// while let Ok(packet) = cap.next() {
// let mut me = QryData::new();
// match linktype {
// Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html
// Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks
// _ => (),
// };
//
// me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
// me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data
// //v.push(me.clone());
// tx.send(me.clone()).await.unwrap();
// }
//
// });
//
//}
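The reworked `mpsc_parser` is now a plain (non-async) function that hands back the receiving end of a bounded channel plus the parser thread's handle; note it also sends `me` by value now instead of `me.clone()`. A sketch of a hypothetical caller (the path and filter strings are placeholders):

```rust
// Hypothetical caller; argument order mirrors mpsc_parser's signature.
fn example() {
    let (qry_data, handle) = mpsc_parser(
        std::path::PathBuf::from("dump.pcap"), // placeholder path
        "tcp".to_string(),                      // bpf filter
        String::new(),                          // regex filter
    );
    // The receiver can be consumed while the parser thread is still
    // producing; the sync_channel(100) bound throttles the parser.
    for qry in qry_data {
        let _ = qry; // hand off to the serializer in the real pipeline
    }
    handle.join().unwrap();
}
```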

View File

@ -1,9 +1,7 @@
extern crate serde_json;
use crate::parser;
use rayon::prelude::*;
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc::{sync_channel, Receiver};
// This is not needed atm
//use serde::ser::{Serialize, SerializeStruct, Serializer};
@ -26,6 +24,7 @@ use std::sync::mpsc::{channel, Receiver};
// }
//}
#[allow(dead_code)]
pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
/* rayon parallelized */
// TODO: Benchmark. As far as I've tested, rayon brings no benefit here, but it doesn't run any slower either. I'll leave it in for now.
@ -52,7 +51,7 @@ pub fn serialize_packets_as_string(v: Vec<parser::QryData>) -> Vec<serde_json::V
#[allow(dead_code)]
pub fn mpsc_serialize(packet: Receiver<parser::QryData>) -> (Receiver<serde_json::Value>, JoinHandle<()>) {
let (sender, receiver) = channel();
let (sender, receiver) = sync_channel(100);
let handle = spawn( move || {
for p in packet{
let serialized = serde_json::to_value(p).unwrap();
@ -64,17 +63,3 @@ pub fn mpsc_serialize(packet: Receiver<parser::QryData>) -> (Receiver<serde_json
(receiver, handle)
}
#[allow(dead_code)]
pub fn mpsc_collect_serialized( packet: Receiver<serde_json::Value> ) -> Vec<serde_json::Value> {
let mut packets_serialized: Vec<serde_json::Value> = Vec::new();
for p in packet {
packets_serialized.push(p);
}
packets_serialized
}
pub async fn tokio_serialize(packet: parser::QryData) -> serde_json::Value {
serde_json::to_value(packet).unwrap()
}
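Put together, the std::sync::mpsc stages from this commit chain as below — a sketch of the pipeline shape based on the calls in main.rs, with the consumer loop simplified to a stand-in:

```rust
// Sketch: parser thread -> serializer thread -> consumer, each stage
// decoupled by a bounded sync_channel so no stage can run far ahead.
fn pipeline_example(pcap_file: std::path::PathBuf, filter: String, regex: String) {
    let (qry_data, h1) = parser::mpsc_parser(pcap_file, filter, regex);
    let (data_serialized, h2) = serializer::mpsc_serialize(qry_data);
    for value in data_serialized {
        // In main.rs these values are written to PostgreSQL via
        // BinaryCopyInWriter; here we just touch them as a stand-in.
        let _ = value.to_string().len();
    }
    h1.join().unwrap();
    h2.join().unwrap();
}
```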