commit 95a3411fc253e23af02e13c06a2cca01a71ebb66 Author: gurkenhabicht Date: Wed May 13 00:03:45 2020 +0200 init diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..8053002 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,185 @@ +extern crate tokio_postgres; +extern crate serde_json; +extern crate tokio; +use serde::ser::{Serialize, Serializer, SerializeStruct}; +mod parser; +use std::fs::File; +use serde_json::json; + +use tokio_postgres::types::ToSql; +//use futures::{TryStreamExt}; +use tokio_postgres::{NoTls, Error}; + + +impl Serialize for parser::QryData { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // 34 is the number of fields in the struct. + let mut state = serializer.serialize_struct("parser::QryData", 34)?; + state.serialize_field("time", &self.time)?; + state.serialize_field("ether_header.ether_dhost", &self.ether_header.ether_dhost)?; + state.serialize_field("ether_header.ether_shost", &self.ether_header.ether_shost)?; + state.serialize_field("ether_header.ether_type", &self.ether_header.ether_type)?; + state.serialize_field("ipv4_header.ip_version", &self.ipv4_header.ip_version)?; + state.serialize_field("ipv4_header.ip_ihl", &self.ipv4_header.ip_ihl)?; + state.serialize_field("ipv4_header.ip_dscp", &self.ipv4_header.ip_dscp)?; + state.serialize_field("ipv4_header.ip_ecn", &self.ipv4_header.ip_ecn)?; + state.serialize_field("ipv4_header.ip_total_length", &self.ipv4_header.ip_total_length)?; + state.serialize_field("ipv4_header.ip_identification", &self.ipv4_header.ip_identification)?; + state.serialize_field("ipv4_header.ip_df", &self.ipv4_header.ip_df)?; + state.serialize_field("ipv4_header.ip_mf", &self.ipv4_header.ip_mf)?; + state.serialize_field("ipv4_header.ip_fragment_offset", &self.ipv4_header.ip_fragment_offset)?; + state.serialize_field("ipv4_header.ip_source_address", &self.ipv4_header.ip_source_address)?; + state.serialize_field("ipv4_header.ip_destination_address", &self.ipv4_header.ip_destination_address)?; + state.serialize_field("tcp_header.source_port", &self.tcp_header.source_port)?; + state.serialize_field("tcp_header.destination_port", &self.tcp_header.destination_port)?; + state.serialize_field("tcp_header.seq_num", &self.tcp_header.seq_num)?; + state.serialize_field("tcp_header.ack_num", &self.tcp_header.ack_num)?; + state.serialize_field("tcp_header.data_offset", &self.tcp_header.data_offset)?; + state.serialize_field("tcp_header.reserved", &self.tcp_header.reserved)?; + state.serialize_field("tcp_header.ns", &self.tcp_header.ns)?; + state.serialize_field("tcp_header.cwr", &self.tcp_header.cwr)?; + state.serialize_field("tcp_header.ece", &self.tcp_header.ece)?; + state.serialize_field("tcp_header.urg", &self.tcp_header.urg)?; + state.serialize_field("tcp_header.ack", &self.tcp_header.ack)?; + state.serialize_field("tcp_header.psh", &self.tcp_header.psh)?; + state.serialize_field("tcp_header.rst", &self.tcp_header.rst)?; + state.serialize_field("tcp_header.syn", &self.tcp_header.syn)?; + state.serialize_field("tcp_header.fin", &self.tcp_header.fin)?; + state.serialize_field("tcp_header.window_size", &self.tcp_header.window_size)?; + state.serialize_field("tcp_header.checksum", &self.tcp_header.checksum)?; + state.serialize_field("tcp_header.urgent_pointer", &self.tcp_header.urgent_pointer)?; + state.serialize_field("data", &self.data)?; + state.end() + } +} + +fn serialize_packets ( v: Vec ) -> Vec { + let mut packets_serialized: Vec<_> = Vec::new(); + + for packet in v.iter() { + packets_serialized.push(json!(&packet)); + } + packets_serialized +} + +fn query_string ( insert_max: &usize ) -> String { + let mut insert_template: String = "INSERT INTO json_dump (packet) Values ".to_owned(); + + for insert in 0..insert_max-1 { insert_template.push_str( &(format!("(${}), ", insert+1)) );} + insert_template.push_str( &(format!("(${})", insert_max)) ); + + insert_template +} + +#[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime. +async fn main() -> Result<(), Error> { + + /* Init values from file */ + let file = File::open("parser.json").expect("file should open read only"); + + let json: serde_json::Value = serde_json::from_reader(file).unwrap(); + let filter = json.get("filter").unwrap().as_str().unwrap(); + let insert_max = json.get("insert_max").unwrap().as_u64().unwrap() as usize; + let pcap_file = json.get("pcap_file").unwrap().as_str().unwrap(); + let host = ["host=", json.get("database_host").unwrap().as_str().unwrap()].join(""); + let user = ["user=", json.get("database_user").unwrap().as_str().unwrap()].join(""); + let password = ["password=", json.get("database_password").unwrap().as_str().unwrap()].join(""); + let connection = [host, user, password].join(" "); + let device = json.get("parse_device").unwrap().as_str().unwrap(); + let is_device = json.get("from_device").unwrap().as_bool().unwrap(); + + + /* db connection */ + let (client, connection) = + tokio_postgres::connect(&connection, NoTls).await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + client.execute("DROP TABLE IF EXISTS json_dump", &[]).await?; + client.execute("CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", &[]).await?; + + /* device or file input */ + if false == is_device { + + let v: Vec = parser::parse(&pcap_file, &filter ); + let packets_serialized = serialize_packets( v ); + + /* Query */ + //let insert_max = 60; + let chunk_count = packets_serialized.len()/insert_max; + let remainder: usize = packets_serialized.len() % insert_max; + let chunker = &packets_serialized.len() < &insert_max; + match chunker { + true => { + let insert_str = query_string( &packets_serialized.len() ); + let statement_false = client.prepare( &insert_str ).await?; + client.query_raw(&statement_false, packets_serialized.iter().map(|p| p as &dyn ToSql)).await?; + }, + + false => { + + let insert_str = query_string( &insert_max ); + let statement = client.prepare( &insert_str ).await?; + + + for _i in 0..chunk_count { + let (_input, _)= packets_serialized.split_at(insert_max); + client.query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql)).await?; + } + + println!("Packets, total:{:?}",packets_serialized.len()); + println!("Chunks, total:{}", chunk_count); + println!("Chunks, remainder{}", remainder); + + if remainder > 0 { + let rem_str = query_string( &remainder ); + let statement_remainder = client.prepare(&rem_str).await?; + let (_garbage, _input) =packets_serialized.split_at(packets_serialized.len()-remainder); + client.query_raw(&statement_remainder, _input.to_vec().iter().map(|p| p as &dyn ToSql),).await?; + } + + } + } + + } else { + let insert_str = query_string(&insert_max); + let statement = client.prepare(&insert_str).await?; + loop { + let v: Vec = parser::parse_device(&device, &filter, &insert_max); + let packets_serialized = serialize_packets( v ); + client.query_raw(&statement, packets_serialized.iter().map(|p| p as &dyn ToSql),).await?; + } + } + + + Ok(()) + +} + +#[test] +fn test_insert_json () { +use serde_json::json; + let mut client = Client::connect("host=localhost user=postgres password=password", NoTls).unwrap(); + let john = json!({ + "name": "John Doe", + "age": 43, + "phones": [ + "+44 1234567", + "+44 2345678" + ], + "empty": [] + }); + + client.execute("DROP TABLE IF EXISTS json_dump", &[]).unwrap(); + client.execute("CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)", &[]); + client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]); +} + + diff --git a/src/parser.json b/src/parser.json new file mode 100644 index 0000000..c7a935c --- /dev/null +++ b/src/parser.json @@ -0,0 +1,10 @@ +{ + "insert_max": 80, + "filter": "tcp && !ip6", + "from_device": true, + "parse_device": "enp7s0", + "pcap_file": "../target/wohnung2.pcapng", + "database_user": "postgres", + "database_host": "localhost", + "database_password": "password" +} diff --git a/src/parser.rs b/src/parser.rs new file mode 100644 index 0000000..c0100ae --- /dev/null +++ b/src/parser.rs @@ -0,0 +1,150 @@ +extern crate byteorder; +extern crate bitfield; +extern crate eui48; +mod packet_handler; +use pcap::Capture; +use eui48::MacAddress; +use std::net::{IpAddr, Ipv4Addr}; +use std::str; + +fn build_ether () -> packet_handler::EtherHeader { + packet_handler::EtherHeader { + ether_dhost: (MacAddress::new([0;6])).to_hex_string(), + ether_shost: (MacAddress::new([0;6])).to_hex_string(), + ether_type: 0, + } +} + +fn build_ipv4 () -> packet_handler::IpV4Header { + packet_handler::IpV4Header { + ip_version: 0, + ip_ihl: 0, + ip_dscp: 0, + ip_ecn: 0, + ip_total_length: 0, + ip_identification: 0, + ip_df: 0, + ip_mf: 0, + ip_fragment_offset: 0, + ip_time_to_live: 0, + ip_protocol: 0, + ip_header_checksum: 0, + ip_source_address: IpAddr::V4(Ipv4Addr::new(0,0,0,0)), + ip_destination_address: IpAddr::V4(Ipv4Addr::new(0,0,0,0)), + } +} + +fn build_tcp () -> packet_handler::TcpHeader { + packet_handler::TcpHeader { + source_port: 0, + destination_port: 0, + seq_num: 0, + ack_num: 0, + data_offset: 0, + reserved: 0, + ns: 0, + cwr: 0, + ece: 0, + urg: 0, + ack: 0, + psh: 0, + rst: 0, + syn: 0, + fin: 0, + window_size: 0, + checksum: 0, + urgent_pointer: 0, + } +} + +// TODO: wrap packet_handler types inside Option +#[derive(Debug, Clone)] +pub struct QryData{ + pub id: i32, + pub time: f64, + pub data: Option>, + pub ether_header: packet_handler::EtherHeader, + pub ipv4_header: packet_handler::IpV4Header, + pub tcp_header: packet_handler::TcpHeader, +} + +pub fn parse (parse_file: &str, filter_str: &str) -> Vec { + let ether_init = build_ether(); + let ipv4_init = build_ipv4(); + let tcp_init = build_tcp(); + + let mut me = QryData { + id: 0, + time: 0.0, + data: None, + ether_header: ether_init, + ipv4_header: ipv4_init, + tcp_header: tcp_init, + }; + let mut v: Vec = Vec::new(); + + //let mut cap = Capture::from_file("../target/wohnung2.pcapng").unwrap(); + let mut cap = Capture::from_file(parse_file).unwrap(); + //let filter_str: String = "tcp && !ip6".to_owned(); + //let filter_str: String = "tcp && ip dst 18.202.15.201".to_owned(); + //let filter_str: String = "tcp && ip dst 192.168.0.7".to_owned(); + //println!("{:?}", filter_str); + Capture::filter(&mut cap, &filter_str).unwrap(); + + while let Ok(packet) = cap.next() { + me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; + me.data = Some(packet.data.to_vec()); + me.ether_header = packet_handler::ethernet_handler( packet.data ); + if 8 == me.ether_header.ether_type { + me.ipv4_header = packet_handler::ip_handler( packet.data ); + if 6 == me.ipv4_header.ip_protocol { + me.tcp_header = packet_handler::tcp_handler( me.ipv4_header.ip_ihl, packet.data ); + me.data= packet_handler::payload_handler( me.ipv4_header.ip_ihl, me.tcp_header.data_offset, packet.data); + } + + } + + v.push(QryData{id:0, time:me.time, data: me.data, ether_header:me.ether_header, ipv4_header: me.ipv4_header, tcp_header: me.tcp_header}); + + } + v +} + +pub fn parse_device (parse_device: &str, filter_str: &str, insert_max: &usize) -> Vec { + let ether_init = build_ether(); + let ipv4_init = build_ipv4(); + let tcp_init = build_tcp(); + + let mut me = QryData { + id: 0, + time: 0.0, + data: None, + ether_header: ether_init, + ipv4_header: ipv4_init, + tcp_header: tcp_init, + }; + let mut v: Vec = Vec::new(); + let mut cap = Capture::from_device(parse_device).unwrap().open().unwrap(); + Capture::filter(&mut cap, &filter_str).unwrap(); + + 'parse: while let Ok(packet) = cap.next(){ + me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; + me.data = Some(packet.data.to_vec()); + me.ether_header = packet_handler::ethernet_handler( packet.data ); + if 8 == me.ether_header.ether_type { + me.ipv4_header = packet_handler::ip_handler( packet.data ); + if 6 == me.ipv4_header.ip_protocol { + me.tcp_header = packet_handler::tcp_handler( me.ipv4_header.ip_ihl, packet.data ); + me.data= packet_handler::payload_handler( me.ipv4_header.ip_ihl, me.tcp_header.data_offset, packet.data); + } + + } + + v.push(QryData{id:0, time:me.time, data: me.data, ether_header:me.ether_header, ipv4_header: me.ipv4_header, tcp_header: me.tcp_header}); + if &v.len() >= insert_max { + break 'parse; + } + } + v +} + diff --git a/src/parser/packet_handler.rs b/src/parser/packet_handler.rs new file mode 100644 index 0000000..6613f44 --- /dev/null +++ b/src/parser/packet_handler.rs @@ -0,0 +1,204 @@ +extern crate eui48; +extern crate byteorder; +extern crate bitfield; + +use byteorder::{ByteOrder, LittleEndian}; +use eui48::{MacAddress, Eui48}; +use std::net::{IpAddr, Ipv4Addr}; +use bitfield::bitfield; + + +/* ethernet */ +const ETH_ALEN: usize = 6; +const ETH_TLEN: usize = 2; +const ETHER_HDRLEN: usize = 14; + + +#[derive(Debug, Clone)] +pub struct EtherHeader { +// pub ether_dhost: MacAddress, +// pub ether_shost: MacAddress, + pub ether_dhost: String, + pub ether_shost: String, + pub ether_type: i32, +} + +pub fn ethernet_handler ( packet_data: &[u8] ) -> EtherHeader { + let mut _ether_dhost: [u8; ETH_ALEN] = [0; ETH_ALEN]; + let mut _ether_shost: [u8; ETH_ALEN] = [0; ETH_ALEN]; + let mut _ether_type: u16 = 0; + + _ether_dhost.clone_from_slice(&packet_data[0..ETH_ALEN]); + + //println!("{:?}", (&(_ether_dhost).to_owned())); + _ether_shost.clone_from_slice(&packet_data[ETH_ALEN..ETH_ALEN*2]); + _ether_type = LittleEndian::read_u16(&packet_data[ETH_ALEN*2..(ETH_ALEN*2)+ETH_TLEN]); + + EtherHeader { + ether_dhost: (MacAddress::new(_ether_dhost as Eui48).to_hex_string()), + ether_shost: (MacAddress::new(_ether_shost as Eui48).to_hex_string()), + ether_type: _ether_type as i32, + } +} + +/* ip */ +#[derive(Debug,Copy, Clone)] +pub struct IpV4Header { + pub ip_version: u32, + pub ip_ihl: u32, + pub ip_dscp: u32, + pub ip_ecn: u32, + pub ip_total_length: u32, + pub ip_identification: u32, + pub ip_df: u32, + pub ip_mf: u32, + pub ip_fragment_offset: u32, + pub ip_time_to_live: u32, + pub ip_protocol: u32, + pub ip_header_checksum: u32, + pub ip_source_address: IpAddr, + pub ip_destination_address: IpAddr, + +} + +bitfield! { + struct BitfieldIpV4Header(MSB0 [u8]); + impl Debug; + u32; + get_version, _: 3, 0; + get_ihl, _: 7, 4; + get_dscp, _: 13, 8; + get_ecn, _: 15, 14; + get_total_length, _: 31, 16; + get_identification, _: 47, 32; + get_df, _: 49; + get_mf, _: 50; + get_fragment_offset, _: 63, 51; + get_time_to_live, _: 71, 64; + get_protocol, _: 79, 72; + get_header_checksum, _: 95, 80; + u8, get_source_address, _: 103, 96, 4; + u32, into Ipv4Addr, get_destination_address, _: 159, 128; +} + +impl + AsMut<[u8]>> BitfieldIpV4Header { + fn get_source_as_ip_addr(&self) -> Ipv4Addr { + let mut src = [0; 4]; + for (i, src) in src.iter_mut().enumerate() { + *src = self.get_source_address(i); + } + src.into() + } +} + +pub fn ip_handler ( packet_data: &[u8] ) -> IpV4Header { + let (_head, tail) = packet_data.split_at(ETHER_HDRLEN); + let (raw_hdr, _) = tail.split_at(20); + let mut _tail: [u8; 20] = [0; 20]; + _tail.copy_from_slice(raw_hdr); + + let ip_header = BitfieldIpV4Header(_tail); + + IpV4Header { + ip_version: ip_header.get_version(), + ip_ihl: ip_header.get_ihl(), + ip_dscp: ip_header.get_dscp(), + ip_ecn: ip_header.get_ecn(), + ip_total_length: ip_header.get_total_length(), + ip_identification: ip_header.get_identification(), + ip_df: ip_header.get_df().into(), + ip_mf: ip_header.get_mf().into(), + ip_fragment_offset: ip_header.get_fragment_offset(), + ip_time_to_live: ip_header.get_time_to_live(), + ip_protocol: ip_header.get_protocol(), + ip_header_checksum: ip_header.get_header_checksum(), + ip_source_address: IpAddr::V4(ip_header.get_source_as_ip_addr()), + ip_destination_address: IpAddr::V4(ip_header.get_destination_address()), + } +} + +/* tcp */ +#[derive(Debug,Copy,Clone)] +pub struct TcpHeader { + pub source_port: u32, + pub destination_port: u32, + pub seq_num: u32, + pub ack_num: u32, + pub data_offset: u32, + pub reserved: u32, + pub ns: u32, + pub cwr: u32, + pub ece: u32, + pub urg: u32, + pub ack: u32, + pub psh: u32, + pub rst: u32, + pub syn: u32, + pub fin: u32, + pub window_size: u32, + pub checksum: u32, + pub urgent_pointer: u32, +} + + +bitfield! { + struct BitfieldTcpHeader (MSB0 [u8]); + u32; + get_source_port, _: 15, 0; + get_destination_port, _: 31,16; + get_seq_num, _: 63,32; + get_ack_num, _: 95,64; + get_data_offset, _: 99,96; + get_reserved, _: 102,100; + get_ns, _: 103; + get_cwr, _: 104; + get_ece, _: 105; + get_urg, _: 106; + get_ack, _: 108; + get_psh, _: 108; + get_rst, _: 109; + get_syn, _: 110; + get_fin, _: 111; + get_window_size, _: 127,112; + get_checksum, _: 143,128; + get_urgent_pointer, _: 159,144; +} + +pub fn tcp_handler ( ip_hlen: u32, packet_data: &[u8] ) ->TcpHeader { + let (_head, tail) = packet_data.split_at(ETHER_HDRLEN+ip_hlen as usize * 4); + let (raw_hdr, _) = tail.split_at(20); + let mut _tail: [u8; 20] = [0; 20]; + _tail.copy_from_slice(raw_hdr); + let tcp_header = BitfieldTcpHeader(_tail); + + TcpHeader { + source_port: tcp_header.get_source_port(), + destination_port: tcp_header.get_destination_port(), + seq_num: tcp_header.get_seq_num(), + ack_num: tcp_header.get_ack_num(), + data_offset: tcp_header.get_data_offset(), + reserved: tcp_header.get_reserved(), + ns: tcp_header.get_ns().into(), + cwr: tcp_header.get_cwr().into(), + ece: tcp_header.get_ece().into(), + urg: tcp_header.get_urg().into(), + ack: tcp_header.get_ack().into(), + psh: tcp_header.get_psh().into(), + rst: tcp_header.get_rst().into(), + syn: tcp_header.get_syn().into(), + fin: tcp_header.get_fin().into(), + window_size: tcp_header.get_window_size(), + checksum: tcp_header.get_checksum(), + urgent_pointer: tcp_header.get_urgent_pointer(), + } + + +} + +/* payload */ +pub fn payload_handler ( ip_hlen: u32, data_offset: u32, packet_data : &[u8] ) -> Option> { + let (_head, tail)= packet_data.split_at(ETHER_HDRLEN+ip_hlen as usize * 4+data_offset as usize * 4); + Some(tail.to_vec()) +} + +