modular Config initialization

This commit is contained in:
parent 2d96ddba3a
commit 483dad881c
Cargo.toml
@@ -9,6 +9,9 @@ edition = "2018"
 #[profile.dev]
 #opt-level = 3
 
+#[profile.test]
+#postgres = "~0.1.7"
+
 [dependencies]
 tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
 tokio = { version = "0.2", features = ["full"] }
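Note: both added lines are commented out, and `#postgres = "~0.1.7"` parks a dependency pin rather than a profile setting. If the test profile were ever enabled, it takes the same keys as the other profile tables; an illustrative fragment, not part of this commit:

    [profile.test]
    opt-level = 3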
src/configure.rs
@@ -0,0 +1,36 @@
+// Init of configuration files could also be done via Config crate.
+// But at this point of development it seems like unjustified overhead.
+
+extern crate serde_json;
+use std::fs::File;
+
+pub struct Config {
+    pub filter: String,
+    pub insert_max: usize,
+    pub pcap_file: String,
+    pub connection: String,
+    pub device: String,
+    pub is_device: bool,
+    pub pcap_dir: String,
+}
+
+
+pub fn from_json_file() -> Option<Config> {
+    let config_file = File::open("parser.json").expect("file should open read only");
+    let json: serde_json::Value = serde_json::from_reader(config_file).unwrap();
+    Some(Config {
+        filter: json.get("filter").unwrap().as_str().unwrap().to_owned(),
+        insert_max : json.get("insert_max").unwrap().as_u64().unwrap() as usize,
+        pcap_file : json.get("pcap_file").unwrap().as_str().unwrap().to_owned(),
+        connection : format!("host={} user={} password={}",
+            json.get("database_host").unwrap().as_str().unwrap(),
+            json.get("database_user").unwrap().as_str().unwrap(),
+            json.get("database_password").unwrap().as_str().unwrap(),
+        ),
+        device : json.get("parse_device").unwrap().as_str().unwrap().to_owned(),
+        is_device : json.get("from_device").unwrap().as_bool().unwrap(),
+        pcap_dir : json.get("pcap_dir").unwrap().as_str().unwrap().to_owned(),
+    })
+}
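For reference, `from_json_file` expects a `parser.json` shaped like the following. The key names come straight from the code above; the values are illustrative:

    {
        "filter": "tcp",
        "insert_max": 20000,
        "pcap_file": "capture.pcap",
        "database_host": "localhost",
        "database_user": "postgres",
        "database_password": "password",
        "parse_device": "eth0",
        "from_device": false,
        "pcap_dir": "pcaps"
    }

Since the struct fields mirror the JSON keys (apart from the assembled `connection` string and the renamed `device`/`is_device`), a derive-based loader would be a lighter alternative than the Config crate the comment mentions. A minimal sketch, assuming serde with its derive feature were added as a dependency; not part of this commit:

    use serde::Deserialize;
    use std::fs::File;

    // Hypothetical alternative: let serde map the raw JSON keys directly,
    // then assemble the final Config from this intermediate struct.
    #[derive(Deserialize)]
    struct RawConfig {
        filter: String,
        insert_max: usize,
        pcap_file: String,
        database_host: String,
        database_user: String,
        database_password: String,
        parse_device: String,
        from_device: bool,
        pcap_dir: String,
    }

    let raw: RawConfig =
        serde_json::from_reader(File::open("parser.json").expect("config file"))
            .expect("valid JSON matching RawConfig");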
src/main.rs
@@ -4,7 +4,9 @@ extern crate tokio_postgres;
 use std::fs::File;
 mod parser;
 mod serializer;
+mod configure;
 use std::fs;
+use rayon::prelude::*;
 use std::io::prelude::*;
 use std::collections::HashMap;
 use tokio_postgres::types::ToSql;
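Note: `use rayon::prelude::*;` lands here without a user in this commit. For context, the prelude brings the parallel iterator traits into scope; an illustrative (made-up) pipeline, not code from this repository:

    // The prelude turns .iter()/.into_iter() pipelines into their
    // .par_iter()/.into_par_iter() parallel counterparts.
    let squares: Vec<u64> = (0u64..1_000).into_par_iter().map(|i| i * i).collect();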
@@ -24,37 +26,20 @@ fn query_string(insert_max: &usize) -> String {
     insert_template
 }
 
 
 #[tokio::main(core_threads = 4)] // By default, tokio_postgres uses the tokio crate as its runtime.
 async fn main() -> Result<(), Error> {
     /* Init values from file */
-    let file = File::open("parser.json").expect("file should open read only");
-    let json: serde_json::Value = serde_json::from_reader(file).unwrap();
-    let filter = json.get("filter").unwrap().as_str().unwrap();
-    let insert_max = json.get("insert_max").unwrap().as_u64().unwrap() as usize;
-    let pcap_file = json.get("pcap_file").unwrap().as_str().unwrap();
-    let host = [
-        "host=",
-        json.get("database_host").unwrap().as_str().unwrap(),
-    ]
-    .join("");
-    let user = [
-        "user=",
-        json.get("database_user").unwrap().as_str().unwrap(),
-    ]
-    .join("");
-    let password = [
-        "password=",
-        json.get("database_password").unwrap().as_str().unwrap(),
-    ]
-    .join("");
-    let connection = [host, user, password].join(" ");
-    let device = json.get("parse_device").unwrap().as_str().unwrap();
-    let is_device = json.get("from_device").unwrap().as_bool().unwrap();
-    let pcap_dir = json.get("pcap_dir").unwrap().as_str().unwrap();
-
+    let config: configure::Config = configure::from_json_file().unwrap();
+    // let mut pcap_map: Hashmap<std::path::Path, bool> = HashMap::new();
+    // TODO: hash file metadata, so its state is comparable at different times and can be written to a db table
+    // This db table should include UUIDs so it can be joined effectively
 
     let mut pcap_map = HashMap::new();
-    if let Ok(entries) = fs::read_dir(pcap_dir) {
+    if let Ok(entries) = fs::read_dir(config.pcap_dir) {
         for entry in entries {
             if let Ok(entry) = entry {
                 if let Ok(_file_type) = entry.file_type() {
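A hypothetical sketch of the metadata-hashing TODO above, using only std; the helper name and the choice of hashed fields are assumptions, not part of the commit:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Fingerprint a file's metadata (size + mtime) so its state can be
    // compared across runs and later written to a db table.
    fn metadata_fingerprint(path: &std::path::Path) -> std::io::Result<u64> {
        let meta = std::fs::metadata(path)?;
        let mut hasher = DefaultHasher::new();
        meta.len().hash(&mut hasher);
        if let Ok(modified) = meta.modified() {
            modified.hash(&mut hasher); // SystemTime implements Hash
        }
        Ok(hasher.finish())
    }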
@@ -78,7 +63,7 @@ async fn main() -> Result<(), Error> {
         }
     }
     println!("{:?}", pcap_map.iter());
     /* db connection */
-    let (client, connection) = tokio_postgres::connect(&connection, NoTls).await?;
+    let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
 
     tokio::spawn(async move {
         if let Err(e) = connection.await {
@@ -97,15 +82,16 @@ async fn main() -> Result<(), Error> {
         .await?;
 
     /* device or file input */
-    if false == is_device {
-        let v: Vec<parser::QryData> = parser::parse(&pcap_file, &filter);
+    if false == config.is_device {
+        for _pcap_file in pcap_map.keys() {
+            let v: Vec<parser::QryData> = parser::parse(&_pcap_file, &config.filter);
         let packets_serialized = serializer::serialize_packets(v);
 
         /* Query */
-        //let insert_max = 60;
-        let chunk_count = packets_serialized.len() / insert_max;
-        let remainder: usize = packets_serialized.len() % insert_max;
-        let chunker = &packets_serialized.len() < &insert_max;
+        let chunk_count = packets_serialized.len() / config.insert_max;
+        let remainder: usize = packets_serialized.len() % config.insert_max;
+        let chunker = &packets_serialized.len() < &config.insert_max;
         match chunker {
             true => {
                 let insert_str = query_string(&packets_serialized.len());
@@ -119,11 +105,11 @@ async fn main() -> Result<(), Error> {
             }
 
             false => {
-                let insert_str = query_string(&insert_max);
+                let insert_str = query_string(&config.insert_max);
                 let statement = client.prepare(&insert_str).await?;
 
                 for _i in 0..chunk_count {
-                    let (_input, _) = packets_serialized.split_at(insert_max);
+                    let (_input, _) = packets_serialized.split_at(config.insert_max);
                     client
                         .query_raw(&statement, _input.to_vec().iter().map(|p| p as &dyn ToSql))
                         .await?;
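Aside: `split_at(config.insert_max)` above is applied to the full `packets_serialized` slice on every pass, so each iteration takes the same leading block. A possible simplification, sketched under the surrounding types and not what this commit does, is `slice::chunks`, which advances automatically and makes the `chunk_count`/`remainder`/`chunker` bookkeeping unnecessary:

    // chunks() yields blocks of at most insert_max items; the final,
    // shorter block replaces the separate remainder handling.
    for chunk in packets_serialized.chunks(config.insert_max) {
        let statement = client.prepare(&query_string(&chunk.len())).await?;
        client
            .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
            .await?;
    }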
@@ -147,11 +133,12 @@ async fn main() -> Result<(), Error> {
             }
         }
     }
+    }
     } else {
-        let insert_str = query_string(&insert_max);
+        let insert_str = query_string(&config.insert_max);
         let statement = client.prepare(&insert_str).await?;
         loop {
-            let v: Vec<parser::QryData> = parser::parse_device(&device, &filter, &insert_max);
+            let v: Vec<parser::QryData> = parser::parse_device(&config.device, &config.filter, &config.insert_max);
             let packets_serialized = serializer::serialize_packets(v);
             client
                 .query_raw(
@@ -164,28 +151,3 @@ async fn main() -> Result<(), Error> {
 
     Ok(())
 }
-
-#[test]
-fn test_insert_json() {
-    use serde_json::json;
-    let mut client =
-        Client::connect("host=localhost user=postgres password=password", NoTls).unwrap();
-    let john = json!({
-        "name": "John Doe",
-        "age": 43,
-        "phones": [
-            "+44 1234567",
-            "+44 2345678"
-        ],
-        "empty": []
-    });
-
-    client
-        .execute("DROP TABLE IF EXISTS json_dump", &[])
-        .unwrap();
-    client.execute(
-        "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)",
-        &[],
-    );
-    client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]);
-}
src/parser.rs
@@ -45,7 +45,7 @@ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option<String> {
     }
 }
 
-pub fn parse(parse_file: &str, filter_str: &str) -> Vec<QryData> {
+pub fn parse(parse_file: &std::path::Path, filter_str: &str) -> Vec<QryData> {
     let ether_init = build_ether();
 
     let mut me = QryData {
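With the new `&Path` signature, the file loop in main.rs can hand `parser::parse` the `PathBuf` keys it collects from `fs::read_dir` directly, since `&PathBuf` coerces to `&Path`. An illustrative call; the path and filter values are made up:

    let path = std::path::PathBuf::from("pcaps/capture.pcap");
    let records: Vec<parser::QryData> = parser::parse(&path, "tcp");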
@@ -0,0 +1,37 @@
+//extern crate serde_json;
+//extern crate tokio;
+//extern crate tokio_postgres;
+//use std::fs::File;
+//mod parser;
+//mod serializer;
+//use std::fs;
+//use std::io::prelude::*;
+//use std::collections::HashMap;
+//use tokio_postgres::types::ToSql;
+//use tokio_postgres::{Error, NoTls};
+//
+//
+//#[test]
+//fn test_insert_json() {
+//    use serde_json::json;
+//    let mut client =
+//        tokio_postgres::connect("host=localhost user=postgres password=password", NoTls).unwrap();
+//    let john = json!({
+//        "name": "John Doe",
+//        "age": 43,
+//        "phones": [
+//            "+44 1234567",
+//            "+44 2345678"
+//        ],
+//        "empty": []
+//    });
+//
+//    client
+//        .execute("DROP TABLE IF EXISTS json_dump", &[])
+//        .unwrap();
+//    client.execute(
+//        "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)",
+//        &[],
+//    );
+//    client.query("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john]);
+//}
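The parked test above cannot compile as written: `tokio_postgres::connect` returns a `Future` (plus a connection task to drive), so it cannot simply be `.unwrap()`ed inside a synchronous `#[test]`. A sketch of a runnable variant, assuming the blocking `postgres` crate (the synchronous wrapper around tokio-postgres, with the same with-serde_json-1 feature already used in Cargo.toml) were added as a dev-dependency; crate choice and version are assumptions, not part of this commit:

    #[test]
    fn test_insert_json() {
        use postgres::{Client, NoTls};
        use serde_json::json;

        // Blocking client: no runtime or spawned connection task needed.
        let mut client =
            Client::connect("host=localhost user=postgres password=password", NoTls).unwrap();
        let john = json!({
            "name": "John Doe",
            "age": 43,
            "phones": ["+44 1234567", "+44 2345678"],
            "empty": []
        });

        client.execute("DROP TABLE IF EXISTS json_dump", &[]).unwrap();
        client
            .execute(
                "CREATE TABLE json_dump ( ID serial NOT NULL PRIMARY KEY, data json NOT NULL)",
                &[],
            )
            .unwrap();
        client
            .execute("INSERT INTO json_dump ( data ) VALUES ($1)", &[&john])
            .unwrap();
    }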