parallelized serialization with rayon

This commit is contained in:
gurkenhabicht 2020-05-18 16:51:48 +02:00
parent cfa405b5c2
commit 7d351101b7
1 changed files with 16 additions and 3 deletions

View File

@ -1,6 +1,7 @@
extern crate tokio_postgres; extern crate tokio_postgres;
extern crate serde_json; extern crate serde_json;
extern crate tokio; extern crate tokio;
use rayon::prelude::*;
use serde::ser::{Serialize, Serializer, SerializeStruct}; use serde::ser::{Serialize, Serializer, SerializeStruct};
mod parser; mod parser;
use std::fs::File; use std::fs::File;
@ -33,6 +34,14 @@ impl Serialize for parser::QryData {
state.serialize_field("ipv4_header.ip_fragment_offset", &self.ipv4_header.ip_fragment_offset)?; state.serialize_field("ipv4_header.ip_fragment_offset", &self.ipv4_header.ip_fragment_offset)?;
state.serialize_field("ipv4_header.ip_source_address", &self.ipv4_header.ip_source_address)?; state.serialize_field("ipv4_header.ip_source_address", &self.ipv4_header.ip_source_address)?;
state.serialize_field("ipv4_header.ip_destination_address", &self.ipv4_header.ip_destination_address)?; state.serialize_field("ipv4_header.ip_destination_address", &self.ipv4_header.ip_destination_address)?;
state.serialize_field("ipv6_header.version", &self.ipv6_header.version)?;
state.serialize_field("ipv6_header.traffic_class", &self.ipv6_header.traffic_class)?;
state.serialize_field("ipv6_header.flow_label" , &self.ipv6_header.flow_label)?;
state.serialize_field("ipv6_header.payload_length" , &self.ipv6_header.payload_length)?;
state.serialize_field("ipv6_header.next_header" , &self.ipv6_header.next_header)?;
state.serialize_field("ipv6_header.hop_limit" , &self.ipv6_header.hop_limit)?;
state.serialize_field("ipv6_header.source_address" , &self.ipv6_header.source_address)?;
state.serialize_field("ipv6_header.destination_address" , &self.ipv6_header.destination_address)?;
state.serialize_field("tcp_header.source_port", &self.tcp_header.source_port)?; state.serialize_field("tcp_header.source_port", &self.tcp_header.source_port)?;
state.serialize_field("tcp_header.destination_port", &self.tcp_header.destination_port)?; state.serialize_field("tcp_header.destination_port", &self.tcp_header.destination_port)?;
state.serialize_field("tcp_header.seq_num", &self.tcp_header.seq_num)?; state.serialize_field("tcp_header.seq_num", &self.tcp_header.seq_num)?;
@ -58,10 +67,14 @@ impl Serialize for parser::QryData {
fn serialize_packets ( v: Vec<parser::QryData> ) -> Vec<serde_json::Value> { fn serialize_packets ( v: Vec<parser::QryData> ) -> Vec<serde_json::Value> {
let mut packets_serialized: Vec<_> = Vec::new(); let mut packets_serialized: Vec<_> = Vec::new();
// for packet in v.iter() {
// // packets_serialized.push(json!(&packet));
// }
for packet in v.iter() { /* rayon parallelized */
packets_serialized.push(json!(&packet)); packets_serialized = v.par_iter().map( |x| json!(x) ).collect();
}
packets_serialized packets_serialized
} }