make the database work again
This commit is contained in:
parent
636c383e64
commit
1e6fc92e41
|
@ -1 +1,6 @@
|
||||||
/target
|
/target
|
||||||
|
*.pcap
|
||||||
|
*.pcapng
|
||||||
|
*.log
|
||||||
|
*.svg
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -14,8 +14,10 @@ edition = "2018"
|
||||||
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
|
# tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
|
||||||
tokio = { version = "0.2", features = ["full"] }
|
tokio-postgres = { version="0.7", features = ["runtime","with-eui48-0_4","with-serde_json-1"] }
|
||||||
|
#tokio = { version = "0.2", features = ["full"] }
|
||||||
|
tokio = { version = "1.0", features = ["full"] }
|
||||||
pcap = "~0.7.0"
|
pcap = "~0.7.0"
|
||||||
byteorder = "*"
|
byteorder = "*"
|
||||||
bitfield = "*"
|
bitfield = "*"
|
||||||
|
@ -24,5 +26,5 @@ serde_json = { version = "1.0", features = ["raw_value"] }
|
||||||
serde = { version = "1.0.3", features = ["derive"] }
|
serde = { version = "1.0.3", features = ["derive"] }
|
||||||
rayon = "1.3"
|
rayon = "1.3"
|
||||||
regex = "1.3.7"
|
regex = "1.3.7"
|
||||||
futures = "~0.3.5"
|
futures = "~0.3.28"
|
||||||
jemallocator = "~0.3.2"
|
jemallocator = "~0.3.2"
|
||||||
|
|
File diff suppressed because one or more lines are too long
Before Width: | Height: | Size: 182 KiB After Width: | Height: | Size: 97 KiB |
48
src/main.rs
48
src/main.rs
|
@ -1,11 +1,12 @@
|
||||||
mod configure;
|
mod configure;
|
||||||
mod parser;
|
mod parser;
|
||||||
mod serializer;
|
mod serializer;
|
||||||
|
use std::error::Error;
|
||||||
use futures::pin_mut;
|
use futures::pin_mut;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use tokio_postgres::binary_copy::BinaryCopyInWriter;
|
use tokio_postgres::binary_copy::BinaryCopyInWriter;
|
||||||
use tokio_postgres::types::{ToSql, Type};
|
use tokio_postgres::types::{ToSql, Type};
|
||||||
use tokio_postgres::{Error, NoTls};
|
use tokio_postgres::{NoTls, Client};
|
||||||
|
|
||||||
extern crate jemallocator;
|
extern crate jemallocator;
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
|
@ -30,8 +31,25 @@ fn query_string(insert_max: &usize, table_name: &str) -> String {
|
||||||
insert_template
|
insert_template
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main(core_threads = 4)]
|
|
||||||
async fn main() -> Result<(), Error> {
|
async fn connect_to_postgres(config: &std::string::String ) -> Result<Client, Box<dyn Error>> {
|
||||||
|
// let config = "postgres://postgres:password@172.17.0.2:5432/postgres";
|
||||||
|
let (client, _connection) = tokio_postgres::connect(&config, NoTls).await?;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = _connection.await {
|
||||||
|
eprintln!("connection error: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Do any other necessary processing with the client or connection...
|
||||||
|
|
||||||
|
Ok(client)
|
||||||
|
}
|
||||||
|
|
||||||
|
// #[tokio::main(core_threads = 4)]
|
||||||
|
// #[tokio::main(worker_threads = 4)]
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn Error>> {
|
||||||
/* Init values from file */
|
/* Init values from file */
|
||||||
let config: configure::Config = configure::from_json_file().unwrap();
|
let config: configure::Config = configure::from_json_file().unwrap();
|
||||||
let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
|
let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
|
||||||
|
@ -41,20 +59,24 @@ async fn main() -> Result<(), Error> {
|
||||||
// This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs.
|
// This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs.
|
||||||
// TODO: Use inotify crate to update pcap_map according to files created while parser is running
|
// TODO: Use inotify crate to update pcap_map according to files created while parser is running
|
||||||
|
|
||||||
|
// println!("{:?}", pcap_map);
|
||||||
|
|
||||||
/* db connection */
|
/* db connection */
|
||||||
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
|
|
||||||
tokio::spawn(async move {
|
if let Err(err) = connect_to_postgres(&config.connection).await {
|
||||||
if let Err(e) = connection.await {
|
println!("Error chain: {:?}", err);
|
||||||
eprintln!("connection error: {}", e);
|
println!("Error: {}", err);
|
||||||
|
eprintln!("Error chain: {:?}", err);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
let client = connect_to_postgres(&config.connection).await.unwrap();
|
||||||
client
|
client
|
||||||
.execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
|
.execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
|
||||||
.await?;
|
.await?;
|
||||||
client
|
client
|
||||||
.execute(
|
.execute(
|
||||||
&*format!(
|
&*format!(
|
||||||
"CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
|
"CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL, packet_time double precision NOT NULL)",
|
||||||
&config.tablename
|
&config.tablename
|
||||||
),
|
),
|
||||||
&[],
|
&[],
|
||||||
|
@ -78,15 +100,17 @@ async fn main() -> Result<(), Error> {
|
||||||
|
|
||||||
/* Copy data to db */
|
/* Copy data to db */
|
||||||
let sink = client
|
let sink = client
|
||||||
.copy_in("COPY json_dump(packet) from STDIN BINARY")
|
.copy_in("COPY json_dump(packet, packet_time) from STDIN BINARY")
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
|
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON, Type::FLOAT8]);
|
||||||
|
|
||||||
let join = task::spawn(async move {
|
let join = task::spawn(async move {
|
||||||
pin_mut!(writer);
|
pin_mut!(writer);
|
||||||
for pack in data_serialized {
|
for pack in data_serialized {
|
||||||
writer.as_mut().write(&[&pack]).await.unwrap();
|
//println!("{}", pack);
|
||||||
|
//let time : Result<f64, Error>= serde_json::from_value(pack["time"]);
|
||||||
|
writer.as_mut().write(&[&pack, &pack["time"].as_f64().unwrap()]).await.unwrap();
|
||||||
}
|
}
|
||||||
writer.finish().await.unwrap();
|
writer.finish().await.unwrap();
|
||||||
});
|
});
|
||||||
|
|
211
src/main_bkp
211
src/main_bkp
|
@ -1,211 +0,0 @@
|
||||||
extern crate serde_json;
|
|
||||||
extern crate tokio;
|
|
||||||
extern crate tokio_postgres;
|
|
||||||
|
|
||||||
mod configure;
|
|
||||||
mod parser;
|
|
||||||
mod serializer;
|
|
||||||
//use postgres::{Client, NoTls};
|
|
||||||
//use postgres::types::ToSql;
|
|
||||||
//use postgres::binary_copy::{BinaryCopyInWriter};
|
|
||||||
use tokio_postgres::types::{Type, ToSql};
|
|
||||||
use tokio_postgres::{Error, NoTls};
|
|
||||||
use tokio_postgres::binary_copy::{BinaryCopyInWriter};
|
|
||||||
use futures::{pin_mut};
|
|
||||||
use tokio::task;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
//use std::thread::{spawn, JoinHandle};
|
|
||||||
//use std::sync::mpsc::{channel, Receiver};
|
|
||||||
//use std::sync::mpsc;
|
|
||||||
//use std::alloc::System;
|
|
||||||
//
|
|
||||||
//#[global_allocator]
|
|
||||||
//static A: System = System;
|
|
||||||
|
|
||||||
/* conditionals */
|
|
||||||
const FROM_FILE: bool = false;
|
|
||||||
const FROM_DEVICE: bool = true;
|
|
||||||
//const NON_CHUNKED: bool = true;
|
|
||||||
//const CHUNKED: bool = false;
|
|
||||||
|
|
||||||
fn query_string(insert_max: &usize, table_name: &str) -> String {
|
|
||||||
let mut insert_template = String::with_capacity(insert_max * 8 + 96);
|
|
||||||
insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name));
|
|
||||||
|
|
||||||
for insert in 0..insert_max - 1 {
|
|
||||||
insert_template.push_str(&*format!("(${}), ", insert + 1));
|
|
||||||
}
|
|
||||||
insert_template.push_str(&*format!("(${})", insert_max));
|
|
||||||
|
|
||||||
insert_template
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main(core_threads = 4)]
|
|
||||||
async fn main() -> Result<(), Error> {
|
|
||||||
/* Init values from file */
|
|
||||||
let config: configure::Config = configure::from_json_file().unwrap();
|
|
||||||
let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap();
|
|
||||||
|
|
||||||
// TODO: Create db table with pcap file hashes
|
|
||||||
// TODO: hash file metadata, so its state is comparable with future file updates and can be written to a db table (and read e.g. after system crash)
|
|
||||||
// This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs.
|
|
||||||
// TODO: Use inotify crate to update pcap_map according to files created while parser is running
|
|
||||||
|
|
||||||
/* db connection */
|
|
||||||
let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = connection.await {
|
|
||||||
eprintln!("connection error: {}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
client
|
|
||||||
.execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[])
|
|
||||||
.await?;
|
|
||||||
client
|
|
||||||
.execute(
|
|
||||||
&*format!(
|
|
||||||
"CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)",
|
|
||||||
&config.tablename
|
|
||||||
),
|
|
||||||
&[],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
/* device or file input */
|
|
||||||
match config.is_device {
|
|
||||||
FROM_FILE => {
|
|
||||||
for (_pcap_file, _pcap_info) in pcap_map.iter() {
|
|
||||||
//println!("{:?}: {:?}", &_pcap_file, &_pcap_info);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* MPSC channeled serialization */
|
|
||||||
// let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned());
|
|
||||||
// let (data_serialized, h2) = serializer::mpsc_serialize(qry_data);
|
|
||||||
// let packets_serialized = serializer::mpsc_collect_serialized(data_serialized);
|
|
||||||
// let _r1 = h1.join().unwrap();
|
|
||||||
// let _r2 = h2.join().unwrap();
|
|
||||||
|
|
||||||
/* This is serializing data without mpsc, which results in higher memory consumption, it's faster but 12GB main memory is needed */
|
|
||||||
// let v: Vec<parser::QryData> =
|
|
||||||
// parser::parse(&_pcap_file, &config.filter, &config.regex_filter);
|
|
||||||
// let len = v.len();
|
|
||||||
|
|
||||||
/* tokio mpsc channel */
|
|
||||||
let (tx, mut rx) = mpsc::channel(1000);
|
|
||||||
let pcap_file = _pcap_file.clone();
|
|
||||||
let filter = config.filter.clone();
|
|
||||||
let regex_filter = config.regex_filter.clone();
|
|
||||||
let join_handle: task::JoinHandle<Vec<parser::QryData>> = task::spawn( async move {
|
|
||||||
parser::tokio_parse(pcap_file, &filter, ®ex_filter).await
|
|
||||||
});
|
|
||||||
let v = join_handle.await.unwrap();
|
|
||||||
for packet in v.into_iter(){
|
|
||||||
let mut tx = tx.clone();
|
|
||||||
|
|
||||||
tokio::spawn( async move {
|
|
||||||
//println!("serializing!number {:?}", i);
|
|
||||||
|
|
||||||
let packet_serialized = serializer::tokio_serialize(packet).await;
|
|
||||||
tx.send(packet_serialized).await.unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
drop(tx);
|
|
||||||
// let mut packets_serialized: Vec<serde_json::Value> = Vec::new();
|
|
||||||
// while let Some(res) = rx.recv().await {
|
|
||||||
// //println!("collecting");
|
|
||||||
// packets_serialized.push(res);
|
|
||||||
// //let packets_serialized = serializer::serialize_packets(v);
|
|
||||||
// }
|
|
||||||
|
|
||||||
let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap();
|
|
||||||
let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]);
|
|
||||||
|
|
||||||
let join = task::spawn( async move {
|
|
||||||
pin_mut!(writer);
|
|
||||||
//for pack in packets_serialized {
|
|
||||||
while let Some(res) = rx.recv().await {
|
|
||||||
writer.as_mut().write(&[&res]).await.unwrap();
|
|
||||||
drop(res);
|
|
||||||
// Reminder: write_raw() behavior is very strange, so it's write() for now.
|
|
||||||
// writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
//thread::sleep(time::Duration::from_millis(3000));
|
|
||||||
writer.finish().await.unwrap();
|
|
||||||
});
|
|
||||||
assert!(join.await.is_ok());
|
|
||||||
|
|
||||||
// TODO: MPSC channel
|
|
||||||
// let mut v = Vec::<parser::QryData>::with_capacity(100000);
|
|
||||||
// v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter));
|
|
||||||
// let mut packets_serialized = Vec::<serde_json::Value>::with_capacity(100000);
|
|
||||||
// packets_serialized.extend(serializer::serialize_packets(v));
|
|
||||||
// Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE
|
|
||||||
// /* Do chunks and query data */
|
|
||||||
// let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ;
|
|
||||||
// match chunker {
|
|
||||||
// NON_CHUNKED => {
|
|
||||||
// let insert_str = query_string(&packets_serialized.len(), &config.tablename);
|
|
||||||
// let statement = client.prepare(&insert_str).await?;
|
|
||||||
// client
|
|
||||||
// .query_raw(
|
|
||||||
// &statement,
|
|
||||||
// packets_serialized.iter().map(|p| p as &dyn ToSql),
|
|
||||||
// )
|
|
||||||
// .await?;
|
|
||||||
// }
|
|
||||||
// CHUNKED => {
|
|
||||||
// let insert_str = query_string(&config.insert_max, &config.tablename);
|
|
||||||
// let statement = client.prepare(&insert_str).await?;
|
|
||||||
//
|
|
||||||
// for chunk in packets_serialized.chunks_exact(config.insert_max) {
|
|
||||||
// client
|
|
||||||
// .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql))
|
|
||||||
// .await?;
|
|
||||||
// }
|
|
||||||
// let remainder_len = packets_serialized
|
|
||||||
// .chunks_exact(config.insert_max)
|
|
||||||
// .remainder()
|
|
||||||
// .len();
|
|
||||||
// if 0 < remainder_len {
|
|
||||||
// let rem_str = query_string(&remainder_len, &config.tablename);
|
|
||||||
// let statement = client.prepare(&rem_str).await?;
|
|
||||||
// client
|
|
||||||
// .query_raw(
|
|
||||||
// &statement,
|
|
||||||
// packets_serialized
|
|
||||||
// .chunks_exact(config.insert_max)
|
|
||||||
// .remainder()
|
|
||||||
// .iter()
|
|
||||||
// .map(|p| p as &dyn ToSql),
|
|
||||||
// )
|
|
||||||
// .await?;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
FROM_DEVICE => {
|
|
||||||
let insert_str = query_string(&config.insert_max, &config.tablename);
|
|
||||||
let statement = client.prepare(&insert_str).await?;
|
|
||||||
loop {
|
|
||||||
let v: Vec<parser::QryData> = parser::parse_device(
|
|
||||||
&config.device,
|
|
||||||
&config.filter,
|
|
||||||
&config.insert_max,
|
|
||||||
&config.regex_filter,
|
|
||||||
);
|
|
||||||
let packets_serialized = serializer::serialize_packets(v);
|
|
||||||
client
|
|
||||||
.query_raw(
|
|
||||||
&statement,
|
|
||||||
packets_serialized.iter().map(|p| p as &dyn ToSql),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
|
@ -8,6 +8,6 @@
|
||||||
"pcap_dir": "../target/files",
|
"pcap_dir": "../target/files",
|
||||||
"database_tablename": "json_dump",
|
"database_tablename": "json_dump",
|
||||||
"database_user": "postgres",
|
"database_user": "postgres",
|
||||||
"database_host": "localhost",
|
"database_host": "172.17.0.2",
|
||||||
"database_password": "password"
|
"database_password": "password"
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue