pcap iterator now in configure, performance improvements

gurkenhabicht 2020-06-01 20:55:11 +02:00
parent 0b2dd3a59c
commit 6f56f5a930
9 changed files with 5503 additions and 71 deletions

Cargo.lock (generated, 10 lines changed)

@@ -199,9 +199,9 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fnv"
-version = "1.0.6"
+version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "fuchsia-zircon"
@@ -468,7 +468,7 @@ checksum = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3"
dependencies = [
"log",
"mio",
-"miow 0.3.3",
+"miow 0.3.4",
"winapi 0.3.8",
]
@@ -497,9 +497,9 @@ dependencies = [
[[package]]
name = "miow"
-version = "0.3.3"
+version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "396aa0f2003d7df8395cb93e09871561ccc3e785f0acb369170e8cc74ddf9226"
+checksum = "22dfdd1d51b2639a5abd17ed07005c3af05fb7a2a3b1a1d0d7af1000a520c1c7"
dependencies = [
"socket2",
"winapi 0.3.8",

Cargo.toml

@@ -16,6 +16,7 @@ edition = "2018"
tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
tokio = { version = "0.2", features = ["full"] }
pcap = "~0.7.0"
+#postgres = { version = "~0.17.2", features = ["with-eui48-0_4","with-serde_json-1"] }
libc = "0.2.68"
byteorder = "*"
bitfield = "*"
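
Note: the pinned pcap crate ("~0.7.0") is what both parse() and parse_device() iterate over. A minimal sketch of the pcap 0.7 capture loop, assuming a hypothetical dump.pcap path; the filter string mirrors the "filter" key of the JSON config:

```rust
use pcap::Capture;

fn main() {
    // Offline capture; "dump.pcap" is a placeholder path.
    let mut cap = Capture::from_file("dump.pcap").unwrap();
    // BPF filter, same syntax as the JSON config's "filter" value.
    cap.filter("tcp && ip6").unwrap();
    // pcap 0.7 hands out packets via next() until it returns Err at end of file.
    while let Ok(packet) = cap.next() {
        println!("captured {} of {} bytes", packet.header.caplen, packet.header.len);
    }
}
```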

src/cachegrind.out (new file, 5237 lines)

File diff suppressed because it is too large

src/configure.rs

@@ -3,6 +3,12 @@
extern crate serde_json;
use std::fs::File;
+use std::collections::HashMap;
+use std::fs;
+use std::io::prelude::*;
+const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
+const PCAP_SIGNATURE: [u8; 4] = [0xed, 0xab, 0xee, 0xdb];
pub struct Config {
pub filter: String,
@@ -37,3 +43,31 @@ pub fn from_json_file() -> Option<Config> {
pcap_dir: json.get("pcap_dir").unwrap().as_str().unwrap().to_owned(),
})
}
+pub fn map_pcap_dir ( pcap_dir: &str ) -> Option<std::collections::HashMap<std::path::PathBuf, std::fs::Metadata>> {
+let mut pcap_map = HashMap::new();
+if let Ok(entries) = fs::read_dir(pcap_dir) {
+for entry in entries {
+if let Ok(entry) = entry {
+if let Ok(_file_type) = entry.file_type() {
+if entry.metadata().unwrap().is_file() {
+let mut magic_number: [u8; 4] = [0; 4];
+let _signature = File::open(entry.path().to_owned())
+.unwrap()
+.read_exact(&mut magic_number)
+.unwrap();
+match magic_number {
+PCAPNG_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata().unwrap()),
+PCAP_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata().unwrap()),
+_ => None,
+};
+// println!("{:?}", &entry.metadata().unwrap().modified());
+}
+} else {
+println!("Couldn't get file type for {:?}", entry.path());
+}
+}
+}
+}
+Some(pcap_map)
+}
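
Note: map_pcap_dir() swallows unreadable entries through the nested if-lets but still unwrap()s read_exact, so a file shorter than four bytes panics. A behavior-preserving sketch (same signature, same constants) that flattens the ladder and skips short or unreadable files instead:

```rust
use std::collections::HashMap;
use std::fs;
use std::io::Read;
use std::path::PathBuf;

const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
const PCAP_SIGNATURE: [u8; 4] = [0xed, 0xab, 0xee, 0xdb];

pub fn map_pcap_dir(pcap_dir: &str) -> Option<HashMap<PathBuf, fs::Metadata>> {
    let mut pcap_map = HashMap::new();
    for entry in fs::read_dir(pcap_dir).ok()?.flatten() {
        let metadata = match entry.metadata() {
            Ok(m) if m.is_file() => m,
            _ => continue, // not a regular file, or metadata unavailable
        };
        let mut magic_number = [0u8; 4];
        let readable = fs::File::open(entry.path())
            .and_then(|mut f| f.read_exact(&mut magic_number))
            .is_ok();
        // Keep only files whose first four bytes match a known signature.
        if readable && (magic_number == PCAPNG_SIGNATURE || magic_number == PCAP_SIGNATURE) {
            pcap_map.insert(entry.path(), metadata);
        }
    }
    Some(pcap_map)
}

fn main() {
    // Hypothetical directory; mirrors the "pcap_dir" key of the JSON config.
    println!("{:?}", map_pcap_dir("./pcaps"));
}
```

As an aside, the classic libpcap magic number is 0xa1b2c3d4 (written d4 c3 b2 a1 by little-endian writers), so the PCAP_SIGNATURE bytes above may be worth double-checking against the captures in use.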

src/main.rs

@@ -1,19 +1,13 @@
extern crate serde_json;
extern crate tokio;
extern crate tokio_postgres;
-use std::fs::File;
mod configure;
mod parser;
mod serializer;
-use rayon::prelude::*;
-use std::collections::HashMap;
-use std::fs;
-use std::io::prelude::*;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Error, NoTls};
-const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
-const PCAP_SIGNATURE: [u8; 4] = [0xed, 0xab, 0xee, 0xdb];
fn query_string(insert_max: &usize) -> String {
@@ -30,10 +24,6 @@ fn query_string(insert_max: &usize) -> String {
insert_template
}
-fn smallest_prime_divisor(remainder: usize ) -> usize {
-let smallest_divisor: usize = (2..(remainder/2)).into_par_iter().find_first(|x| remainder % x == 0).unwrap();
-smallest_divisor
-}
#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
async fn main() -> Result<(), Error> {
@@ -41,34 +31,10 @@ async fn main() -> Result<(), Error> {
let config: configure::Config = configure::from_json_file().unwrap();
// let mut pcap_map: Hashmap<std::path::Path, bool> = HashMap::new();
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
// This db table should include UUIDs so it can be joined effectively
-let mut pcap_map = HashMap::new();
-if let Ok(entries) = fs::read_dir(config.pcap_dir) {
-for entry in entries {
-if let Ok(entry) = entry {
-if let Ok(_file_type) = entry.file_type() {
-if entry.metadata().unwrap().is_file() {
-let mut magic_number: [u8; 4] = [0; 4];
-let _signature = File::open(entry.path().to_owned())
-.unwrap()
-.read_exact(&mut magic_number)
-.unwrap();
-match magic_number {
-PCAPNG_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()),
-PCAP_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()),
-_ => None,
-};
-// println!("{:?}", &entry.metadata().unwrap().modified());
-}
-} else {
-println!("Couldn't get file type for {:?}", entry.path());
-}
-}
-}
-}
+let pcap_map = configure::map_pcap_dir( &config.pcap_dir ).unwrap();
println!("{:?}", pcap_map.iter());
/* db connection */
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
@@ -93,7 +59,7 @@ async fn main() -> Result<(), Error> {
match config.is_device {
false => for _pcap_file in pcap_map.keys() {
println!("{:?}",&_pcap_file);
// TODO: Tuning vector capacity according to actuarial expectation, mean and standard deviation of packet size
+let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
+//let mut v = Vec::<parser::QryData>::with_capacity(35536);
@@ -131,10 +97,6 @@ async fn main() -> Result<(), Error> {
.await?;
}
-println!("Packets, total:{:?}", packets_serialized.len());
-println!("Chunks, total:{}", chunk_count);
-println!("Chunks, remainder:{}", remainder);
if remainder > 0 {
let rem_str = query_string(&remainder);
let statement_remainder = client.prepare(&rem_str).await?;
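
Note: query_string() (unchanged here; the full definition is visible in src/postgres_main below) builds one multi-row INSERT with a numbered bind parameter per serialized packet. A self-contained check of the statement it generates for a small insert_max:

```rust
fn query_string(insert_max: &usize) -> String {
    let mut insert_template = String::with_capacity(insert_max * 8 + 43);
    insert_template.push_str("INSERT INTO json_dump (packet) Values ");
    for insert in 0..insert_max - 1 {
        insert_template.push_str(&format!("(${}), ", insert + 1));
    }
    insert_template.push_str(&format!("(${})", insert_max));
    insert_template
}

fn main() {
    // Three parameters -> one three-row VALUES list.
    // Caveat: insert_max == 0 would underflow the loop bound `insert_max - 1`.
    assert_eq!(
        query_string(&3),
        "INSERT INTO json_dump (packet) Values ($1), ($2), ($3)"
    );
}
```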

(JSON config file)

@@ -1,5 +1,5 @@
{
-"insert_max": 20000,
+"insert_max": 10000,
"filter": "tcp && ip6",
"from_device": false,
"parse_device": "enp7s0",

src/parser.rs

@@ -84,11 +84,11 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
packet.data,
))
.unwrap();
-me.data = packet_handler::payload_handler(
+me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
me.tcp_header.unwrap().data_offset,
packet.data,
-);
+)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(
@@ -96,11 +96,11 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
packet.data,
))
.unwrap();
-me.data = packet_handler::payload_handler(
+me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
7,
packet.data,
-);
+)).unwrap();
}
_ => println!("network protocol not implemented"),
}
@@ -111,21 +111,22 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
match me.ipv6_header.unwrap().next_header as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap();
-me.data = packet_handler::payload_handler(
+me.data = Some(packet_handler::payload_handler(
10,
me.tcp_header.unwrap().data_offset,
packet.data,
-);
+)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap();
-me.data = packet_handler::payload_handler(10, 7, packet.data);
+me.data = Some(packet_handler::payload_handler(10, 7, packet.data)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_ARP | ETH_P_RARP => {
me.arp_header = Some(packet_handler::arp_handler(packet.data)).unwrap();
+me.data = None;
}
_ => println!("network protocol not implemented"),
}
@@ -181,11 +182,11 @@ pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) ->
packet.data,
))
.unwrap();
-me.data = packet_handler::payload_handler(
+me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
me.tcp_header.unwrap().data_offset,
packet.data,
-);
+)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(
@@ -193,11 +194,11 @@ pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) ->
packet.data,
))
.unwrap();
-me.data = packet_handler::payload_handler(
+me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
7,
packet.data,
-);
+)).unwrap();
}
_ => println!("network protocol not implemented"),
}
@@ -208,21 +209,22 @@ pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) ->
match me.ipv6_header.unwrap().next_header as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap();
-me.data = packet_handler::payload_handler(
+me.data = Some(packet_handler::payload_handler(
10,
me.tcp_header.unwrap().data_offset,
packet.data,
-);
+)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap();
-me.data = packet_handler::payload_handler(10, 7, packet.data);
+me.data = Some(packet_handler::payload_handler(10, 7, packet.data)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_ARP | ETH_P_RARP => {
me.arp_header = Some(packet_handler::arp_handler(packet.data)).unwrap();
+me.data = None;
}
_ => println!("network protocol not implemented"),
}
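
Note: the substantive change in this file is the explicit me.data = None; in the ARP/RARP arms. The Some(...).unwrap() wrapping, by contrast, is an identity transformation: Some(expr).unwrap() evaluates straight back to expr, so, assuming payload_handler() already returns the Option stored in me.data, the wrapped assignments behave exactly like the old plain ones. A minimal demonstration:

```rust
fn main() {
    // Stand-in for a payload_handler() result; Option<Vec<u8>> is assumed
    // here based on the `me.data = None;` arm above.
    let payload: Option<Vec<u8>> = Some(vec![0xde, 0xad, 0xbe, 0xef]);
    let wrapped = Some(payload.clone()).unwrap(); // unwrap strips the outer Some
    assert_eq!(wrapped, payload);
}
```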

src/postgres_main (new file, 180 lines)

@@ -0,0 +1,180 @@
extern crate serde_json;
//extern crate tokio;
extern crate postgres;
//extern crate tokio_postgres;
use postgres::{Client, NoTls};
use postgres::types::ToSql;
use std::fs::File;
mod configure;
mod parser;
mod serializer;
use rayon::prelude::*;
use std::collections::HashMap;
use std::fs;
use std::io::prelude::*;
//use tokio_postgres::types::ToSql;
//use tokio_postgres::{Error, NoTls};
const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
const PCAP_SIGNATURE: [u8; 4] = [0xed, 0xab, 0xee, 0xdb];
fn query_string(insert_max: &usize) -> String {
// Changed this to String with given capacity after stumbling over https://github.com/hoodie/concatenation_benchmarks-rs
// Impressive performance increase!
let mut insert_template = String::with_capacity(insert_max * 8 + 43);
insert_template.push_str("INSERT INTO json_dump (packet) Values ");
for insert in 0..insert_max - 1 {
insert_template.push_str(&(format!("(${}), ", insert + 1)));
}
insert_template.push_str(&(format!("(${})", insert_max)));
insert_template
}
//#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime
fn main() -> Result<(), postgres::Error> {
/* Init values from file */
let config: configure::Config = configure::from_json_file().unwrap();
// let mut pcap_map: Hashmap<std::path::Path, bool> = HashMap::new();
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
// This db table should include UUIDs so it can be joined effectively
let mut pcap_map = HashMap::new();
if let Ok(entries) = fs::read_dir(config.pcap_dir) {
for entry in entries {
if let Ok(entry) = entry {
if let Ok(_file_type) = entry.file_type() {
if entry.metadata().unwrap().is_file() {
let mut magic_number: [u8; 4] = [0; 4];
let _signature = File::open(entry.path().to_owned())
.unwrap()
.read_exact(&mut magic_number)
.unwrap();
match magic_number {
PCAPNG_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()),
PCAP_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata()),
_ => None,
};
// println!("{:?}", &entry.metadata().unwrap().modified());
}
} else {
println!("Couldn't get file type for {:?}", entry.path());
}
}
}
}
println!("{:?}", pcap_map.iter());
//TODO: thread the complete logic so files are written concurrently
let mut client = Client::connect(&config.connection, NoTls).unwrap();
client.execute("DROP TABLE IF EXISTS json_dump", &[]).unwrap();
client.execute("CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &[]).unwrap();
/* db connection */
// let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
// tokio::spawn(async move {
// if let Err(e) = connection.await {
// eprintln!("connection error: {}", e);
// }
// });
// client
// .execute("DROP TABLE IF EXISTS json_dump", &[])
// .await?;
// client
// .execute(
// "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
// &[],
// )
// .await?;
/* device or file input */
match config.is_device {
false => for _pcap_file in pcap_map.keys() {
println!("{:?}",&_pcap_file);
// TODO: Tuning vector capacity according to actuarial expectation, mean and standard deviation of packet size
//let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
let mut v = Vec::<parser::QryData>::with_capacity(35536);
v.extend(parser::parse(&_pcap_file, &config.filter));
//let packets_serialized = serializer::serialize_packets(v);
let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(35536);
packets_serialized.extend(serializer::serialize_packets(v));
/* Query */
let chunk_count = packets_serialized.len() / config.insert_max;
let remainder: usize = packets_serialized.len() % config.insert_max;
let chunker = &packets_serialized.len() < &config.insert_max;
match chunker {
true => {
let insert_str = query_string(&packets_serialized.len());
let statement_false = client.prepare(&insert_str).unwrap();
client
.query_raw(
&statement_false,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
//.await?;
.unwrap();
}
false => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).unwrap();
for _i in 0..chunk_count {
let (_input, _) = packets_serialized.split_at(config.insert_max);
client
.query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql))
//.await?;
.unwrap();
//let grimmes = packets_serialized.par_chunks(chunk_count).for_each(|x| client.query_raw(&statement, x.to_vec().iter().map(|p| p as &dyn ToSql))());
}
println!("Packets, total:{:?}", packets_serialized.len());
println!("Chunks, total:{}", chunk_count);
println!("Chunks, remainder:{}", remainder);
if remainder > 0 {
let rem_str = query_string(&remainder);
let statement_remainder = client.prepare(&rem_str).unwrap();
let (_garbage, _input) =
packets_serialized.split_at(packets_serialized.len() - remainder);
client
.query_raw(
&statement_remainder,
_input.to_vec().iter().map(|p| p as &dyn ToSql),
)
// .await?;
.unwrap();
}
}
}
},
true => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).unwrap();
loop {
let v: Vec<parser::QryData> = parser::parse_device(&config.device, &config.filter, &config.insert_max);
let packets_serialized = serializer::serialize_packets(v);
client
.query_raw(
&statement,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
//.await?;
.unwrap();
}
},
}
Ok(())
}
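
Note: in the chunked branch above, `let (_input, _) = packets_serialized.split_at(config.insert_max);` is recomputed from the full vector on every loop pass, so each iteration appears to re-insert the first chunk instead of advancing through the data (the async main.rs shares this loop shape). A sketch of a fix with slice::chunks(), which walks the data once and folds the remainder into a final, smaller chunk:

```rust
fn main() {
    // Stand-ins: 25 "packets" with an insert_max of 10.
    let packets_serialized: Vec<u32> = (0..25).collect();
    let insert_max = 10;
    for chunk in packets_serialized.chunks(insert_max) {
        // chunk.len() is insert_max for full chunks and the remainder (5 here)
        // for the last one; prepare query_string(&chunk.len()) and call
        // query_raw(...) per chunk at this point.
        println!("INSERT with {} bind parameters", chunk.len());
    }
}
```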

src/serializer.rs

@@ -2,15 +2,14 @@ extern crate serde_json;
use crate::parser;
use rayon::prelude::*;
use serde::ser::{Serialize, SerializeStruct, Serializer};
-use serde_json::json;
+//use serde_json::json;
impl Serialize for parser::QryData {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
-// 34 (42) is the number of fields in the struct.
-let mut state = serializer.serialize_struct("parser::QryData", 34)?;
+let mut state = serializer.serialize_struct("parser::QryData", 11)?;
state.serialize_field("time", &self.time)?;
state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?;
state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?;
@@ -26,15 +25,32 @@ impl Serialize for parser::QryData {
}
}
-pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
-// let mut packets_serialized: Vec<_> = Vec::new();
-// for packet in v.iter() {
-// // packets_serialized.push(json!(&packet));
-// }
+pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
/* rayon parallelized */
-let packets_serialized = v.par_iter().map(|x| json!(x)).collect();
+// TODO: Benchmark
+let packets_serialized = v.par_iter().map(|x| serde_json::to_value(x).unwrap()).collect();
+//let packets_serialized: Vec<serde_json::Value> = v.par_iter().map(|x| json!(x)).collect();
packets_serialized
}
+// This is way faster than serialize_packets(), but I cannot figure out how to properly select parts of the JSON from the db.
+#[allow(dead_code)]
+pub fn serialize_packets_as_string(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
+let mut packets_serialized: Vec<serde_json::Value> = Vec::with_capacity(v.len() * 2);
+for packet in v.iter() {
+packets_serialized.push(serde_json::Value::String(serde_json::to_string(&packet).unwrap()));
+}
+packets_serialized
+}
+#[allow(dead_code)]
+fn smallest_prime_divisor(remainder: usize ) -> usize {
+let smallest_divisor: usize = (2..(remainder/2)).into_par_iter().find_first(|x| remainder % x == 0).unwrap();
+smallest_divisor
+}
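
Note: dropping the serialize_struct length hint from 34 to 11 matches the eleven serialize_field calls; the hint is ignored by serde_json but length-prefixed formats rely on it. A minimal illustration with a hypothetical one-field type:

```rust
use serde::ser::{Serialize, SerializeStruct, Serializer};

struct Probe {
    time: f64,
}

impl Serialize for Probe {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // The `1` matches the single serialize_field call below.
        let mut state = serializer.serialize_struct("Probe", 1)?;
        state.serialize_field("time", &self.time)?;
        state.end()
    }
}

fn main() {
    println!("{}", serde_json::to_string(&Probe { time: 0.0 }).unwrap()); // {"time":0.0}
}
```

Note also that smallest_prime_divisor(), moved here from main.rs, panics whenever no divisor falls inside the exclusive range 2..(remainder/2): for every prime remainder, and even for remainder = 4, whose range 2..2 is empty. A total sketch that returns Option and uses an inclusive upper bound:

```rust
use rayon::prelude::*;

fn smallest_prime_divisor(remainder: usize) -> Option<usize> {
    // Inclusive bound so that e.g. 4 -> Some(2); None instead of a panic for primes.
    (2..=remainder / 2).into_par_iter().find_first(|x| remainder % x == 0)
}

fn main() {
    assert_eq!(smallest_prime_divisor(4), Some(2));
    assert_eq!(smallest_prime_divisor(35_536), Some(2));
    assert_eq!(smallest_prime_divisor(13), None); // the original would panic here
}
```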