gurkenhabicht 2020-06-04 21:13:48 +02:00
parent 6f56f5a930
commit cb973851d6
15 changed files with 6368 additions and 29 deletions

src/.gdb_history Normal file

@ -0,0 +1,6 @@
exit
quit
b tpcpr::main
file
file help
quit


@ -6,9 +6,12 @@ use std::fs::File;
use std::collections::HashMap;
use std::fs;
use std::io::prelude::*;
//use std::thread::{spawn, JoinHandle};
//use std::sync::mpsc::{channel, Receiver};
const PCAPNG_SIGNATURE: [u8; 4] = [0x0a, 0x0d, 0x0d, 0x0a];
const PCAP_SIGNATURE: [u8; 4] = [0xed, 0xab, 0xee, 0xdb];
const PCAP_SIGNATURE: [u8; 4] = [0xd4, 0xc3, 0xb2, 0xa1];
const PCAP_SIGNATURE_BE: [u8; 4] = [0xa1, 0xb2, 0xc3, 0xd4];
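// File-format sniffing: pcapng section header blocks begin with 0x0a0d0d0a, while
// classic pcap files begin with magic 0xd4c3b2a1 (written little-endian) or
// 0xa1b2c3d4 (written big-endian).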
pub struct Config {
pub filter: String,
@ -58,7 +61,7 @@ pub fn map_pcap_dir ( pcap_dir: &str ) -> Option<std::collections::HashMap<std::
.unwrap();
match magic_number {
PCAPNG_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata().unwrap()),
PCAP_SIGNATURE => pcap_map.insert(entry.path(), entry.metadata().unwrap()),
PCAP_SIGNATURE | PCAP_SIGNATURE_BE => pcap_map.insert(entry.path(), entry.metadata().unwrap()),
_ => None,
};
// println!("{:?}", &entry.metadata().unwrap().modified());
@ -71,3 +74,5 @@ pub fn map_pcap_dir ( pcap_dir: &str ) -> Option<std::collections::HashMap<std::
}
Some(pcap_map)
}
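// Usage sketch: map_pcap_dir("../target") returns a HashMap from capture file paths
// to their fs::Metadata, silently skipping files whose first four bytes match none
// of the signatures above.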


@ -11,8 +11,6 @@ use tokio_postgres::{Error, NoTls};
fn query_string(insert_max: &usize) -> String {
// Changed this to String with given capacity after stumbling over https://github.com/hoodie/concatenation_benchmarks-rs
// Impressive performance increase!
let mut insert_template = String::with_capacity(insert_max * 8 + 43);
insert_template.push_str("INSERT INTO json_dump (packet) Values ");
@ -31,12 +29,13 @@ async fn main() -> Result<(), Error> {
let config: configure::Config = configure::from_json_file().unwrap();
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table (e.g. after a system crash)
// This db table should include UUIDs so it can be joined effectively
let pcap_map = configure::map_pcap_dir( &config.pcap_dir ).unwrap();
println!("{:?}", pcap_map.iter());
/* db connection */
// println!("{:?}", pcap_map.iter());
/* db connection */
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
tokio::spawn(async move {
@ -69,8 +68,7 @@ async fn main() -> Result<(), Error> {
//let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(35536);
//packets_serialized.extend(serializer::serialize_packets(v));
/* Query */
/* Query */
let chunk_count = packets_serialized.len() / config.insert_max;
let remainder: usize = packets_serialized.len() % config.insert_max;
let chunker = packets_serialized.len() < config.insert_max;
@ -111,8 +109,7 @@ async fn main() -> Result<(), Error> {
}
}
}
}
,
},
true => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;


@ -1,6 +1,6 @@
{
"insert_max": 10000,
"filter": "tcp && ip6",
"filter": "tcp && !ip6",
"from_device": false,
"parse_device": "enp7s0",
"pcap_file": "../target/arp_test.pcapng",

File diff suppressed because it is too large

src/parser/.junk/main.rs Normal file

@ -0,0 +1,189 @@
extern crate serde_json;
extern crate tokio;
extern crate tokio_postgres;
mod configure;
mod parser;
mod serializer;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Error, NoTls};
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc;
fn query_string(insert_max: &usize) -> String {
let mut insert_template = String::with_capacity(insert_max * 8 + 43);
insert_template.push_str("INSERT INTO json_dump (packet) Values ");
for insert in 0..insert_max - 1 {
insert_template.push_str(&(format!("(${}), ", insert + 1)));
}
insert_template.push_str(&(format!("(${})", insert_max)));
insert_template
}
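// For illustration: query_string(&3) yields
// "INSERT INTO json_dump (packet) Values ($1), ($2), ($3)",
// i.e. one numbered placeholder per row in the batch.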
pub trait OffThreadExt: Iterator {
fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}
impl <T> OffThreadExt for T
where T: Iterator + Send + 'static,
T::Item: Send + 'static
{
fn off_thread(self) -> mpsc::IntoIter<Self::Item>{
// Create channel to transfer items from the worker thread
let (sender, receiver) = mpsc::sync_channel(1024);
// Move the iterator to a new thread and run it there
spawn(move || {
for item in self {
if sender.send(item).is_err() {
break
}
}
});
receiver.into_iter()
}
}
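// Hedged usage sketch: any Send iterator can be moved onto a worker thread, e.g.
// let doubled: Vec<i32> = (1..=3).off_thread().map(|x| x * 2).collect();
// The sync_channel(1024) bound applies backpressure when the consumer falls behind.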
#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
async fn main() -> Result<(), Error> {
/* Init values from file */
let config: configure::Config = configure::from_json_file().unwrap();
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
// This db table should include UUIDs so it can be joined effectively
let pcap_map = configure::map_pcap_dir( &config.pcap_dir ).unwrap();
// println!("{:?}", pcap_map.iter());
/* db connection */
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
client
.execute("DROP TABLE IF EXISTS json_dump", &[])
.await?;
client
.execute(
"CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
&[],
)
.await?;
/* device or file input */
match config.is_device {
false => {
let pcap_dir = config.pcap_dir.clone();
let (pcap_files, h1) = configure::start_map_pcap_dir_thread(pcap_dir.to_owned());
let (qry_data, h2) = parser::start_parse_thread(pcap_files);
//let (qry_data, h2) = parser::start_parse_thread_vec(pcap_files);
let (data_serialized, h3) = serializer::start_serialize_packets_thread(qry_data);
//let data_serialized = serializer::start_serialize_vec_thread(qry_data);
let packets_serialized = serializer::start_collect_serialized(data_serialized);
let r1 = h1.join().unwrap();
let r2 = h2.join().unwrap();
//let r3 = h3.join().unwrap();
//for _pcap_file in pcap_map.keys() {
// let packets_serialized = configure::start_map_pcap_dir_thread(config.pcap_dir.to_owned()).into_iter()
// //.errors_to(error_sender)
// .off_thread()
// .map(|(x,y)| parser::start_parse_thread(x))
// //.errors_to(error_sender)
// .off_thread()
// .map(|(x,y)| serializer::start_serialize_packets_thread(x))
// .off_thread().collect();
//println!("{:?}",&_pcap_file);
// TODO: Tuning vector capacity according to actuarial expectation, mean & std dev of packet size
// let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
//let mut v = Vec::<parser::QryData>::with_capacity(35536);
//v.extend(parser::parse(&_pcap_file, &config.filter));
// let packets_serialized = serializer::serialize_packets(v);
//let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(35536);
//packets_serialized.extend(serializer::serialize_packets(v));
/* Query */
let chunk_count = packets_serialized.len() / config.insert_max;
let remainder: usize = packets_serialized.len() % config.insert_max;
let chunker = packets_serialized.len() < config.insert_max;
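// Worked example (hypothetical sizes): 25_000 serialized packets with an insert_max
// of 10_000 give chunk_count = 2 full batches plus a remainder batch of 5_000.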
match chunker {
true => {
let insert_str = query_string(&packets_serialized.len());
let statement_false = client.prepare(&insert_str).await?;
client
.query_raw(
&statement_false,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
false => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;
for i in 0..chunk_count {
    // Slice the i-th chunk; a plain split_at(insert_max) would re-insert the first chunk every pass.
    let input = &packets_serialized[i * config.insert_max..(i + 1) * config.insert_max];
    client
        .query_raw(&statement, input.to_vec().iter().map(|p| p as &dyn ToSql))
        .await?;
}
if remainder > 0 {
let rem_str = query_string(&remainder);
let statement_remainder = client.prepare(&rem_str).await?;
let (_garbage, _input) =
packets_serialized.split_at(packets_serialized.len() - remainder);
client
.query_raw(
&statement_remainder,
_input.to_vec().iter().map(|p| p as &dyn ToSql),
)
.await?;
}
}
}
},
true => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;
loop {
let v: Vec<parser::QryData> = parser::parse_device(&config.device, &config.filter, &config.insert_max);
let packets_serialized = serializer::serialize_packets(v);
client
.query_raw(
&statement,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
},
}
Ok(())
}


@ -0,0 +1,11 @@
{
"insert_max": 10000,
"filter": "tcp && ip6",
"from_device": false,
"parse_device": "enp7s0",
"pcap_file": "../target/arp_test.pcapng",
"pcap_dir": "../target",
"database_user": "postgres",
"database_host": "localhost",
"database_password": "password"
}

src/parser/.junk/parser.rs Normal file

@ -0,0 +1,461 @@
extern crate bitfield;
extern crate byteorder;
extern crate eui48;
mod packet_handler;
use super::configure;
use eui48::{Eui48, MacAddress};
use pcap::Capture;
use regex::bytes::Regex;
use std::str;
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::{channel, Receiver};
/* protocol ids, LittleEndian */
const ETH_P_IPV6: usize = 0xDD86;
const ETH_P_IP: usize = 0x08;
const TCP: usize = 0x06;
const UDP: usize = 0x11;
const ETH_P_ARP: usize = 0x0608;
const ETH_P_RARP: usize = 0x3580;
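// Note: these are the wire-order numbers byte-swapped for little-endian reads, e.g.
// the IPv6 EtherType 0x86DD is matched as 0xDD86 and RARP 0x8035 as 0x3580.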
fn build_ether() -> packet_handler::EtherHeader {
packet_handler::EtherHeader {
ether_dhost: MacAddress::new([0;6]),
ether_shost: MacAddress::new([0;6]),
ether_type: 0,
}
}
#[derive(Debug, Clone)]
pub struct QryData {
pub id: i32,
pub time: f64,
pub data: Option<Vec<u8>>,
pub ether_header: packet_handler::EtherHeader,
pub ipv4_header: Option<packet_handler::IpV4Header>,
pub ipv6_header: Option<packet_handler::IpV6Header>,
pub tcp_header: Option<packet_handler::TcpHeader>,
pub udp_header: Option<packet_handler::UdpHeader>,
pub arp_header: Option<packet_handler::ArpHeader>,
pub reg_res: Option<String>,
}
fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
let mut flags: String = String::new();
for mat in re.find_iter(payload) {
flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap());
}
match 0 < flags.len() {
false => None,
true => Some(flags),
}
}
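// Example (hypothetical payload): with the URL regex compiled below,
// flag_carnage(&re, b"GET http://example.org/") returns Some("http:/"), while a
// payload containing no http/https scheme returns None.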
#[allow(dead_code)]
pub fn start_parse_thread_vec(parse_files: Receiver<std::path::PathBuf>) -> (Receiver<Vec<QryData>>, JoinHandle<()>) {
let (sender, receiver) = channel();
let handle = spawn( move || {
let ether_init = build_ether();
let config = configure::from_json_file().unwrap();
// TODO: make init fn
let mut me = QryData {
id: 0,
time: 0.0,
data: None,
ether_header: ether_init,
ipv4_header: None::<packet_handler::IpV4Header>,
ipv6_header: None::<packet_handler::IpV6Header>,
tcp_header: None::<packet_handler::TcpHeader>,
udp_header: None::<packet_handler::UdpHeader>,
arp_header: None::<packet_handler::ArpHeader>,
reg_res: None::<String>,
};
let mut v: Vec<QryData> = Vec::new();
for parse_file in parse_files {
let mut cap = Capture::from_file(parse_file).unwrap();
Capture::filter(&mut cap, &config.filter).unwrap();
let re = Regex::new(r"(?:http|https):[[:punct:]]?").unwrap();
while let Ok(packet) = cap.next() {
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
me.data = Some(packet.data.to_vec());
me.reg_res = flag_carnage(&re, packet.data);
me.reg_res = None; // note: this overwrites the flag_carnage() result computed above
me.ether_header = packet_handler::ethernet_handler(packet.data);
match me.ether_header.ether_type as usize {
ETH_P_IP => {
me.ipv6_header = None::<packet_handler::IpV6Header>;
me.ipv4_header = Some(packet_handler::ip_handler(packet.data)).unwrap();
match me.ipv4_header.unwrap().ip_protocol as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
7,
packet.data,
)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_IPV6 => {
me.ipv4_header = None::<packet_handler::IpV4Header>;
me.ipv6_header = Some(packet_handler::ipv6_handler(packet.data)).unwrap();
match me.ipv6_header.unwrap().next_header as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(
10,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(10, 7, packet.data)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_ARP | ETH_P_RARP => {
me.arp_header = Some(packet_handler::arp_handler(packet.data)).unwrap();
me.data = None;
}
_ => println!("network protocol not implemented"),
}
v.push(QryData {
id: 0,
time: me.time,
data: me.data,
ether_header: me.ether_header,
ipv4_header: me.ipv4_header,
ipv6_header: me.ipv6_header,
tcp_header: me.tcp_header,
udp_header: me.udp_header,
arp_header: me.arp_header,
reg_res: me.reg_res,
});
}
}
let _ = sender.send(v.clone()); // send the whole parsed Vec once all files are processed
});
(receiver, handle)
}
#[allow(dead_code)]
pub fn start_parse_thread(parse_files: Receiver<std::path::PathBuf>) -> (Receiver<QryData>, JoinHandle<()>) {
let (sender, receiver) = channel();
let handle = spawn( move || {
let ether_init = build_ether();
let config = configure::from_json_file().unwrap();
// TODO: make init fn
let mut me = QryData {
id: 0,
time: 0.0,
data: None,
ether_header: ether_init,
ipv4_header: None::<packet_handler::IpV4Header>,
ipv6_header: None::<packet_handler::IpV6Header>,
tcp_header: None::<packet_handler::TcpHeader>,
udp_header: None::<packet_handler::UdpHeader>,
arp_header: None::<packet_handler::ArpHeader>,
reg_res: None::<String>,
};
for parse_file in parse_files {
let mut cap = Capture::from_file(parse_file).unwrap();
Capture::filter(&mut cap, &config.filter).unwrap();
let re = Regex::new(r"(?:http|https):[[:punct:]]?").unwrap();
while let Ok(packet) = cap.next() {
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
me.data = Some(packet.data.to_vec());
me.reg_res = flag_carnage(&re, packet.data);
me.reg_res = None; // note: this overwrites the flag_carnage() result computed above
me.ether_header = packet_handler::ethernet_handler(packet.data);
match me.ether_header.ether_type as usize {
ETH_P_IP => {
me.ipv6_header = None::<packet_handler::IpV6Header>;
me.ipv4_header = Some(packet_handler::ip_handler(packet.data)).unwrap();
match me.ipv4_header.unwrap().ip_protocol as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
7,
packet.data,
)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_IPV6 => {
me.ipv4_header = None::<packet_handler::IpV4Header>;
me.ipv6_header = Some(packet_handler::ipv6_handler(packet.data)).unwrap();
match me.ipv6_header.unwrap().next_header as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(
10,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(10, 7, packet.data)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_ARP | ETH_P_RARP => {
me.arp_header = Some(packet_handler::arp_handler(packet.data)).unwrap();
me.data = None;
}
_ => println!("network protocol not implemented"),
}
if sender.send(me.clone()).is_err() {
break;
}
}
}
});
(receiver, handle)
}
pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
let ether_init = build_ether();
let mut me = QryData {
id: 0,
time: 0.0,
data: None,
ether_header: ether_init,
ipv4_header: None::<packet_handler::IpV4Header>,
ipv6_header: None::<packet_handler::IpV6Header>,
tcp_header: None::<packet_handler::TcpHeader>,
udp_header: None::<packet_handler::UdpHeader>,
arp_header: None::<packet_handler::ArpHeader>,
reg_res: None::<String>,
};
let mut v: Vec<QryData> = Vec::new();
let mut cap = Capture::from_file(parse_file).unwrap();
Capture::filter(&mut cap, &filter_str).unwrap();
let re = Regex::new(r"(?:http|https):[[:punct:]]?").unwrap();
while let Ok(packet) = cap.next() {
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
me.data = Some(packet.data.to_vec());
me.reg_res = flag_carnage(&re, packet.data);
me.ether_header = packet_handler::ethernet_handler(packet.data);
match me.ether_header.ether_type as usize {
ETH_P_IP => {
me.ipv6_header = None::<packet_handler::IpV6Header>;
me.ipv4_header = Some(packet_handler::ip_handler(packet.data)).unwrap();
match me.ipv4_header.unwrap().ip_protocol as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
7,
packet.data,
)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_IPV6 => {
me.ipv4_header = None::<packet_handler::IpV4Header>;
me.ipv6_header = Some(packet_handler::ipv6_handler(packet.data)).unwrap();
match me.ipv6_header.unwrap().next_header as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(
10,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(10, 7, packet.data)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_ARP | ETH_P_RARP => {
me.arp_header = Some(packet_handler::arp_handler(packet.data)).unwrap();
me.data = None;
}
_ => println!("network protocol not implemented"),
}
v.push(QryData {
id: 0,
time: me.time,
data: me.data,
ether_header: me.ether_header,
ipv4_header: me.ipv4_header,
ipv6_header: me.ipv6_header,
tcp_header: me.tcp_header,
udp_header: me.udp_header,
arp_header: me.arp_header,
reg_res: me.reg_res,
});
}
v
}
pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) -> Vec<QryData> {
let ether_init = build_ether();
let mut me = QryData {
id: 0,
time: 0.0,
data: None,
ether_header: ether_init,
ipv4_header: None::<packet_handler::IpV4Header>,
ipv6_header: None::<packet_handler::IpV6Header>,
tcp_header: None::<packet_handler::TcpHeader>,
udp_header: None::<packet_handler::UdpHeader>,
arp_header: None::<packet_handler::ArpHeader>,
reg_res: None::<String>,
};
let mut v: Vec<QryData> = Vec::new();
let mut cap = Capture::from_device(parse_device).unwrap().open().unwrap();
Capture::filter(&mut cap, &filter_str).unwrap();
let re = Regex::new(r"(?:http|https):[[:punct:]]").unwrap();
'parse: while let Ok(packet) = cap.next() {
me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64;
me.data = Some(packet.data.to_vec());
me.reg_res = flag_carnage(&re, packet.data);
me.ether_header = packet_handler::ethernet_handler(packet.data);
match me.ether_header.ether_type as usize {
ETH_P_IP => {
me.ipv6_header = None::<packet_handler::IpV6Header>;
me.ipv4_header = Some(packet_handler::ip_handler(packet.data)).unwrap();
match me.ipv4_header.unwrap().ip_protocol as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(
me.ipv4_header.unwrap().ip_ihl,
packet.data,
))
.unwrap();
me.data = Some(packet_handler::payload_handler(
me.ipv4_header.unwrap().ip_ihl,
7,
packet.data,
)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_IPV6 => {
me.ipv4_header = None::<packet_handler::IpV4Header>;
me.ipv6_header = Some(packet_handler::ipv6_handler(packet.data)).unwrap();
match me.ipv6_header.unwrap().next_header as usize {
TCP => {
me.tcp_header = Some(packet_handler::tcp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(
10,
me.tcp_header.unwrap().data_offset,
packet.data,
)).unwrap();
}
UDP => {
me.udp_header = Some(packet_handler::udp_handler(10, packet.data)).unwrap();
me.data = Some(packet_handler::payload_handler(10, 7, packet.data)).unwrap();
}
_ => println!("network protocol not implemented"),
}
}
ETH_P_ARP | ETH_P_RARP => {
me.arp_header = Some(packet_handler::arp_handler(packet.data)).unwrap();
me.data = None;
}
_ => println!("network protocol not implemented"),
}
v.push(QryData {
id: 0,
time: me.time,
data: me.data,
ether_header: me.ether_header,
ipv4_header: me.ipv4_header,
ipv6_header: me.ipv6_header,
tcp_header: me.tcp_header,
udp_header: me.udp_header,
arp_header: me.arp_header,
reg_res: me.reg_res,
});
if &v.len() >= insert_max {
break 'parse;
}
}
v
}
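// Usage note: parse_device() drains the live capture until insert_max packets are
// collected (the 'parse break above), so each call returns at most one insert batch.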


@ -0,0 +1,104 @@
extern crate serde_json;
use crate::parser;
use rayon::prelude::*;
use serde::ser::{Serialize, SerializeStruct, Serializer};
use std::thread::{spawn, JoinHandle};
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use serde_json::json;
impl Serialize for parser::QryData {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("parser::QryData", 11)?;
state.serialize_field("time", &self.time)?;
state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost.to_hex_string())?;
state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost.to_hex_string())?;
state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?;
state.serialize_field("ipv4_header", &self.ipv4_header)?;
state.serialize_field("ipv6_header", &self.ipv6_header)?;
state.serialize_field("tcp_header", &self.tcp_header)?;
state.serialize_field("udp_header", &self.udp_header)?;
state.serialize_field("arp_header", &self.arp_header)?;
state.serialize_field("data", &self.data)?;
state.serialize_field("reg_res", &self.reg_res)?;
state.end()
}
}
pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
/* rayon parallelized */
// TODO: Benchmark. As far as I tested, this reaps no benefit.
let packets_serialized = v.par_iter().map(|x| serde_json::to_value(x).unwrap()).collect();
//let packets_serialized: Vec<serde_json::Value> = v.par_iter().map(|x| json!(x)).collect();
packets_serialized
}
// This is way faster than serialize_packets(), but I cannot figure out how to properly select parts of the resulting json structure in an SQL query
#[allow(dead_code)]
pub fn serialize_packets_as_string(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
let mut packets_serialized: Vec<serde_json::Value> = Vec::with_capacity(v.len() * 2);
for packet in v.iter() {
packets_serialized.push(serde_json::Value::String(serde_json::to_string(&packet).unwrap()));
}
packets_serialized
}
#[allow(dead_code)]
fn smallest_prime_divisor(remainder: usize) -> usize {
    // Searching 2..=remainder/2 inclusively lets e.g. 4 find its divisor 2;
    // unwrap() still panics when remainder is prime (no divisor in range).
    let smallest_divisor: usize = (2..=remainder / 2).into_par_iter().find_first(|x| remainder % x == 0).unwrap();
    smallest_divisor
}
#[allow(dead_code)]
//pub fn start_serialize_packets_thread(parsed_data: Receiver<parser::QryData>) -> (Receiver<serde_json::Value>, JoinHandle<()>) {
pub fn start_serialize_packets_thread(parsed_data: Receiver<parser::QryData>) -> (Receiver<serde_json::Value>, JoinHandle<()>) {
let (sender, receiver) = channel();
let handle = spawn ( move || {
for data in parsed_data {
//serde_json::Value::String(serde_json::to_string(&packet).unwrap())
if sender.send(serde_json::to_value(data).unwrap()).is_err() {
break;
}
}
});
(receiver, handle)
}
#[allow(dead_code)]
pub fn start_collect_serialized( serialized_data: Receiver<serde_json::Value> ) -> Vec<serde_json::Value> {
//let (sender, receiver) = channel();
let mut packets_serialized:Vec<serde_json::Value> = Vec::new();
for data in serialized_data {
packets_serialized.push(data);
}
packets_serialized
}
pub fn start_serialize_vec_thread(parsed_data: Receiver<Vec<parser::QryData>>) -> Receiver<serde_json::Value> {
let (sender, receiver) = channel();
for vec in parsed_data {
for packet in vec {
let sender = sender.clone();
// One worker thread per packet; send() errors (receiver hung up) are ignored here.
spawn(move || {
    let _ = sender.send(json!(packet));
});
}
}
receiver
}


@ -0,0 +1,190 @@
extern crate serde_json;
extern crate tokio;
extern crate tokio_postgres;
mod configure;
mod parser;
mod serializer;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Error, NoTls};
use std::thread::{spawn, JoinHandle};
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc;
fn query_string(insert_max: &usize) -> String {
let mut insert_template = String::with_capacity(insert_max * 8 + 43);
insert_template.push_str("INSERT INTO json_dump (packet) Values ");
for insert in 0..insert_max - 1 {
insert_template.push_str(&(format!("(${}), ", insert + 1)));
}
insert_template.push_str(&(format!("(${})", insert_max)));
insert_template
}
pub trait OffThreadExt: Iterator {
fn off_thread(self) -> mpsc::IntoIter<Self::Item>;
}
impl <T> OffThreadExt for T
where T: Iterator + Send + 'static,
T::Item: Send + 'static
{
fn off_thread(self) -> mpsc::IntoIter<Self::Item>{
// Create channel to transfer items from the worker thread
let (sender, receiver) = mpsc::sync_channel(1024);
// Move the iterator to a new thread and run it there
spawn(move || {
for item in self {
if sender.send(item).is_err() {
break
}
}
});
receiver.into_iter()
}
}
#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
async fn main() -> Result<(), Error> {
/* Init values from file */
let config: configure::Config = configure::from_json_file().unwrap();
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
// This db table should include UUIDs so it can be joined effectively
// let pcap_map = configure::map_pcap_dir( &config.pcap_dir ).unwrap();
// println!("{:?}", pcap_map.iter());
/* db connection */
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
client
.execute("DROP TABLE IF EXISTS json_dump", &[])
.await?;
client
.execute(
"CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
&[],
)
.await?;
/* device or file input */
match config.is_device {
false => {
let pcap_dir = config.pcap_dir.clone();
let (pcap_files, h1) = configure::start_map_pcap_dir_thread(pcap_dir.to_owned());
println!("{:?}", pcap_files.iter());
let (qry_data, h2) = parser::start_parse_thread(pcap_files);
//let (qry_data, h2) = parser::start_parse_thread_vec(pcap_files);
let (data_serialized, h3) = serializer::start_serialize_packets_thread(qry_data);
//let data_serialized = serializer::start_serialize_vec_thread(qry_data);
let packets_serialized = serializer::start_collect_serialized(data_serialized);
let r1 = h1.join().unwrap();
let r2 = h2.join().unwrap();
//let r3 = h3.join().unwrap();
//for _pcap_file in pcap_map.keys() {
// let packets_serialized = configure::start_map_pcap_dir_thread(config.pcap_dir.to_owned()).into_iter()
// //.errors_to(error_sender)
// .off_thread()
// .map(|(x,y)| parser::start_parse_thread(x))
// //.errors_to(error_sender)
// .off_thread()
// .map(|(x,y)| serializer::start_serialize_packets_thread(x))
// .off_thread().collect();
//println!("{:?}",&_pcap_file);
// TODO: Tuning vector capacity according to actuarial expectation, mean & std dev of packet size
// let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
//let mut v = Vec::<parser::QryData>::with_capacity(35536);
//v.extend(parser::parse(&_pcap_file, &config.filter));
// let packets_serialized = serializer::serialize_packets(v);
//let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(35536);
//packets_serialized.extend(serializer::serialize_packets(v));
/* Query */
let chunk_count = packets_serialized.len() / config.insert_max;
let remainder: usize = packets_serialized.len() % config.insert_max;
let chunker = packets_serialized.len() < config.insert_max;
match chunker {
true => {
let insert_str = query_string(&packets_serialized.len());
let statement_false = client.prepare(&insert_str).await?;
client
.query_raw(
&statement_false,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
false => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;
for i in 0..chunk_count {
    // Slice the i-th chunk; a plain split_at(insert_max) would re-insert the first chunk every pass.
    let input = &packets_serialized[i * config.insert_max..(i + 1) * config.insert_max];
    client
        .query_raw(&statement, input.to_vec().iter().map(|p| p as &dyn ToSql))
        .await?;
}
if remainder > 0 {
let rem_str = query_string(&remainder);
let statement_remainder = client.prepare(&rem_str).await?;
let (_garbage, _input) =
packets_serialized.split_at(packets_serialized.len() - remainder);
client
.query_raw(
&statement_remainder,
_input.to_vec().iter().map(|p| p as &dyn ToSql),
)
.await?;
}
}
}
},
true => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;
loop {
let v: Vec<parser::QryData> = parser::parse_device(&config.device, &config.filter, &config.insert_max);
let packets_serialized = serializer::serialize_packets(v);
client
.query_raw(
&statement,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
},
}
Ok(())
}

src/parser/.junk/tokio_main Normal file

@ -0,0 +1,130 @@
extern crate serde_json;
extern crate tokio;
extern crate tokio_postgres;
mod configure;
mod parser;
mod serializer;
use tokio_postgres::types::ToSql;
use tokio_postgres::{Error, NoTls};
fn query_string(insert_max: &usize) -> String {
let mut insert_template = String::with_capacity(insert_max * 8 + 43);
insert_template.push_str("INSERT INTO json_dump (packet) Values ");
for insert in 0..insert_max - 1 {
insert_template.push_str(&(format!("(${}), ", insert + 1)));
}
insert_template.push_str(&(format!("(${})", insert_max)));
insert_template
}
#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
async fn main() -> Result<(), Error> {
/* Init values from file */
let config: configure::Config = configure::from_json_file().unwrap();
// TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
// This db table should include UUIDs so it can be joined effectively
let pcap_map = configure::map_pcap_dir( &config.pcap_dir ).unwrap();
// println!("{:?}", pcap_map.iter());
/* db connection */
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
client
.execute("DROP TABLE IF EXISTS json_dump", &[])
.await?;
client
.execute(
"CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
&[],
)
.await?;
/* device or file input */
match config.is_device {
false => for _pcap_file in pcap_map.keys() {
println!("{:?}",&_pcap_file);
// TODO: Tuning vector capacity according to actuarial expectation, mean & std dev of packet size
let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
//let mut v = Vec::<parser::QryData>::with_capacity(35536);
//v.extend(parser::parse(&_pcap_file, &config.filter));
let packets_serialized = serializer::serialize_packets(v);
//let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(35536);
//packets_serialized.extend(serializer::serialize_packets(v));
/* Query */
let chunk_count = packets_serialized.len() / config.insert_max;
let remainder: usize = packets_serialized.len() % config.insert_max;
let chunker = packets_serialized.len() < config.insert_max;
match chunker {
true => {
let insert_str = query_string(&packets_serialized.len());
let statement_false = client.prepare(&insert_str).await?;
client
.query_raw(
&statement_false,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
false => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;
for i in 0..chunk_count {
    // Slice the i-th chunk; a plain split_at(insert_max) would re-insert the first chunk every pass.
    let input = &packets_serialized[i * config.insert_max..(i + 1) * config.insert_max];
    client
        .query_raw(&statement, input.to_vec().iter().map(|p| p as &dyn ToSql))
        .await?;
}
if remainder > 0 {
let rem_str = query_string(&remainder);
let statement_remainder = client.prepare(&rem_str).await?;
let (_garbage, _input) =
packets_serialized.split_at(packets_serialized.len() - remainder);
client
.query_raw(
&statement_remainder,
_input.to_vec().iter().map(|p| p as &dyn ToSql),
)
.await?;
}
}
}
},
true => {
let insert_str = query_string(&config.insert_max);
let statement = client.prepare(&insert_str).await?;
loop {
let v: Vec<parser::QryData> = parser::parse_device(&config.device, &config.filter, &config.insert_max);
let packets_serialized = serializer::serialize_packets(v);
client
.query_raw(
&statement,
packets_serialized.iter().map(|p| p as &dyn ToSql),
)
.await?;
}
},
}
Ok(())
}


@ -2,10 +2,13 @@ extern crate bitfield;
extern crate byteorder;
extern crate eui48;
mod packet_handler;
use eui48::MacAddress;
use eui48::{MacAddress};
use pcap::Capture;
use regex::bytes::Regex;
use std::str;
//use std::thread::{spawn, JoinHandle};
//use std::sync::mpsc::{channel, Receiver};
/* protocol ids, LittleEndian */
const ETH_P_IPV6: usize = 0xDD86;
@ -17,8 +20,8 @@ const ETH_P_RARP: usize = 0x3580;
fn build_ether() -> packet_handler::EtherHeader {
packet_handler::EtherHeader {
ether_dhost: (MacAddress::new([0; 6])).to_hex_string(),
ether_shost: (MacAddress::new([0; 6])).to_hex_string(),
ether_dhost: MacAddress::new([0;6]),
ether_shost: MacAddress::new([0;6]),
ether_type: 0,
}
}
@ -37,6 +40,7 @@ pub struct QryData {
pub reg_res: Option<String>,
}
/* Regex parse the _complete_ packet */
fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
let mut flags: String = String::new();
for mat in re.find_iter(payload) {
@ -146,6 +150,8 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
v
}
/* This could need some love */
pub fn parse_device(parse_device: &str, filter_str: &str, insert_max: &usize) -> Vec<QryData> {
let ether_init = build_ether();


@ -15,13 +15,12 @@ const ETH_ALEN: usize = 6;
const ETH_TLEN: usize = 2;
const ETHER_HDRLEN: usize = 14;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct EtherHeader {
// pub ether_dhost: MacAddress, // omitted because the serde_json serializer doesn't recognize the MacAddress type
// pub ether_shost: MacAddress,
pub ether_dhost: String,
pub ether_shost: String,
pub ether_type: i32,
pub ether_dhost: MacAddress,
pub ether_shost: MacAddress,
pub ether_type: i32,
}
// TODO: implement optional ethernet vlan shim header fields
@ -35,8 +34,11 @@ pub fn ethernet_handler(packet_data: &[u8]) -> EtherHeader {
_ether_type = LittleEndian::read_u16(&packet_data[ETH_ALEN * 2..(ETH_ALEN * 2) + ETH_TLEN]);
EtherHeader {
ether_dhost: (MacAddress::new(_ether_dhost as Eui48).to_hex_string()),
ether_shost: (MacAddress::new(_ether_shost as Eui48).to_hex_string()),
ether_dhost: (MacAddress::new(_ether_dhost as Eui48)),
ether_shost: (MacAddress::new(_ether_shost as Eui48)),
// ether_dhost: _ether_dhost as Eui48,
// ether_shost: _ether_shost as Eui48,
ether_type: _ether_type as i32,
}
}
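// Frame layout assumed by ethernet_handler: bytes 0..6 destination MAC, 6..12 source
// MAC, 12..14 EtherType; the EtherType is read LittleEndian, matching the
// byte-swapped ETH_P_* constants in parser.rs.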


@ -2,6 +2,9 @@ extern crate serde_json;
use crate::parser;
use rayon::prelude::*;
use serde::ser::{Serialize, SerializeStruct, Serializer};
//use std::thread::{spawn, JoinHandle};
//use std::thread;
//use std::sync::mpsc::{channel, Receiver};
//use serde_json::json;
impl Serialize for parser::QryData {
@ -11,8 +14,8 @@ impl Serialize for parser::QryData {
{
let mut state = serializer.serialize_struct("parser::QryData", 11)?;
state.serialize_field("time", &self.time)?;
state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?;
state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?;
state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost.to_hex_string())?;
state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost.to_hex_string())?;
state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?;
state.serialize_field("ipv4_header", &self.ipv4_header)?;
state.serialize_field("ipv6_header", &self.ipv6_header)?;
@ -25,12 +28,10 @@ impl Serialize for parser::QryData {
}
}
pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
/* rayon parallelized */
// TODO: Benchmark
// TODO: Benchmark. As far as I tested, this reaps no benefit.
let packets_serialized = v.par_iter().map(|x| serde_json::to_value(x).unwrap()).collect();
//let packets_serialized: Vec<serde_json::Value> = v.par_iter().map(|x| json!(x)).collect();
@ -38,7 +39,7 @@ pub fn serialize_packets(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
}
// This is way faster than serialize_packets() but can not figure out how to properly select parts of the json from db.
// This is way faster than serialize_packets(), but I cannot figure out how to properly select parts of the resulting json structure in an SQL query
#[allow(dead_code)]
pub fn serialize_packets_as_string(v: Vec<parser::QryData>) -> Vec<serde_json::Value> {
let mut packets_serialized: Vec<serde_json::Value> = Vec::with_capacity(v.len() * 2);