From b571cb06f5142e244116515666fb6d6c75663cd8 Mon Sep 17 00:00:00 2001 From: gurkenhabicht Date: Mon, 29 Jun 2020 02:08:26 +0200 Subject: [PATCH] introduced full tokio + std mpsc chain, introduced jemallocator --- Cargo.toml | 8 + README.md | 23 ++- src/flamegraph.svg | 419 ++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 63 ++++++- src/main_bkp | 211 +++++++++++++++++++++ src/parser.json | 6 +- src/parser/mod.rs | 152 ++++++++++++--- src/serializer/mod.rs | 36 +++- 8 files changed, 875 insertions(+), 43 deletions(-) create mode 100644 src/flamegraph.svg create mode 100644 src/main_bkp diff --git a/Cargo.toml b/Cargo.toml index 5c78721..5416be7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,13 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[profile.release] +debug = true +#lto = "fat" +#codegen-units = 1 +#panic = "abort" + + [dependencies] tokio-postgres = { version="0.5.4", features = ["runtime","with-eui48-0_4","with-serde_json-1"] } tokio = { version = "0.2", features = ["full"] } @@ -20,3 +27,4 @@ serde = { version = "1.0.3", features = ["derive"] } rayon = "1.3" regex = "1.3.7" futures = "~0.3.5" +jemallocator = "~0.3.2" diff --git a/README.md b/README.md index cb02f1c..a971110 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,10 @@ The software is written in Rust (2018, safe mode only). At the current state I am having fun writing in Rust and testing language features. The code should be modular enough to change any function you deem awful enough. Error handling is subpar at the moment. There is no real unit testing to speak of since switching to asynchronous functionality. Testing will come back. -This version is a successor of the _POSIX_C_SOURCE 200809L implementation in which all data parsed from a pcap/pcapng files is written as a single and simple query. The ingestion time is rather fast (tested writes: 100*10^3 tcp packets in ~1.8 sec) but may be insecure. See the other repository for more information. +This version is a successor of the _POSIX_C_SOURCE 200809L implementation in which all data parsed from a pcap/pcapng files is written as a single and simple query. The ingestion time is rather fast (tested writes: 100*10^3 tcp packets in ~1.8 sec) but the procedure may be insecure. See the other repository for more information. The idea of this iteration is to use a prepared statement and chunk the data according to maximum input. Postgres databases have a custom maximum limit on each insert query of prepared statements. Said chunk size is initialized through the config/interface file called parser.json as `insert_max`. Data can be read from PCAP/PCANG files, as well as network devices. -**UPDATE 0.2.0**: Chunking can be omitted completely when using PostgreSQL's `COPY` transferring binary data instead of using `Insert`. This is not only somewhat faster, but there are quite a few lines of code less in the end. Only parsing from network device still needs chunks. +**UPDATE 0.2.0**: Chunking (more on this in the next paragraph) can be omitted completely when using PostgreSQL's `COPY` transferring binary data instead of using `Insert`. This is not only somewhat faster, but there are quite a few lines of code less in the end. Only parsing from network device still needs chunks. The other recent change is that only none NULL protocol data of a packet is serialized to json. Table insertion should be smaller this way. Process is as follows: @@ -22,18 +22,13 @@ Process is as follows: - Prepared statements are prepared according to chunksize - Queried data gets queried in chunks afterwards -Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are handled. +Currently, ethernet, IPv4, IPV6, TCP, UDP and ARP/RARP network protocols are handled any additional session layer/wrapped data can be found in packet->data[u8] -- for now. Because of testing purposes, layout of the table is serialized json. Table layout is somewhat "dynamic". Any procotols not recognized in a parsed packet will be marked as NULL inside a resulting table row. -A query may look like this `select packet from json_dump where packet->>'ipv4_header' is not null;` - -Speaking of serialization: After profiling it turns out that ~20% of cpu time is used for serialization to json. This, of course, could be saved completely. +A query may look like this `select packet->>'ipv4_header' from json_dump;` or this `select packet from json_dump where packet->>'reg_res' is not null;` to show parsed datai via regex. Another subgoal was the ability to compile a static binary, which --last time I tested-- works without dependencies, but the need for libpcap itself. It even executes on oracle linux, after linking against the elf64 interpreter in a direct manner. If you ever had the pleasure using this derivate it may come as a suprise to you. The key is to compile via `x86_64-unknown-linux-musl` target. See: https://doc.rust-lang.org/edition-guide/rust-2018/platform-and-target-support/musl-support-for-fully-static-binaries.html Caveats: Regex Syntax is limited at the moment, because it is not compiled from a Rawstring, but a common one. Escaping does not work properly, character classes do. I have to fiddle the correct synctactical way to get it out of the json file and into a raw. For already supported regular expression syntax see: https://docs.rs/regex/1.3.9/regex/#syntax , also see the example in `parser.json`. -Transmitting all the data of the formerly described testing table layout results in a rather big table size. HDD space was no issue so far. Ingest of 30808676 TCP/IP Packets taken from iCTF 2020 PCAPs results in 99.4GB of json data. See: https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources for more details. - -Gotchas: My test setup consists of a postgresql db inside a docker container. Main memory usage of said container is low ~300MB, but I had to set `--oom-score-adj=999` in order to not get the container quit automatically. `--oom-kill-disable=false` would turn it off complete, I guess. I did no fine tuning of this value, yet. If this whole thing turns out to be viable, some future features may be: @@ -49,3 +44,13 @@ There are many other things left to be desired. Bechmarking was done with the identical file that was used in the previous C implementation. Inserting none chunked data resulted in ~20 minutes of querying to database. Now, chunked data is below 12 seconds after compiler optimization. Speaking of optimization: Do yourself a favor an run release code not debug code: `cargo run --release`. The compiler does a rather hefty optimization and you will save some time waiting for your precious data do be inserted. I did no further optimization besides trying to enable the compiler to do a better job. Just blackboxing, no assembly tweaking yet. + + +** TESTRUNS **: + +Run 001 at 24.06.2020 of iCTF2020 PCAP (bpf filter: 'tcp') files resulted in a table of roundabout 74GB size (`$du -hs`), 30808676 rows, `cargo run --release 3627,41s user 156,47s system 23% cpu 4:29:19,27 total` . PostgreSQL12 server used was a vanilla `docker pull postgres` container on a 2008 Macbook, 2,4GHz dual core, 6GB RAM connected via wifi. +Memory usage of the Client was at about 11.5GB out of 14.7GB which results in 0.78 utilization. (An tokio mpsc pipe will be the next improvement. Thus, memory usage may be less afterwards) +Run 002 at 25.06.2020 of iCTF2020 PCAP (bpf filter: 'tcp') files resulted in a table size of roundabout 74GB size ('&du -hs')m 30808676 rows,`cargo run --release 3669,68s user 163,23s system 23% cpu 4:27:19,14 total`. PostgreSQL12 server used was a vanilla `docker pull postgres` container on a 2008 Macbook, 2,4GHz dual core, 6GB RAM connected via wifi. Memory usage of the Client was at about 11.5GB out of 14.7GB which results in 0.78 utilization. (An tokio mpsc pipe will be the next improvement. Thus, memory usage may be less afterwards) +Run 003 cargo run --release 3847,69s user 236,93s system 25% cpu 4:22:45,90 total +Run 004 cargo run --release 1176,24s user 146,11s system 30% cpu 1:12:49,93 total on localhost docker +Run 005 cargo run --release 1181,33s user 139,35s system 29% cpu 1:15:40,24 total on localhost docker diff --git a/src/flamegraph.svg b/src/flamegraph.svg new file mode 100644 index 0000000..1a2ade7 --- /dev/null +++ b/src/flamegraph.svg @@ -0,0 +1,419 @@ +Flame Graph Reset ZoomSearch [[heap]] (54 samples, 0.87%)bytes::buf::buf_mut::BufMut::put (65 samples, 1.05%)core::ptr::drop_in_place (98 samples, 1.58%)serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (12 samples, 0.19%)[anon] (253 samples, 4.07%)[ano..bytes::buf::buf_mut::BufMut::put (146 samples, 2.35%)b..core::intrinsics::copy_nonoverlapping (43 samples, 0.69%)[tpcpr] (161 samples, 2.59%)[t..__memmove_avx_unaligned_erms (22 samples, 0.35%)bytes::buf::buf_mut::BufMut::put (101 samples, 1.63%)core::ptr::drop_in_place (586 samples, 9.43%)core::ptr::dr..serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (182 samples, 2.93%)se..<serde_json::number::Number as serde::ser::Serialize>::serialize (27 samples, 0.43%)<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_u64 (27 samples, 0.43%)serde_json::ser::Formatter::write_u64 (27 samples, 0.43%)std::io::Write::write_all (27 samples, 0.43%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (27 samples, 0.43%)[unknown] (952 samples, 15.33%)[unknown]core::ptr::drop_in_place (10 samples, 0.16%)<std::sync::mutex::MutexGuard<T> as core::ops::drop::Drop>::drop (10 samples, 0.16%)std::sys_common::mutex::Mutex::raw_unlock (10 samples, 0.16%)std::sys::unix::mutex::Mutex::unlock (10 samples, 0.16%)__pthread_mutex_unlock_usercnt (9 samples, 0.14%)__pthread_cond_wait (25 samples, 0.40%)std::sys::unix::condvar::Condvar::wait (27 samples, 0.43%)std::sys_common::condvar::Condvar::wait (28 samples, 0.45%)std::sync::condvar::Condvar::wait (31 samples, 0.50%)tokio::runtime::park::Inner::park_condvar (49 samples, 0.79%)mio::poll::ReadinessQueue::poll (11 samples, 0.18%)mio::sys::unix::awakener::pipe::Awakener::cleanup (7 samples, 0.11%)mio::sys::unix::epoll::Selector::select (9 samples, 0.14%)epoll_wait (7 samples, 0.11%)mio::poll::Poll::poll (36 samples, 0.58%)mio::poll::Poll::poll1 (36 samples, 0.58%)mio::poll::Poll::poll2 (32 samples, 0.52%)<tokio::park::either::Either<A,B> as tokio::park::Park>::park (44 samples, 0.71%)<tokio::io::driver::Driver as tokio::park::Park>::park (44 samples, 0.71%)tokio::io::driver::Driver::turn (42 samples, 0.68%)tokio::time::driver::Driver<T>::process (9 samples, 0.14%)<tokio::park::either::Either<A,B> as tokio::park::Park>::park (57 samples, 0.92%)<tokio::time::driver::Driver<T> as tokio::park::Park>::park (57 samples, 0.92%)tokio::runtime::park::Inner::park_driver (63 samples, 1.01%)<tokio::runtime::park::Parker as tokio::park::Park>::park (141 samples, 2.27%)<..tokio::runtime::park::Inner::park (139 samples, 2.24%)t..tokio::util::try_lock::TryLock<T>::try_lock (11 samples, 0.18%)core::sync::atomic::AtomicBool::compare_exchange (11 samples, 0.18%)core::sync::atomic::atomic_compare_exchange (11 samples, 0.18%)tokio::runtime::thread_pool::worker::Context::park_timeout (149 samples, 2.40%)to..tokio::runtime::queue::Inject<T>::is_closed (12 samples, 0.19%)std::sync::mutex::Mutex<T>::lock (8 samples, 0.13%)std::sys_common::mutex::Mutex::raw_lock (8 samples, 0.13%)std::sys::unix::mutex::Mutex::lock (8 samples, 0.13%)__GI___pthread_mutex_lock (8 samples, 0.13%)tokio::runtime::task::raw::RawTask::dealloc (11 samples, 0.18%)core::ptr::drop_in_place (12 samples, 0.19%)core::ptr::drop_in_place (12 samples, 0.19%)<tokio::runtime::task::Task<S> as core::ops::drop::Drop>::drop (12 samples, 0.19%)tokio::runtime::thread_pool::worker::Core::maintenance (35 samples, 0.56%)tokio::runtime::thread_pool::worker::Core::drain_pending_drop (23 samples, 0.37%)tokio::runtime::thread_pool::worker::Core::transition_from_parked (12 samples, 0.19%)tokio::runtime::thread_pool::idle::Idle::is_parked (12 samples, 0.19%)tokio::runtime::thread_pool::idle::Idle::transition_worker_to_parked (9 samples, 0.14%)tokio::runtime::thread_pool::worker::Context::park (206 samples, 3.32%)tok..tokio::runtime::thread_pool::worker::Core::transition_to_parked (10 samples, 0.16%)core::sync::atomic::AtomicPtr<T>::load (9 samples, 0.14%)core::sync::atomic::atomic_load (9 samples, 0.14%)core::task::wake::Waker::wake (11 samples, 0.18%)tokio::runtime::task::waker::wake_by_val (11 samples, 0.18%)tokio::runtime::task::harness::Harness<T,S>::wake_by_val (11 samples, 0.18%)tokio::runtime::task::harness::Harness<T,S>::wake_by_ref (10 samples, 0.16%)tokio::sync::semaphore_ll::Semaphore::add_permits_locked2 (23 samples, 0.37%)tokio::sync::semaphore_ll::Waiter::assign_permits (14 samples, 0.23%)tokio::sync::task::atomic_waker::AtomicWaker::wake (13 samples, 0.21%)tokio::sync::mpsc::bounded::Receiver<T>::recv::{{closure}} (30 samples, 0.48%)<tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll (29 samples, 0.47%)tokio::sync::mpsc::bounded::Receiver<T>::recv::{{closure}}::{{closure}} (29 samples, 0.47%)tokio::sync::mpsc::bounded::Receiver<T>::poll_recv (29 samples, 0.47%)tokio::sync::mpsc::chan::Rx<T,S>::recv (29 samples, 0.47%)tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut (29 samples, 0.47%)tokio::sync::mpsc::chan::Rx<T,S>::recv::{{closure}} (29 samples, 0.47%)tokio::sync::semaphore_ll::Semaphore::add_permits_locked (26 samples, 0.42%)<tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll (13 samples, 0.21%)tokio::sync::mpsc::bounded::Sender<T>::send::{{closure}}::{{closure}} (13 samples, 0.21%)tokio::sync::mpsc::bounded::Sender<T>::poll_ready (13 samples, 0.21%)tokio::sync::mpsc::chan::Tx<T,S>::poll_ready (13 samples, 0.21%)<(tokio::sync::semaphore_ll::Semaphore,usize) as tokio::sync::mpsc::chan::Semaphore>::poll_acquire (13 samples, 0.21%)tokio::sync::semaphore_ll::Permit::poll_acquire (13 samples, 0.21%)tokio::sync::semaphore_ll::Waiter::permits_to_acquire (7 samples, 0.11%)core::sync::atomic::AtomicUsize::load (7 samples, 0.11%)core::sync::atomic::atomic_load (7 samples, 0.11%)tokio::sync::mpsc::list::Tx<T>::push (8 samples, 0.13%)tokio::sync::mpsc::bounded::Sender<T>::send::{{closure}} (24 samples, 0.39%)tokio::sync::mpsc::bounded::Sender<T>::try_send (9 samples, 0.14%)tokio::sync::mpsc::chan::Tx<T,S>::try_send (9 samples, 0.14%)tokio::sync::mpsc::chan::Chan<T,S>::try_send (9 samples, 0.14%)<tokio::runtime::park::Unparker as tokio::park::Unpark>::unpark (10 samples, 0.16%)tokio::runtime::park::Inner::unpark (7 samples, 0.11%)tokio::runtime::thread_pool::idle::Idle::notify_should_wakeup (7 samples, 0.11%)core::sync::atomic::AtomicUsize::fetch_add (7 samples, 0.11%)core::sync::atomic::atomic_add (7 samples, 0.11%)tokio::runtime::task::core::Core<T,S>::schedule (33 samples, 0.53%)tokio::loom::std::unsafe_cell::UnsafeCell<T>::with (33 samples, 0.53%)tokio::runtime::task::core::Core<T,S>::schedule::{{closure}} (33 samples, 0.53%)tokio::runtime::thread_pool::worker::<impl tokio::runtime::task::Schedule for alloc::sync::Arc<tokio::runtime::thread_pool::worker::Worker>>::schedule (33 samples, 0.53%)tokio::runtime::thread_pool::worker::Shared::schedule (33 samples, 0.53%)tokio::macros::scoped_tls::ScopedKey<T>::with (33 samples, 0.53%)tokio::runtime::thread_pool::worker::Shared::schedule::{{closure}} (33 samples, 0.53%)tokio::runtime::thread_pool::worker::Shared::schedule_local (33 samples, 0.53%)tokio::runtime::thread_pool::worker::Shared::notify_parked (32 samples, 0.52%)tokio::runtime::thread_pool::idle::Idle::worker_to_notify (13 samples, 0.21%)<futures_util::sink::send::Send<Si,Item> as core::future::future::Future>::poll (47 samples, 0.76%)<&mut S as futures_sink::Sink<Item>>::start_send (45 samples, 0.72%)<core::pin::Pin<P> as futures_sink::Sink<Item>>::start_send (45 samples, 0.72%)<tokio_postgres::copy_in::CopyInSink<T> as futures_sink::Sink<T>>::start_send (44 samples, 0.71%)futures_channel::mpsc::sink_impl::<impl futures_sink::Sink<T> for futures_channel::mpsc::Sender<T>>::start_send (43 samples, 0.69%)futures_channel::mpsc::Sender<T>::start_send (43 samples, 0.69%)futures_channel::mpsc::Sender<T>::try_send (43 samples, 0.69%)futures_channel::mpsc::BoundedSenderInner<T>::try_send (43 samples, 0.69%)futures_channel::mpsc::BoundedSenderInner<T>::do_send_b (43 samples, 0.69%)futures_channel::mpsc::BoundedSenderInner<T>::queue_push_and_signal (42 samples, 0.68%)tokio::runtime::task::waker::wake_by_val (35 samples, 0.56%)tokio::runtime::task::harness::Harness<T,S>::wake_by_val (35 samples, 0.56%)tokio::runtime::task::harness::Harness<T,S>::wake_by_ref (35 samples, 0.56%)<&serde_json::map::Map<alloc::string::String,serde_json::value::Value> as core::iter::traits::collect::IntoIterator>::into_iter (9 samples, 0.14%)serde::ser::impls::<impl serde::ser::Serialize for alloc::string::String>::serialize (22 samples, 0.35%)<serde_json::ser::MapKeySerializer<W,F> as serde::ser::Serializer>::serialize_str (21 samples, 0.34%)<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_str (21 samples, 0.34%)serde_json::ser::format_escaped_str (21 samples, 0.34%)serde_json::ser::format_escaped_str_contents (16 samples, 0.26%)serde_json::ser::Formatter::write_string_fragment (7 samples, 0.11%)std::io::Write::write_all (7 samples, 0.11%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (7 samples, 0.11%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeMap>::serialize_key (25 samples, 0.40%)<core::slice::Iter<T> as core::iter::traits::iterator::Iterator>::next (13 samples, 0.21%)<serde_json::ser::State as core::cmp::PartialEq>::eq (7 samples, 0.11%)core::ptr::drop_in_place (49 samples, 0.79%)serde::ser::Serializer::collect_seq (13 samples, 0.21%)core::intrinsics::copy_nonoverlapping (169 samples, 2.72%)co..core::ptr::const_ptr::<impl *const T>::offset (15 samples, 0.24%)itoa::Buffer::format (636 samples, 10.24%)itoa::Buffer::f..<u64 as itoa::Integer>::write (636 samples, 10.24%)<u64 as itoa::I..<u64 as itoa::IntegerPrivate<[u8: _]>>::write_to (636 samples, 10.24%)<u64 as itoa::I..core::ptr::mut_ptr::<impl *mut T>::offset (14 samples, 0.23%)<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (66 samples, 1.06%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::remaining_mut (11 samples, 0.18%)<&mut T as bytes::buf::buf_mut::BufMut>::advance_mut (38 samples, 0.61%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::advance_mut (38 samples, 0.61%)bytes::bytes_mut::BytesMut::maybe_uninit_bytes (8 samples, 0.13%)core::ptr::mut_ptr::<impl *mut T>::offset (8 samples, 0.13%)alloc::vec::Vec<T>::reserve (25 samples, 0.40%)alloc::raw_vec::RawVec<T,A>::reserve (25 samples, 0.40%)alloc::raw_vec::RawVec<T,A>::try_reserve (25 samples, 0.40%)alloc::raw_vec::RawVec<T,A>::grow (25 samples, 0.40%)<alloc::alloc::Global as core::alloc::AllocRef>::grow (25 samples, 0.40%)alloc::alloc::realloc (25 samples, 0.40%)__GI___libc_realloc (25 samples, 0.40%)_int_realloc (24 samples, 0.39%)_int_malloc (16 samples, 0.26%)malloc_consolidate (7 samples, 0.11%)alloc::vec::Vec<T>::with_capacity (38 samples, 0.61%)alloc::raw_vec::RawVec<T>::with_capacity (38 samples, 0.61%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (38 samples, 0.61%)alloc::raw_vec::RawVec<T,A>::allocate_in (38 samples, 0.61%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (38 samples, 0.61%)alloc::alloc::alloc (38 samples, 0.61%)__GI___libc_malloc (38 samples, 0.61%)_int_malloc (37 samples, 0.60%)malloc_consolidate (24 samples, 0.39%)unlink_chunk.constprop.0 (7 samples, 0.11%)<&mut T as bytes::buf::buf_mut::BufMut>::bytes_mut (96 samples, 1.55%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::bytes_mut (96 samples, 1.55%)bytes::bytes_mut::BytesMut::reserve (69 samples, 1.11%)bytes::bytes_mut::BytesMut::reserve_inner (69 samples, 1.11%)<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (9 samples, 0.14%)<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (9 samples, 0.14%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::remaining_mut (9 samples, 0.14%)bytes::buf::buf_mut::BufMut::put (40 samples, 0.64%)core::cmp::min (45 samples, 0.72%)core::cmp::Ord::min (45 samples, 0.72%)bytes::buf::buf_mut::BufMut::put (540 samples, 8.69%)bytes::buf::..core::intrinsics::copy_nonoverlapping (266 samples, 4.28%)core:..__memmove_avx_unaligned_erms (263 samples, 4.23%)__mem..<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (683 samples, 11.00%)<bytes::buf::ext..core::cmp::min (36 samples, 0.58%)core::cmp::Ord::min (36 samples, 0.58%)serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (33 samples, 0.53%)core::slice::<impl [T]>::is_empty (17 samples, 0.27%)<serde_json::number::Number as serde::ser::Serialize>::serialize (1,431 samples, 23.04%)<serde_json::number::Number as serde:..<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_u64 (1,375 samples, 22.14%)<&mut serde_json::ser::Serializer<W..serde_json::ser::Formatter::write_u64 (1,375 samples, 22.14%)serde_json::ser::Formatter::write_u..std::io::Write::write_all (739 samples, 11.90%)std::io::Write::wr..core::ptr::drop_in_place (28 samples, 0.45%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (1,647 samples, 26.52%)serde::ser::impls::<impl serde::ser::Seria..serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (1,632 samples, 26.28%)serde_json::value::ser::<impl serde::ser::..<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (33 samples, 0.53%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::remaining_mut (12 samples, 0.19%)<&mut T as bytes::buf::buf_mut::BufMut>::advance_mut (31 samples, 0.50%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::advance_mut (31 samples, 0.50%)bytes::bytes_mut::BytesMut::maybe_uninit_bytes (19 samples, 0.31%)core::ptr::mut_ptr::<impl *mut T>::offset (19 samples, 0.31%)alloc::vec::Vec<T>::reserve (8 samples, 0.13%)alloc::raw_vec::RawVec<T,A>::reserve (8 samples, 0.13%)alloc::raw_vec::RawVec<T,A>::try_reserve (8 samples, 0.13%)alloc::raw_vec::RawVec<T,A>::grow (8 samples, 0.13%)<alloc::alloc::Global as core::alloc::AllocRef>::grow (8 samples, 0.13%)alloc::alloc::realloc (8 samples, 0.13%)__GI___libc_realloc (8 samples, 0.13%)_int_realloc (7 samples, 0.11%)alloc::vec::Vec<T>::with_capacity (24 samples, 0.39%)alloc::raw_vec::RawVec<T>::with_capacity (24 samples, 0.39%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (24 samples, 0.39%)alloc::raw_vec::RawVec<T,A>::allocate_in (24 samples, 0.39%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (24 samples, 0.39%)alloc::alloc::alloc (23 samples, 0.37%)__GI___libc_malloc (23 samples, 0.37%)_int_malloc (23 samples, 0.37%)malloc_consolidate (20 samples, 0.32%)unlink_chunk.constprop.0 (8 samples, 0.13%)<&mut T as bytes::buf::buf_mut::BufMut>::bytes_mut (74 samples, 1.19%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::bytes_mut (74 samples, 1.19%)bytes::bytes_mut::BytesMut::reserve (36 samples, 0.58%)bytes::bytes_mut::BytesMut::reserve_inner (35 samples, 0.56%)<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (22 samples, 0.35%)<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (22 samples, 0.35%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::remaining_mut (22 samples, 0.35%)bytes::buf::buf_mut::BufMut::put (21 samples, 0.34%)core::cmp::min (22 samples, 0.35%)core::cmp::Ord::min (22 samples, 0.35%)bytes::buf::buf_mut::BufMut::put (335 samples, 5.39%)bytes::..core::intrinsics::copy_nonoverlapping (85 samples, 1.37%)__memmove_avx_unaligned_erms (75 samples, 1.21%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (409 samples, 6.59%)<bytes::b..core::cmp::min (13 samples, 0.21%)core::cmp::Ord::min (13 samples, 0.21%)serde::ser::Serializer::collect_seq (13 samples, 0.21%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeSeq>::serialize_element (2,223 samples, 35.79%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeSe..serde_json::ser::Formatter::begin_array_value (454 samples, 7.31%)serde_json..std::io::Write::write_all (454 samples, 7.31%)std::io::W..core::slice::<impl core::ops::index::Index<I> for [T]>::index (12 samples, 0.19%)<core::ops::range::RangeFrom<usize> as core::slice::SliceIndex<[T]>>::index (12 samples, 0.19%)<core::ops::range::Range<usize> as core::slice::SliceIndex<[T]>>::index (12 samples, 0.19%)<core::ops::range::Range<usize> as core::slice::SliceIndex<[T]>>::get_unchecked (12 samples, 0.19%)core::ptr::const_ptr::<impl *const T>::add (12 samples, 0.19%)core::ptr::const_ptr::<impl *const T>::offset (12 samples, 0.19%)serde::ser::Serializer::collect_seq (2,331 samples, 37.53%)serde::ser::Serializer::collect_seqcore::ptr::drop_in_place (19 samples, 0.31%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::NodeRef<BorrowType,K,V,alloc::collections::btree::node::marker::LeafOrInternal>>::first_leaf_edge (7 samples, 0.11%)alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<BorrowType,K,V,alloc::collections::btree::node::marker::Internal>,alloc::collections::btree::node::marker::Edge>::descend (7 samples, 0.11%)<&serde_json::map::Map<alloc::string::String,serde_json::value::Value> as core::iter::traits::collect::IntoIterator>::into_iter (19 samples, 0.31%)alloc::collections::btree::map::BTreeMap<K,V>::iter (18 samples, 0.29%)core::option::Option<T>::map (17 samples, 0.27%)alloc::collections::btree::map::BTreeMap<K,V>::iter::{{closure}} (17 samples, 0.27%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::NodeRef<BorrowType,K,V,alloc::collections::btree::node::marker::LeafOrInternal>>::last_leaf_edge (10 samples, 0.16%)alloc::collections::btree::node::NodeRef<BorrowType,K,V,Type>::last_edge (10 samples, 0.16%)alloc::collections::btree::node::NodeRef<BorrowType,K,V,Type>::len (10 samples, 0.16%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Immut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>>::next_unchecked::{{closure}} (8 samples, 0.13%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Immut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>>::next_unchecked (11 samples, 0.18%)alloc::collections::btree::navigate::replace (11 samples, 0.18%)<serde_json::map::Iter as core::iter::traits::iterator::Iterator>::next (12 samples, 0.19%)<alloc::collections::btree::map::Iter<K,V> as core::iter::traits::iterator::Iterator>::next (12 samples, 0.19%)alloc::collections::btree::map::Range<K,V>::next_unchecked (12 samples, 0.19%)ryu::buffer::Buffer::format_finite (8 samples, 0.13%)<f64 as ryu::buffer::Sealed>::write_to_ryu_buffer (8 samples, 0.13%)ryu::pretty::format64 (8 samples, 0.13%)<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_f64 (10 samples, 0.16%)serde_json::ser::Formatter::write_f64 (10 samples, 0.16%)<serde_json::number::Number as serde::ser::Serialize>::serialize (15 samples, 0.24%)bytes::buf::buf_mut::BufMut::put (14 samples, 0.23%)serde_json::ser::Formatter::begin_string (19 samples, 0.31%)std::io::Write::write_all (19 samples, 0.31%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (18 samples, 0.29%)serde_json::ser::Formatter::end_string (9 samples, 0.14%)std::io::Write::write_all (9 samples, 0.14%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (9 samples, 0.14%)<&mut T as bytes::buf::buf_mut::BufMut>::bytes_mut (9 samples, 0.14%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::bytes_mut (9 samples, 0.14%)bytes::bytes_mut::BytesMut::reserve (9 samples, 0.14%)bytes::bytes_mut::BytesMut::reserve_inner (9 samples, 0.14%)bytes::buf::buf_mut::BufMut::put (30 samples, 0.48%)core::intrinsics::copy_nonoverlapping (12 samples, 0.19%)__memmove_avx_unaligned_erms (10 samples, 0.16%)serde::ser::impls::<impl serde::ser::Serialize for alloc::string::String>::serialize (141 samples, 2.27%)s..<serde_json::ser::MapKeySerializer<W,F> as serde::ser::Serializer>::serialize_str (137 samples, 2.21%)<..<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_str (137 samples, 2.21%)<..serde_json::ser::format_escaped_str (136 samples, 2.19%)s..serde_json::ser::format_escaped_str_contents (96 samples, 1.55%)serde_json::ser::Formatter::write_string_fragment (46 samples, 0.74%)std::io::Write::write_all (46 samples, 0.74%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (45 samples, 0.72%)bytes::buf::buf_mut::BufMut::put (19 samples, 0.31%)core::intrinsics::copy_nonoverlapping (7 samples, 0.11%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (25 samples, 0.40%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeMap>::serialize_key (176 samples, 2.83%)<s..serde_json::ser::Formatter::begin_object_key (27 samples, 0.43%)std::io::Write::write_all (27 samples, 0.43%)<&mut T as bytes::buf::buf_mut::BufMut>::remaining_mut (12 samples, 0.19%)bytes::buf::buf_mut::BufMut::put (8 samples, 0.13%)serde_json::ser::Formatter::begin_object_value (25 samples, 0.40%)std::io::Write::write_all (25 samples, 0.40%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (25 samples, 0.40%)<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_str (14 samples, 0.23%)serde_json::ser::format_escaped_str (14 samples, 0.23%)serde_json::ser::format_escaped_str_contents (9 samples, 0.14%)<u64 as itoa::IntegerPrivate<[u8: _]>>::write_to (16 samples, 0.26%)itoa::Buffer::format (17 samples, 0.27%)<u64 as itoa::Integer>::write (17 samples, 0.27%)<&mut T as bytes::buf::buf_mut::BufMut>::bytes_mut (7 samples, 0.11%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::bytes_mut (7 samples, 0.11%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (27 samples, 0.43%)bytes::buf::buf_mut::BufMut::put (23 samples, 0.37%)core::intrinsics::copy_nonoverlapping (7 samples, 0.11%)__memmove_avx_unaligned_erms (7 samples, 0.11%)<serde_json::number::Number as serde::ser::Serialize>::serialize (49 samples, 0.79%)<&mut serde_json::ser::Serializer<W,F> as serde::ser::Serializer>::serialize_u64 (45 samples, 0.72%)serde_json::ser::Formatter::write_u64 (45 samples, 0.72%)std::io::Write::write_all (28 samples, 0.45%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeMap>::serialize_value (127 samples, 2.04%)<..serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (90 samples, 1.45%)tokio_postgres::binary_copy::BinaryCopyInWriter::write::{{closure}} (2,799 samples, 45.07%)tokio_postgres::binary_copy::BinaryCopyInWriter::write::{{closure}}<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll (2,799 samples, 45.07%)<core::future::from_generator::GenFuture<T> as core::future::future::Futur..tokio_postgres::binary_copy::BinaryCopyInWriter::write_raw::{{closure}} (2,799 samples, 45.07%)tokio_postgres::binary_copy::BinaryCopyInWriter::write_raw::{{closure}}postgres_types::serde_json_1::<impl postgres_types::ToSql for serde_json::value::Value>::to_sql_checked (2,750 samples, 44.28%)postgres_types::serde_json_1::<impl postgres_types::ToSql for serde_json:..postgres_types::__to_sql_checked (2,750 samples, 44.28%)postgres_types::__to_sql_checkedpostgres_types::serde_json_1::<impl postgres_types::ToSql for serde_json::value::Value>::to_sql (2,750 samples, 44.28%)postgres_types::serde_json_1::<impl postgres_types::ToSql for serde_json:..<postgres_types::serde_json_1::Json<T> as postgres_types::ToSql>::to_sql (2,750 samples, 44.28%)<postgres_types::serde_json_1::Json<T> as postgres_types::ToSql>::to_sqlserde_json::ser::to_writer (2,749 samples, 44.26%)serde_json::ser::to_writerserde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (2,749 samples, 44.26%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serializeserde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (2,749 samples, 44.26%)serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value..serde::ser::SerializeMap::serialize_entry (2,734 samples, 44.02%)serde::ser::SerializeMap::serialize_entry<serde_json::ser::Compound<W,F> as serde::ser::SerializeMap>::serialize_value (2,708 samples, 43.60%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeMap>::serialize..serde_json::value::ser::<impl serde::ser::Serialize for serde_json::value::Value>::serialize (370 samples, 5.96%)serde_js..serde::ser::SerializeMap::serialize_entry (308 samples, 4.96%)serde:..__GI___libc_malloc (9 samples, 0.14%)_int_malloc (7 samples, 0.11%)<tpcpr::parser::QryData as core::clone::Clone>::clone (20 samples, 0.32%)<core::option::Option<T> as core::clone::Clone>::clone (13 samples, 0.21%)<alloc::vec::Vec<T> as core::clone::Clone>::clone (12 samples, 0.19%)alloc::slice::<impl [T]>::to_vec (12 samples, 0.19%)alloc::slice::hack::to_vec (12 samples, 0.19%)alloc::vec::Vec<T>::with_capacity (10 samples, 0.16%)alloc::raw_vec::RawVec<T>::with_capacity (10 samples, 0.16%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (10 samples, 0.16%)alloc::raw_vec::RawVec<T,A>::allocate_in (10 samples, 0.16%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (10 samples, 0.16%)alloc::alloc::alloc (10 samples, 0.16%)alloc::vec::Vec<T>::push (7 samples, 0.11%)pcap::Capture<T>::next (12 samples, 0.19%)pcap_next_ex (12 samples, 0.19%)[libpcap.so.1.9.1] (12 samples, 0.19%)[libpcap.so.1.9.1] (12 samples, 0.19%)[libpcap.so.1.9.1] (8 samples, 0.13%)[libpcap.so.1.9.1] (8 samples, 0.13%)__GI__IO_fread (8 samples, 0.13%)tpcpr::parser::QryData::transport_layer (16 samples, 0.26%)tpcpr::parser::packet_handler::tcp_handler (10 samples, 0.16%)tpcpr::parser::QryData::encap_en10mb (25 samples, 0.40%)tpcpr::parser::packet_handler::ip_handler (8 samples, 0.13%)tpcpr::parser::QryData::transport_layer (8 samples, 0.13%)tpcpr::parser::QryData::encap_raw (21 samples, 0.34%)tpcpr::parser::packet_handler::ip_handler (12 samples, 0.19%)aho_corasick::ahocorasick::Imp<S>::find_at_no_state (8 samples, 0.13%)aho_corasick::dfa::DFA<S>::find_at_no_state (8 samples, 0.13%)aho_corasick::automaton::Automaton::find_at_no_state (8 samples, 0.13%)aho_corasick::automaton::Automaton::leftmost_find_at_no_state (8 samples, 0.13%)aho_corasick::automaton::Automaton::leftmost_find_at_no_state_imp (8 samples, 0.13%)aho_corasick::prefilter::next (8 samples, 0.13%)tpcpr::parser::tokio_parse::{{closure}} (103 samples, 1.66%)tpcpr::parser::flag_carnage (10 samples, 0.16%)<regex::re_bytes::Matches as core::iter::traits::iterator::Iterator>::next (10 samples, 0.16%)<regex::re_trait::Matches<R> as core::iter::traits::iterator::Iterator>::next (10 samples, 0.16%)<regex::exec::ExecNoSync as regex::re_trait::RegularExpression>::find_at (10 samples, 0.16%)regex::exec::ExecNoSync::find_dfa_forward (10 samples, 0.16%)regex::dfa::Fsm::forward (10 samples, 0.16%)regex::dfa::Fsm::exec_at (9 samples, 0.14%)regex::dfa::Fsm::prefix_at (9 samples, 0.14%)regex::literal::imp::LiteralSearcher::find (9 samples, 0.14%)aho_corasick::ahocorasick::AhoCorasick<S>::find (9 samples, 0.14%)core::ptr::drop_in_place (11 samples, 0.18%)core::ptr::drop_in_place (11 samples, 0.18%)core::ptr::drop_in_place (10 samples, 0.16%)core::ptr::drop_in_place (10 samples, 0.16%)<alloc::raw_vec::RawVec<T,A> as core::ops::drop::Drop>::drop (10 samples, 0.16%)<alloc::alloc::Global as core::alloc::AllocRef>::dealloc (10 samples, 0.16%)alloc::alloc::dealloc (10 samples, 0.16%)_int_free (7 samples, 0.11%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_key (21 samples, 0.34%)serde::ser::impls::<impl serde::ser::Serialize for str>::serialize (21 samples, 0.34%)<serde_json::value::ser::MapKeySerializer as serde::ser::Serializer>::serialize_str (21 samples, 0.34%)alloc::str::<impl alloc::borrow::ToOwned for str>::to_owned (21 samples, 0.34%)alloc::slice::<impl alloc::borrow::ToOwned for [T]>::to_owned (21 samples, 0.34%)alloc::slice::<impl [T]>::to_vec (21 samples, 0.34%)alloc::slice::hack::to_vec (21 samples, 0.34%)alloc::vec::Vec<T>::with_capacity (15 samples, 0.24%)alloc::raw_vec::RawVec<T>::with_capacity (15 samples, 0.24%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (15 samples, 0.24%)alloc::raw_vec::RawVec<T,A>::allocate_in (15 samples, 0.24%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (15 samples, 0.24%)alloc::alloc::alloc (15 samples, 0.24%)__GI___libc_malloc (15 samples, 0.24%)serde_json::map::Map<alloc::string::String,serde_json::value::Value>::insert (11 samples, 0.18%)alloc::collections::btree::map::BTreeMap<K,V>::insert (10 samples, 0.16%)alloc::collections::btree::map::BTreeMap<K,V>::entry (10 samples, 0.16%)<core::slice::Iter<T> as core::iter::traits::iterator::Iterator>::next (10 samples, 0.16%)alloc::vec::Vec<T>::push (413 samples, 6.65%)alloc::ve..core::ptr::write (397 samples, 6.39%)core::pt..<serde_json::value::ser::SerializeVec as serde::ser::SerializeSeq>::serialize_element (438 samples, 7.05%)<serde_js..serde_json::value::to_value (25 samples, 0.40%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (23 samples, 0.37%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (23 samples, 0.37%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (23 samples, 0.37%)serde::ser::impls::<impl serde::ser::Serialize for u8>::serialize (23 samples, 0.37%)<serde_json::value::ser::Serializer as serde::ser::Serializer>::serialize_u8 (23 samples, 0.37%)<serde_json::value::ser::Serializer as serde::ser::Serializer>::serialize_u64 (23 samples, 0.37%)serde::ser::impls::<impl serde::ser::Serialize for alloc::vec::Vec<T>>::serialize (463 samples, 7.45%)serde::ser..serde::ser::Serializer::collect_seq (463 samples, 7.45%)serde::ser..<serde_json::value::ser::Serializer as serde::ser::Serializer>::serialize_seq (15 samples, 0.24%)alloc::vec::Vec<T>::with_capacity (15 samples, 0.24%)alloc::raw_vec::RawVec<T>::with_capacity (15 samples, 0.24%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (15 samples, 0.24%)alloc::raw_vec::RawVec<T,A>::allocate_in (15 samples, 0.24%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (15 samples, 0.24%)alloc::alloc::alloc (15 samples, 0.24%)__GI___libc_malloc (15 samples, 0.24%)_int_malloc (15 samples, 0.24%)sysmalloc (11 samples, 0.18%)alloc::vec::Vec<T>::reserve (13 samples, 0.21%)alloc::raw_vec::RawVec<T,A>::reserve (13 samples, 0.21%)alloc::raw_vec::RawVec<T,A>::try_reserve (13 samples, 0.21%)alloc::raw_vec::RawVec<T,A>::grow (13 samples, 0.21%)<alloc::alloc::Global as core::alloc::AllocRef>::grow (9 samples, 0.14%)alloc::alloc::realloc (9 samples, 0.14%)__GI___libc_realloc (9 samples, 0.14%)<&mut W as core::fmt::Write>::write_str (14 samples, 0.23%)<alloc::string::String as core::fmt::Write>::write_str (14 samples, 0.23%)alloc::string::String::push_str (14 samples, 0.23%)alloc::vec::Vec<T>::extend_from_slice (14 samples, 0.23%)<alloc::vec::Vec<T> as alloc::vec::SpecExtend<&T,core::slice::Iter<T>>>::spec_extend (14 samples, 0.23%)tpcpr::parser::packet_handler::_IMPL_SERIALIZE_FOR_EtherHeader::<impl serde::ser::Serialize for tpcpr::parser::packet_handler::EtherHeader>::serialize (39 samples, 0.63%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeStruct>::serialize_field (39 samples, 0.63%)serde::ser::SerializeMap::serialize_entry (39 samples, 0.63%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_value (35 samples, 0.56%)serde_json::value::to_value (28 samples, 0.45%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (28 samples, 0.45%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (28 samples, 0.45%)<eui48::MacAddress as serde::ser::Serialize>::serialize (28 samples, 0.45%)eui48::MacAddress::to_canonical (24 samples, 0.39%)alloc::fmt::format (24 samples, 0.39%)core::fmt::Write::write_fmt (24 samples, 0.39%)core::fmt::write (24 samples, 0.39%)core::fmt::run (21 samples, 0.34%)core::fmt::num::<impl core::fmt::LowerHex for i8>::fmt (16 samples, 0.26%)core::fmt::num::GenericRadix::fmt_int (16 samples, 0.26%)__GI___libc_malloc (24 samples, 0.39%)_int_malloc (7 samples, 0.11%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_key (36 samples, 0.58%)serde::ser::impls::<impl serde::ser::Serialize for str>::serialize (35 samples, 0.56%)<serde_json::value::ser::MapKeySerializer as serde::ser::Serializer>::serialize_str (35 samples, 0.56%)alloc::str::<impl alloc::borrow::ToOwned for str>::to_owned (35 samples, 0.56%)alloc::slice::<impl alloc::borrow::ToOwned for [T]>::to_owned (35 samples, 0.56%)alloc::slice::<impl [T]>::to_vec (35 samples, 0.56%)alloc::slice::hack::to_vec (35 samples, 0.56%)alloc::vec::Vec<T>::with_capacity (26 samples, 0.42%)alloc::raw_vec::RawVec<T>::with_capacity (26 samples, 0.42%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (26 samples, 0.42%)alloc::raw_vec::RawVec<T,A>::allocate_in (26 samples, 0.42%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (26 samples, 0.42%)alloc::alloc::alloc (25 samples, 0.40%)core::option::Option<T>::expect (8 samples, 0.13%)alloc::collections::btree::search::search_linear (8 samples, 0.13%)<alloc::string::String as core::cmp::Ord>::cmp (8 samples, 0.13%)<alloc::vec::Vec<T> as core::cmp::Ord>::cmp (8 samples, 0.13%)core::slice::<impl core::cmp::Ord for [T]>::cmp (7 samples, 0.11%)<u8 as core::slice::SliceOrd>::compare (7 samples, 0.11%)alloc::collections::btree::map::BTreeMap<K,V>::entry (30 samples, 0.48%)alloc::collections::btree::search::search_tree (12 samples, 0.19%)alloc::collections::btree::search::search_node (11 samples, 0.18%)alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Mut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>::insert_fit (8 samples, 0.13%)alloc::collections::btree::node::slice_insert (8 samples, 0.13%)alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Mut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>::insert (17 samples, 0.27%)alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Mut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::KV>::split (9 samples, 0.14%)alloc::boxed::Box<T>::new (9 samples, 0.14%)alloc::alloc::exchange_malloc (8 samples, 0.13%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (8 samples, 0.13%)alloc::alloc::alloc (8 samples, 0.13%)__GI___libc_malloc (8 samples, 0.13%)_int_malloc (7 samples, 0.11%)sysmalloc (7 samples, 0.11%)serde_json::map::Map<alloc::string::String,serde_json::value::Value>::insert (58 samples, 0.93%)alloc::collections::btree::map::BTreeMap<K,V>::insert (57 samples, 0.92%)alloc::collections::btree::map::VacantEntry<K,V>::insert (27 samples, 0.43%)alloc::collections::btree::node::Root<K,V>::push_level (10 samples, 0.16%)tpcpr::parser::packet_handler::_IMPL_SERIALIZE_FOR_IpV4Header::<impl serde::ser::Serialize for tpcpr::parser::packet_handler::IpV4Header>::serialize (126 samples, 2.03%)t..<serde_json::value::ser::SerializeMap as serde::ser::SerializeStruct>::serialize_field (125 samples, 2.01%)<..serde::ser::SerializeMap::serialize_entry (124 samples, 2.00%)s..<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_value (88 samples, 1.42%)serde_json::value::to_value (20 samples, 0.32%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (20 samples, 0.32%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (20 samples, 0.32%)serde::ser::impls::<impl serde::ser::Serialize for std::net::ip::IpAddr>::serialize (20 samples, 0.32%)serde::ser::impls::<impl serde::ser::Serialize for std::net::ip::Ipv4Addr>::serialize (20 samples, 0.32%)std::io::Write::write_fmt (13 samples, 0.21%)core::fmt::write (13 samples, 0.21%)<std::net::ip::Ipv4Addr as core::fmt::Display>::fmt (12 samples, 0.19%)std::io::Write::write_fmt (8 samples, 0.13%)core::fmt::write (8 samples, 0.13%)tpcpr::parser::packet_handler::_IMPL_SERIALIZE_FOR_IpV6Header::<impl serde::ser::Serialize for tpcpr::parser::packet_handler::IpV6Header>::serialize (14 samples, 0.23%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeStruct>::serialize_field (14 samples, 0.23%)serde::ser::SerializeMap::serialize_entry (14 samples, 0.23%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_value (11 samples, 0.18%)__GI___libc_malloc (45 samples, 0.72%)_int_malloc (20 samples, 0.32%)<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_key (64 samples, 1.03%)serde::ser::impls::<impl serde::ser::Serialize for str>::serialize (62 samples, 1.00%)<serde_json::value::ser::MapKeySerializer as serde::ser::Serializer>::serialize_str (62 samples, 1.00%)alloc::str::<impl alloc::borrow::ToOwned for str>::to_owned (62 samples, 1.00%)alloc::slice::<impl alloc::borrow::ToOwned for [T]>::to_owned (62 samples, 1.00%)alloc::slice::<impl [T]>::to_vec (62 samples, 1.00%)alloc::slice::hack::to_vec (62 samples, 1.00%)alloc::vec::Vec<T>::with_capacity (46 samples, 0.74%)alloc::raw_vec::RawVec<T>::with_capacity (46 samples, 0.74%)alloc::raw_vec::RawVec<T,A>::with_capacity_in (46 samples, 0.74%)alloc::raw_vec::RawVec<T,A>::allocate_in (46 samples, 0.74%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (46 samples, 0.74%)alloc::alloc::alloc (46 samples, 0.74%)core::option::Option<T>::expect (9 samples, 0.14%)__GI___libc_malloc (8 samples, 0.13%)_int_malloc (8 samples, 0.13%)alloc::collections::btree::map::BTreeMap<K,V>::ensure_root_is_owned (11 samples, 0.18%)core::option::Option<T>::get_or_insert_with (11 samples, 0.18%)core::ops::function::FnOnce::call_once (11 samples, 0.18%)alloc::collections::btree::node::Root<K,V>::new_leaf (11 samples, 0.18%)alloc::boxed::Box<T>::new (11 samples, 0.18%)alloc::alloc::exchange_malloc (10 samples, 0.16%)<alloc::alloc::Global as core::alloc::AllocRef>::alloc (10 samples, 0.16%)alloc::alloc::alloc (10 samples, 0.16%)alloc::collections::btree::search::search_linear (13 samples, 0.21%)<alloc::string::String as core::cmp::Ord>::cmp (9 samples, 0.14%)<alloc::vec::Vec<T> as core::cmp::Ord>::cmp (9 samples, 0.14%)core::slice::<impl core::cmp::Ord for [T]>::cmp (7 samples, 0.11%)<u8 as core::slice::SliceOrd>::compare (7 samples, 0.11%)__memcmp_avx2_movbe (7 samples, 0.11%)alloc::collections::btree::map::BTreeMap<K,V>::entry (36 samples, 0.58%)alloc::collections::btree::search::search_tree (21 samples, 0.34%)alloc::collections::btree::search::search_node (18 samples, 0.29%)alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Mut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>::insert_fit (7 samples, 0.11%)alloc::collections::btree::node::slice_insert (7 samples, 0.11%)alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Mut,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>::insert (15 samples, 0.24%)alloc::boxed::Box<T>::new (7 samples, 0.11%)<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll (3,800 samples, 61.18%)<core::future::from_generator::GenFuture<T> as core::future::future::Future>::polltpcpr::serializer::tokio_serialize::{{closure}} (841 samples, 13.54%)tpcpr::serializer::t..serde_json::value::to_value (838 samples, 13.49%)serde_json::value::t..tpcpr::parser::_IMPL_SERIALIZE_FOR_QryData::<impl serde::ser::Serialize for tpcpr::parser::QryData>::serialize (827 samples, 13.32%)tpcpr::parser::_IMPL..<serde_json::value::ser::SerializeMap as serde::ser::SerializeStruct>::serialize_field (823 samples, 13.25%)<serde_json::value::..serde::ser::SerializeMap::serialize_entry (823 samples, 13.25%)serde::ser::Serializ..<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_value (802 samples, 12.91%)<serde_json::value:..serde_json::value::to_value (790 samples, 12.72%)serde_json::value::..serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (790 samples, 12.72%)serde::ser::impls::..serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (790 samples, 12.72%)serde::ser::impls::..serde::ser::impls::<impl serde::ser::Serialize for core::option::Option<T>>::serialize (790 samples, 12.72%)serde::ser::impls::..<serde_json::value::ser::Serializer as serde::ser::Serializer>::serialize_some (790 samples, 12.72%)<serde_json::value:..tpcpr::parser::packet_handler::_IMPL_SERIALIZE_FOR_TcpHeader::<impl serde::ser::Serialize for tpcpr::parser::packet_handler::TcpHeader>::serialize (148 samples, 2.38%)tp..<serde_json::value::ser::SerializeMap as serde::ser::SerializeStruct>::serialize_field (145 samples, 2.33%)<..serde::ser::SerializeMap::serialize_entry (144 samples, 2.32%)s..<serde_json::value::ser::SerializeMap as serde::ser::SerializeMap>::serialize_value (80 samples, 1.29%)serde_json::map::Map<alloc::string::String,serde_json::value::Value>::insert (67 samples, 1.08%)alloc::collections::btree::map::BTreeMap<K,V>::insert (67 samples, 1.08%)alloc::collections::btree::map::VacantEntry<K,V>::insert (30 samples, 0.48%)alloc::collections::btree::node::Root<K,V>::push_level (8 samples, 0.13%)<&mio::net::tcp::TcpStream as std::io::Write>::write (9 samples, 0.14%)<&mio::sys::unix::tcp::TcpStream as std::io::Write>::write (9 samples, 0.14%)<&std::net::tcp::TcpStream as std::io::Write>::write (9 samples, 0.14%)std::sys_common::net::TcpStream::write (9 samples, 0.14%)__libc_send (9 samples, 0.14%)<tokio_util::codec::framed::Fuse<T,U> as tokio::io::async_write::AsyncWrite>::poll_write (14 samples, 0.23%)<tokio_postgres::maybe_tls_stream::MaybeTlsStream<S,T> as tokio::io::async_write::AsyncWrite>::poll_write (14 samples, 0.23%)<tokio_postgres::socket::Socket as tokio::io::async_write::AsyncWrite>::poll_write (14 samples, 0.23%)<tokio::net::tcp::stream::TcpStream as tokio::io::async_write::AsyncWrite>::poll_write (14 samples, 0.23%)tokio::net::tcp::stream::TcpStream::poll_write_priv (14 samples, 0.23%)tokio_postgres::connection::Connection<S,T>::poll_flush (18 samples, 0.29%)<tokio_util::codec::framed::Framed<T,U> as futures_sink::Sink<I>>::poll_flush (18 samples, 0.29%)<tokio_util::codec::framed_write::FramedWrite2<T> as futures_sink::Sink<I>>::poll_flush (18 samples, 0.29%)<tokio_util::codec::framed::Framed<T,U> as futures_core::stream::Stream>::poll_next (11 samples, 0.18%)<tokio_util::codec::framed_read::FramedRead2<T> as futures_core::stream::Stream>::poll_next (11 samples, 0.18%)tokio::io::async_read::AsyncRead::poll_read_buf (11 samples, 0.18%)<tokio_util::codec::framed_write::FramedWrite2<T> as tokio::io::async_read::AsyncRead>::poll_read (9 samples, 0.14%)<tokio_util::codec::framed::Fuse<T,U> as tokio::io::async_read::AsyncRead>::poll_read (9 samples, 0.14%)<tokio_postgres::maybe_tls_stream::MaybeTlsStream<S,T> as tokio::io::async_read::AsyncRead>::poll_read (9 samples, 0.14%)<tokio_postgres::socket::Socket as tokio::io::async_read::AsyncRead>::poll_read (9 samples, 0.14%)<tokio::net::tcp::stream::TcpStream as tokio::io::async_read::AsyncRead>::poll_read (9 samples, 0.14%)tokio::net::tcp::stream::TcpStream::poll_read_priv (9 samples, 0.14%)tokio::io::poll_evented::PollEvented<E>::poll_read_ready (9 samples, 0.14%)tokio::io::registration::Registration::poll_read_ready (8 samples, 0.13%)tokio::io::registration::Registration::poll_ready (8 samples, 0.13%)tokio_postgres::connection::Connection<S,T>::poll_read (13 samples, 0.21%)tokio_postgres::connection::Connection<S,T>::poll_response (13 samples, 0.21%)bytes::bytes_mut::BytesMut::extend_from_slice (31 samples, 0.50%)core::intrinsics::copy_nonoverlapping (31 samples, 0.50%)__memmove_avx_unaligned_erms (31 samples, 0.50%)<bytes::bytes_mut::BytesMut as bytes::buf::buf_mut::BufMut>::put (41 samples, 0.66%)core::ptr::drop_in_place (7 samples, 0.11%)<tokio_postgres::codec::PostgresCodec as tokio_util::codec::encoder::Encoder<tokio_postgres::codec::FrontendMessage>>::encode (44 samples, 0.71%)postgres_protocol::message::frontend::CopyData<T>::write (43 samples, 0.69%)<tokio_util::codec::framed::Framed<T,U> as futures_sink::Sink<I>>::start_send (46 samples, 0.74%)<tokio_util::codec::framed_write::FramedWrite2<T> as futures_sink::Sink<I>>::start_send (46 samples, 0.74%)core::mem::drop (10 samples, 0.16%)core::ptr::drop_in_place (10 samples, 0.16%)alloc::alloc::box_free (10 samples, 0.16%)<alloc::alloc::Global as core::alloc::AllocRef>::dealloc (10 samples, 0.16%)alloc::alloc::dealloc (10 samples, 0.16%)futures_channel::mpsc::Receiver<T>::next_message (31 samples, 0.50%)futures_channel::mpsc::queue::Queue<T>::pop_spin (23 samples, 0.37%)futures_channel::mpsc::queue::Queue<T>::pop (19 samples, 0.31%)futures_util::stream::stream::StreamExt::poll_next_unpin (36 samples, 0.58%)<tokio_postgres::copy_in::CopyInReceiver as futures_core::stream::Stream>::poll_next (36 samples, 0.58%)futures_util::stream::stream::StreamExt::poll_next_unpin (35 samples, 0.56%)<futures_channel::mpsc::Receiver<T> as futures_core::stream::Stream>::poll_next (35 samples, 0.56%)<tokio_postgres::connection::Connection<S,T> as core::future::future::Future>::poll (122 samples, 1.96%)<..tokio_postgres::connection::Connection<S,T>::poll_message (122 samples, 1.96%)t..tokio_postgres::connection::Connection<S,T>::poll_write (85 samples, 1.37%)<alloc::collections::btree::map::IntoIter<K,V> as core::iter::traits::iterator::Iterator>::next (10 samples, 0.16%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Owned,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>>::next_unchecked (10 samples, 0.16%)alloc::collections::btree::navigate::replace (10 samples, 0.16%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Owned,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>>::next_unchecked::{{closure}} (10 samples, 0.16%)alloc::collections::btree::navigate::next_kv_unchecked_dealloc (13 samples, 0.21%)alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Owned,K,V,alloc::collections::btree::node::marker::LeafOrInternal>::deallocate_and_ascend (7 samples, 0.11%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Owned,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>>::next_unchecked::{{closure}} (39 samples, 0.63%)<alloc::collections::btree::map::IntoIter<K,V> as core::iter::traits::iterator::Iterator>::next (41 samples, 0.66%)alloc::collections::btree::navigate::<impl alloc::collections::btree::node::Handle<alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Owned,K,V,alloc::collections::btree::node::marker::Leaf>,alloc::collections::btree::node::marker::Edge>>::next_unchecked (41 samples, 0.66%)alloc::collections::btree::navigate::replace (41 samples, 0.66%)alloc::collections::btree::node::NodeRef<alloc::collections::btree::node::marker::Owned,K,V,alloc::collections::btree::node::marker::LeafOrInternal>::deallocate_and_ascend (12 samples, 0.19%)<alloc::alloc::Global as core::alloc::AllocRef>::dealloc (11 samples, 0.18%)alloc::alloc::dealloc (11 samples, 0.18%)_int_free (11 samples, 0.18%)<alloc::collections::btree::map::BTreeMap<K,V> as core::ops::drop::Drop>::drop (119 samples, 1.92%)<..core::mem::drop (117 samples, 1.88%)c..core::ptr::drop_in_place (117 samples, 1.88%)c..<alloc::collections::btree::map::IntoIter<K,V> as core::ops::drop::Drop>::drop (117 samples, 1.88%)<..core::mem::drop (47 samples, 0.76%)core::ptr::drop_in_place (47 samples, 0.76%)core::ptr::drop_in_place (41 samples, 0.66%)core::ptr::drop_in_place (39 samples, 0.63%)core::ptr::drop_in_place (39 samples, 0.63%)<alloc::raw_vec::RawVec<T,A> as core::ops::drop::Drop>::drop (39 samples, 0.63%)<alloc::alloc::Global as core::alloc::AllocRef>::dealloc (39 samples, 0.63%)alloc::alloc::dealloc (39 samples, 0.63%)_int_free (29 samples, 0.47%)malloc_consolidate (24 samples, 0.39%)unlink_chunk.constprop.0 (11 samples, 0.18%)_int_free (30 samples, 0.48%)<alloc::vec::Vec<T> as core::ops::drop::Drop>::drop (119 samples, 1.92%)<..core::ptr::drop_in_place (119 samples, 1.92%)c..core::ptr::drop_in_place (65 samples, 1.05%)core::mem::drop (295 samples, 4.75%)core::..<alloc::collections::btree::map::BTreeMap<K,V> as core::ops::drop::Drop>::drop (295 samples, 4.75%)<alloc..core::mem::drop (295 samples, 4.75%)core::..core::ptr::drop_in_place (295 samples, 4.75%)core::..<alloc::collections::btree::map::IntoIter<K,V> as core::ops::drop::Drop>::drop (295 samples, 4.75%)<alloc..core::mem::drop (281 samples, 4.52%)core:..core::ptr::drop_in_place (281 samples, 4.52%)core:..core::ptr::drop_in_place (132 samples, 2.13%)c..core::ptr::drop_in_place (131 samples, 2.11%)c..core::ptr::drop_in_place (12 samples, 0.19%)<alloc::raw_vec::RawVec<T,A> as core::ops::drop::Drop>::drop (12 samples, 0.19%)<alloc::alloc::Global as core::alloc::AllocRef>::dealloc (12 samples, 0.19%)alloc::alloc::dealloc (12 samples, 0.19%)_int_free (9 samples, 0.14%)core::ptr::drop_in_place (17 samples, 0.27%)core::ptr::drop_in_place (17 samples, 0.27%)core::ptr::drop_in_place (17 samples, 0.27%)_int_free (7 samples, 0.11%)<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll (4,251 samples, 68.44%)<core::future::from_generator::GenFuture<T> as core::future::future::Future>::polltpcpr::main::{{closure}}::{{closure}} (4,250 samples, 68.43%)tpcpr::main::{{closure}}::{{closure}}tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut (4,252 samples, 68.46%)tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_muttokio::runtime::task::core::Core<T,S>::poll::{{closure}} (4,252 samples, 68.46%)tokio::runtime::task::core::Core<T,S>::poll::{{closure}}<std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (4,263 samples, 68.64%)<std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_oncecore::ops::function::FnOnce::call_once (4,263 samples, 68.64%)core::ops::function::FnOnce::call_oncetokio::runtime::task::harness::Harness<T,S>::poll::{{closure}} (4,263 samples, 68.64%)tokio::runtime::task::harness::Harness<T,S>::poll::{{closure}}tokio::runtime::task::core::Core<T,S>::poll (4,259 samples, 68.57%)tokio::runtime::task::core::Core<T,S>::polltokio::runtime::task::core::Core<T,S>::drop_future_or_output (7 samples, 0.11%)tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut (7 samples, 0.11%)tokio::runtime::task::core::Core<T,S>::drop_future_or_output::{{closure}} (7 samples, 0.11%)std::panic::catch_unwind (4,267 samples, 68.70%)std::panic::catch_unwindstd::panicking::try (4,267 samples, 68.70%)std::panicking::trystd::panicking::try::do_call (4,267 samples, 68.70%)std::panicking::try::do_call_int_free (7 samples, 0.11%)tokio::runtime::task::core::Core<T,S>::release (9 samples, 0.14%)tokio::loom::std::unsafe_cell::UnsafeCell<T>::with (9 samples, 0.14%)tokio::runtime::task::core::Core<T,S>::release::{{closure}} (9 samples, 0.14%)tokio::runtime::thread_pool::worker::<impl tokio::runtime::task::Schedule for alloc::sync::Arc<tokio::runtime::thread_pool::worker::Worker>>::release (9 samples, 0.14%)tokio::macros::scoped_tls::ScopedKey<T>::with (9 samples, 0.14%)tokio::runtime::thread_pool::worker::<impl tokio::runtime::task::Schedule for alloc::sync::Arc<tokio::runtime::thread_pool::worker::Worker>>::release::{{closure}} (8 samples, 0.13%)tokio::runtime::task::harness::Harness<T,S>::complete (21 samples, 0.34%)tokio::runtime::task::harness::Harness<T,S>::poll (4,294 samples, 69.14%)tokio::runtime::task::harness::Harness<T,S>::polltokio::coop::budget (4,304 samples, 69.30%)tokio::coop::budgetstd::thread::local::LocalKey<T>::with (4,304 samples, 69.30%)std::thread::local::LocalKey<T>::withstd::thread::local::LocalKey<T>::try_with (4,304 samples, 69.30%)std::thread::local::LocalKey<T>::try_withtokio::coop::budget::{{closure}} (4,304 samples, 69.30%)tokio::coop::budget::{{closure}}tokio::runtime::thread_pool::worker::Context::run_task::{{closure}} (4,303 samples, 69.28%)tokio::runtime::thread_pool::worker::Context::run_task::{{closure}}tokio::runtime::task::Notified<S>::run (4,296 samples, 69.17%)tokio::runtime::task::Notified<S>::runtokio::runtime::task::raw::RawTask::poll (4,296 samples, 69.17%)tokio::runtime::task::raw::RawTask::poll<tokio::runtime::park::Unparker as tokio::park::Unpark>::unpark (8 samples, 0.13%)tokio::runtime::park::Inner::unpark (8 samples, 0.13%)tokio::runtime::thread_pool::worker::Context::run_task (4,327 samples, 69.67%)tokio::runtime::thread_pool::worker::Context::run_tasktokio::runtime::thread_pool::worker::Core::transition_from_searching (19 samples, 0.31%)tokio::runtime::thread_pool::worker::Shared::transition_worker_from_searching (19 samples, 0.31%)tokio::runtime::thread_pool::worker::Shared::notify_parked (19 samples, 0.31%)tokio::runtime::thread_pool::idle::Idle::worker_to_notify (7 samples, 0.11%)tokio::runtime::thread_pool::worker::Core::next_task (14 samples, 0.23%)core::sync::atomic::AtomicU32::compare_exchange (10 samples, 0.16%)core::sync::atomic::atomic_compare_exchange (10 samples, 0.16%)core::sync::atomic::AtomicU32::load (7 samples, 0.11%)core::sync::atomic::atomic_load (7 samples, 0.11%)tokio::runtime::queue::Steal<T>::steal_into (25 samples, 0.40%)tokio::runtime::queue::Steal<T>::steal_into2 (24 samples, 0.39%)tokio::runtime::thread_pool::worker::Core::steal_work (34 samples, 0.55%)std::panic::catch_unwind (4,594 samples, 73.97%)std::panic::catch_unwindstd::panicking::try (4,594 samples, 73.97%)std::panicking::trystd::panicking::try::do_call (4,594 samples, 73.97%)std::panicking::try::do_call<std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (4,594 samples, 73.97%)<std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_oncestd::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} (4,594 samples, 73.97%)std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}}std::sys_common::backtrace::__rust_begin_short_backtrace (4,594 samples, 73.97%)std::sys_common::backtrace::__rust_begin_short_backtracetokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}} (4,594 samples, 73.97%)tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}tokio::runtime::handle::Handle::enter (4,594 samples, 73.97%)tokio::runtime::handle::Handle::entertokio::runtime::context::enter (4,594 samples, 73.97%)tokio::runtime::context::entertokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}::{{closure}} (4,594 samples, 73.97%)tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}::{{closure}}tokio::runtime::blocking::pool::Inner::run (4,594 samples, 73.97%)tokio::runtime::blocking::pool::Inner::runtokio::runtime::task::Notified<S>::run (4,594 samples, 73.97%)tokio::runtime::task::Notified<S>::runtokio::runtime::task::raw::RawTask::poll (4,594 samples, 73.97%)tokio::runtime::task::raw::RawTask::polltokio::runtime::task::harness::Harness<T,S>::poll (4,594 samples, 73.97%)tokio::runtime::task::harness::Harness<T,S>::pollstd::panic::catch_unwind (4,594 samples, 73.97%)std::panic::catch_unwindstd::panicking::try (4,594 samples, 73.97%)std::panicking::trystd::panicking::try::do_call (4,594 samples, 73.97%)std::panicking::try::do_call<std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (4,594 samples, 73.97%)<std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_oncecore::ops::function::FnOnce::call_once (4,594 samples, 73.97%)core::ops::function::FnOnce::call_oncetokio::runtime::task::harness::Harness<T,S>::poll::{{closure}} (4,594 samples, 73.97%)tokio::runtime::task::harness::Harness<T,S>::poll::{{closure}}tokio::runtime::task::core::Core<T,S>::poll (4,594 samples, 73.97%)tokio::runtime::task::core::Core<T,S>::polltokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut (4,594 samples, 73.97%)tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_muttokio::runtime::task::core::Core<T,S>::poll::{{closure}} (4,594 samples, 73.97%)tokio::runtime::task::core::Core<T,S>::poll::{{closure}}<tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll (4,594 samples, 73.97%)<tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::polltokio::runtime::thread_pool::worker::Launch::launch::{{closure}} (4,594 samples, 73.97%)tokio::runtime::thread_pool::worker::Launch::launch::{{closure}}tokio::runtime::thread_pool::worker::run (4,594 samples, 73.97%)tokio::runtime::thread_pool::worker::runtokio::macros::scoped_tls::ScopedKey<T>::set (4,594 samples, 73.97%)tokio::macros::scoped_tls::ScopedKey<T>::settokio::runtime::thread_pool::worker::run::{{closure}} (4,594 samples, 73.97%)tokio::runtime::thread_pool::worker::run::{{closure}}tokio::runtime::thread_pool::worker::Context::run (4,594 samples, 73.97%)tokio::runtime::thread_pool::worker::Context::run__GI___clone (4,596 samples, 74.00%)__GI___clonestart_thread (4,596 samples, 74.00%)start_threadstd::sys::unix::thread::Thread::new::thread_start (4,596 samples, 74.00%)std::sys::unix::thread::Thread::new::thread_start<alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once (4,596 samples, 74.00%)<alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once<alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once (4,596 samples, 74.00%)<alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_oncecore::ops::function::FnOnce::call_once{{vtable-shim}} (4,596 samples, 74.00%)core::ops::function::FnOnce::call_once{{vtable-shim}}std::thread::Builder::spawn_unchecked::{{closure}} (4,596 samples, 74.00%)std::thread::Builder::spawn_unchecked::{{closure}}bytes::buf::buf_mut::BufMut::put (12 samples, 0.19%)serde::ser::SerializeMap::serialize_entry (7 samples, 0.11%)serde::ser::impls::<impl serde::ser::Serialize for &T>::serialize (18 samples, 0.29%)serde::ser::Serializer::collect_seq (97 samples, 1.56%)<serde_json::ser::Compound<W,F> as serde::ser::SerializeSeq>::serialize_element (79 samples, 1.27%)serde_json::ser::Formatter::begin_array_value (18 samples, 0.29%)std::io::Write::write_all (18 samples, 0.29%)<bytes::buf::ext::writer::Writer<B> as std::io::Write>::write (18 samples, 0.29%)tokio-runtime-w (6,167 samples, 99.29%)tokio-runtime-wtokio::runtime::context::enter (10 samples, 0.16%)tokio::runtime::Runtime::block_on::{{closure}} (10 samples, 0.16%)tokio::runtime::thread_pool::ThreadPool::block_on (10 samples, 0.16%)tokio::runtime::enter::Enter::block_on (10 samples, 0.16%)tokio::runtime::thread_pool::ThreadPool::block_on (11 samples, 0.18%)tokio::runtime::enter::Enter::block_on (11 samples, 0.18%)tokio::coop::budget (11 samples, 0.18%)std::thread::local::LocalKey<T>::with (11 samples, 0.18%)std::thread::local::LocalKey<T>::try_with (11 samples, 0.18%)tokio::coop::budget::{{closure}} (11 samples, 0.18%)tokio::runtime::enter::Enter::block_on::{{closure}} (11 samples, 0.18%)<core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll (11 samples, 0.18%)tpcpr::main::{{closure}} (11 samples, 0.18%)tokio::task::spawn::spawn (11 samples, 0.18%)tokio::runtime::spawner::Spawner::spawn (11 samples, 0.18%)tokio::runtime::thread_pool::Spawner::spawn (11 samples, 0.18%)[unknown] (23 samples, 0.37%)__GI___clone (8 samples, 0.13%)all (6,211 samples, 100%)tpcpr (44 samples, 0.71%)_start (11 samples, 0.18%)_dl_start (7 samples, 0.11%)_dl_sysdep_start (7 samples, 0.11%) \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 59ae3a5..dc92db9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,13 @@ use tokio_postgres::{Error, NoTls}; use tokio_postgres::binary_copy::{BinaryCopyInWriter}; use futures::{pin_mut}; use tokio::task; +use tokio::sync::mpsc; + +extern crate jemallocator; + +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + /* conditionals */ const FROM_FILE: bool = false; @@ -64,26 +71,66 @@ async fn main() -> Result<(), Error> { match config.is_device { FROM_FILE => { for (_pcap_file, _pcap_info) in pcap_map.iter() { - println!("{:?}: {:?}", &_pcap_file, &_pcap_info); + //println!("{:?}: {:?}", &_pcap_file, &_pcap_info); - let v: Vec = - parser::parse(&_pcap_file, &config.filter, &config.regex_filter); - let packets_serialized = serializer::serialize_packets(v); + + + /* MPSC channeled serialization */ + // This is just patched up atm, mix between std::sync::mpsc and tokio::sync::mpsc + let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned()).await; + // let (data_serialized, h2) = serializer::mpsc_serialize(qry_data); + // let packets_serialized = serializer::mpsc_collect_serialized(data_serialized); + // let _r1 = h1.join().unwrap(); + // let _r2 = h2.join().unwrap(); + + /* Deprecated */ +// let v: Vec = +// parser::parse(&_pcap_file, &config.filter, &config.regex_filter); +// let len = v.len(); + + /* tokio mpsc channel */ + let (tx, mut rx) = mpsc::channel(100); + // let pcap_file = _pcap_file.clone(); + // let filter = config.filter.clone(); + // let regex_filter = config.regex_filter.clone(); + // let join_handle: task::JoinHandle> = task::spawn( async move { + // parser::tokio_parse(pcap_file, &filter, ®ex_filter).await + // }); + // let v = join_handle.await.unwrap(); + // for packet in v.into_iter(){ + + for packet in qry_data { + let mut tx = tx.clone(); + + tokio::spawn( async move { + //println!("serializing!number {:?}", i); + + let packet_serialized = serializer::tokio_serialize(packet).await; + tx.send(packet_serialized).await.unwrap(); + }); + + } + drop(tx); let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap(); let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]); let join = task::spawn( async move { pin_mut!(writer); - for pack in packets_serialized { - writer.as_mut().write(&[&pack]).await.unwrap(); + //for pack in packets_serialized { + while let Some(res) = rx.recv().await { + writer.as_mut().write(&[&res]).await.unwrap(); + drop(res); // Reminder: write_raw() behavior is very strange, so it's write() for now. // writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap(); - } + } + + //thread::sleep(time::Duration::from_millis(3000)); writer.finish().await.unwrap(); }); assert!(join.await.is_ok()); - + + let _r1 = h1.join().unwrap(); // TODO: MPSC channel // let mut v = Vec::::with_capacity(100000); // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter)); diff --git a/src/main_bkp b/src/main_bkp new file mode 100644 index 0000000..ae1065f --- /dev/null +++ b/src/main_bkp @@ -0,0 +1,211 @@ +extern crate serde_json; +extern crate tokio; +extern crate tokio_postgres; + +mod configure; +mod parser; +mod serializer; +//use postgres::{Client, NoTls}; +//use postgres::types::ToSql; +//use postgres::binary_copy::{BinaryCopyInWriter}; +use tokio_postgres::types::{Type, ToSql}; +use tokio_postgres::{Error, NoTls}; +use tokio_postgres::binary_copy::{BinaryCopyInWriter}; +use futures::{pin_mut}; +use tokio::task; +use tokio::sync::mpsc; +//use std::thread::{spawn, JoinHandle}; +//use std::sync::mpsc::{channel, Receiver}; +//use std::sync::mpsc; +//use std::alloc::System; +// +//#[global_allocator] +//static A: System = System; + +/* conditionals */ +const FROM_FILE: bool = false; +const FROM_DEVICE: bool = true; +//const NON_CHUNKED: bool = true; +//const CHUNKED: bool = false; + +fn query_string(insert_max: &usize, table_name: &str) -> String { + let mut insert_template = String::with_capacity(insert_max * 8 + 96); + insert_template.push_str(&*format!("INSERT INTO {} (packet) Values ", table_name)); + + for insert in 0..insert_max - 1 { + insert_template.push_str(&*format!("(${}), ", insert + 1)); + } + insert_template.push_str(&*format!("(${})", insert_max)); + + insert_template +} + +#[tokio::main(core_threads = 4)] +async fn main() -> Result<(), Error> { + /* Init values from file */ + let config: configure::Config = configure::from_json_file().unwrap(); + let pcap_map = configure::map_pcap_dir(&config.pcap_dir).unwrap(); + + // TODO: Create db table with pcap file hashes + // TODO: hash file metadata, so its state is comparable with future file updates and can be written to a db table (and read e.g. after system crash) + // This db table should include UUIDs as primary keys, so it can be joined effectively with past and future runs. + // TODO: Use inotify crate to update pcap_map according to files created while parser is running + + /* db connection */ + let (client, connection) = tokio_postgres::connect(&config.connection, NoTls).await?; + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + client + .execute(&*format!("DROP TABLE IF EXISTS {}", &config.tablename), &[]) + .await?; + client + .execute( + &*format!( + "CREATE TABLE {} ( ID serial NOT NULL PRIMARY KEY, packet json NOT NULL)", + &config.tablename + ), + &[], + ) + .await?; + + /* device or file input */ + match config.is_device { + FROM_FILE => { + for (_pcap_file, _pcap_info) in pcap_map.iter() { + //println!("{:?}: {:?}", &_pcap_file, &_pcap_info); + + + + /* MPSC channeled serialization */ + // let (qry_data, h1) = parser::mpsc_parser(_pcap_file.to_owned(), config.filter.to_owned(), config.regex_filter.to_owned()); + // let (data_serialized, h2) = serializer::mpsc_serialize(qry_data); + // let packets_serialized = serializer::mpsc_collect_serialized(data_serialized); + // let _r1 = h1.join().unwrap(); + // let _r2 = h2.join().unwrap(); + + /* This is serializing data without mpsc, which results in higher memory consumption, it's faster but 12GB main memory is needed */ +// let v: Vec = +// parser::parse(&_pcap_file, &config.filter, &config.regex_filter); +// let len = v.len(); + + /* tokio mpsc channel */ + let (tx, mut rx) = mpsc::channel(1000); + let pcap_file = _pcap_file.clone(); + let filter = config.filter.clone(); + let regex_filter = config.regex_filter.clone(); + let join_handle: task::JoinHandle> = task::spawn( async move { + parser::tokio_parse(pcap_file, &filter, ®ex_filter).await + }); + let v = join_handle.await.unwrap(); + for packet in v.into_iter(){ + let mut tx = tx.clone(); + + tokio::spawn( async move { + //println!("serializing!number {:?}", i); + + let packet_serialized = serializer::tokio_serialize(packet).await; + tx.send(packet_serialized).await.unwrap(); + }); + + } + drop(tx); + // let mut packets_serialized: Vec = Vec::new(); + // while let Some(res) = rx.recv().await { + // //println!("collecting"); + // packets_serialized.push(res); + // //let packets_serialized = serializer::serialize_packets(v); + // } + + let sink = client.copy_in("COPY json_dump(packet) from STDIN BINARY").await.unwrap(); + let writer = BinaryCopyInWriter::new(sink, &[Type::JSON]); + + let join = task::spawn( async move { + pin_mut!(writer); + //for pack in packets_serialized { + while let Some(res) = rx.recv().await { + writer.as_mut().write(&[&res]).await.unwrap(); + drop(res); + // Reminder: write_raw() behavior is very strange, so it's write() for now. + // writer.as_mut().write_raw(chunk.into_iter().map(|p| p as &dyn ToSql).collect()).await.unwrap(); + } + + //thread::sleep(time::Duration::from_millis(3000)); + writer.finish().await.unwrap(); + }); + assert!(join.await.is_ok()); + + // TODO: MPSC channel + // let mut v = Vec::::with_capacity(100000); + // v.extend(parser::parse(&_pcap_file, &config.filter, &config.regex_filter)); + // let mut packets_serialized = Vec::::with_capacity(100000); + // packets_serialized.extend(serializer::serialize_packets(v)); +// Reminder: If COPY doesn't cut it and INSERT is the way to go, uncomment and use following logic inside FROM_FILE +// /* Do chunks and query data */ +// let chunker = (&packets_serialized.len() < &config.insert_max) && (0 < packets_serialized.len()) ; +// match chunker { +// NON_CHUNKED => { +// let insert_str = query_string(&packets_serialized.len(), &config.tablename); +// let statement = client.prepare(&insert_str).await?; +// client +// .query_raw( +// &statement, +// packets_serialized.iter().map(|p| p as &dyn ToSql), +// ) +// .await?; +// } +// CHUNKED => { +// let insert_str = query_string(&config.insert_max, &config.tablename); +// let statement = client.prepare(&insert_str).await?; +// +// for chunk in packets_serialized.chunks_exact(config.insert_max) { +// client +// .query_raw(&statement, chunk.iter().map(|p| p as &dyn ToSql)) +// .await?; +// } +// let remainder_len = packets_serialized +// .chunks_exact(config.insert_max) +// .remainder() +// .len(); +// if 0 < remainder_len { +// let rem_str = query_string(&remainder_len, &config.tablename); +// let statement = client.prepare(&rem_str).await?; +// client +// .query_raw( +// &statement, +// packets_serialized +// .chunks_exact(config.insert_max) +// .remainder() +// .iter() +// .map(|p| p as &dyn ToSql), +// ) +// .await?; +// } +// } +// } + } + } + FROM_DEVICE => { + let insert_str = query_string(&config.insert_max, &config.tablename); + let statement = client.prepare(&insert_str).await?; + loop { + let v: Vec = parser::parse_device( + &config.device, + &config.filter, + &config.insert_max, + &config.regex_filter, + ); + let packets_serialized = serializer::serialize_packets(v); + client + .query_raw( + &statement, + packets_serialized.iter().map(|p| p as &dyn ToSql), + ) + .await?; + } + } + } + Ok(()) +} diff --git a/src/parser.json b/src/parser.json index 446a598..1e5f5f0 100644 --- a/src/parser.json +++ b/src/parser.json @@ -1,6 +1,6 @@ { "insert_max": 20000, - "filter": "ip6 && tcp", + "filter": "tcp", "regex_filter": "(?:http|https)[[:punct:]]+[[:alnum:]]+[[:punct:]][[:alnum:]]+[[:punct:]](?:com|de|org|net)", "from_device": false, "parse_device": "enp7s0", @@ -8,6 +8,6 @@ "pcap_dir": "../target", "database_tablename": "json_dump", "database_user": "postgres", - "database_host": "localhost", - "database_password": "password" + "database_host": "192.168.0.11", + "database_password": "docker" } diff --git a/src/parser/mod.rs b/src/parser/mod.rs index 29c81d2..d80dea8 100644 --- a/src/parser/mod.rs +++ b/src/parser/mod.rs @@ -7,8 +7,11 @@ use regex::bytes::Regex; use std::convert::TryInto; use std::str; use serde::Serialize; -//use std::thread::{spawn, JoinHandle}; -//use std::sync::mpsc::{channel, Receiver}; +use std::thread::{spawn, JoinHandle}; +use std::sync::mpsc::{channel, Receiver}; +use tokio::sync::mpsc; +use tokio::stream::{self, StreamExt}; +use tokio::task; /* protocol ids, LittleEndian */ const ETH_P_IPV6: usize = 0xDD86; @@ -188,6 +191,7 @@ impl QryData { /* Regex parse _complete_ package */ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option { let mut flags: String = String::new(); + if !re.as_str().is_empty() { for mat in re.find_iter(payload) { // TODO: Test benchmark format! vs. push_str() // flags.push_str(&format!("{} ",std::str::from_utf8(mat.as_bytes()).unwrap())); @@ -195,15 +199,17 @@ fn flag_carnage(re: &Regex, payload: &[u8]) -> Option { flags.push_str(std::str::from_utf8(mat.as_bytes()).unwrap()); flags.push_str(";"); } - if flags.len() > 0{ - println!("{:?}", flags); } - match 0 < flags.len() { - false => None, - true => Some(flags), + if !flags.is_empty(){ +// println!("{:?}", flags); + } + match flags.is_empty() { + true => None, + false => Some(flags), } } +#[allow(dead_code)] pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str) -> Vec { let mut v: Vec = Vec::new(); let mut cap = Capture::from_file(parse_file).unwrap(); @@ -220,21 +226,22 @@ pub fn parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str) }; me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; - me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex overhead is between 4-9% --single threaded-- on complete packet [u8] data + me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data //v.push(me.clone()); - - v.push(QryData { - id: 0, - time: me.time, - data: me.data, - ether_header: me.ether_header, - ipv4_header: me.ipv4_header, - ipv6_header: me.ipv6_header, - tcp_header: me.tcp_header, - udp_header: me.udp_header, - arp_header: me.arp_header, - reg_res: me.reg_res, - }); + v.push(me.clone()); +/* TODO: Will clone() call destructors correctly and the method below won't? */ +// v.push(QryData { +// id: 0, +// time: me.time, +// data: me.data, +// ether_header: me.ether_header, +// ipv4_header: me.ipv4_header, +// ipv6_header: me.ipv6_header, +// tcp_header: me.tcp_header, +// udp_header: me.udp_header, +// arp_header: me.arp_header, +// reg_res: me.reg_res, +// }); } v } @@ -272,3 +279,106 @@ pub fn parse_device( } v } + +#[allow(dead_code)] +pub async fn mpsc_parser (parse_file: std::path::PathBuf, filter_str: String, regex_filter: String) -> (Receiver, JoinHandle<()>) { + let (sender, receiver) = channel(); + let handle = spawn( move || { + let mut cap = Capture::from_file(parse_file).unwrap(); + Capture::filter(&mut cap, &filter_str).unwrap(); + let linktype = cap.get_datalink(); + //println!("{:?}", &linktype); + let re = Regex::new(®ex_filter).unwrap(); + while let Ok(packet) = cap.next() { + let mut me = QryData::new(); + match linktype { + Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html + Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks + _ => (), + }; + + me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; + me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data + //v.push(me.clone()); + if sender.send(me.clone()).is_err(){ + break; + } + } + }); + (receiver, handle) +} + +#[allow(dead_code)] +pub async fn tokio_parse <'a> (parse_file: std::path::PathBuf, filter_str: &'a str, regex_filter: &'a str ) -> Vec { + let mut v: Vec = Vec::new(); + let mut cap = Capture::from_file(parse_file).unwrap(); + Capture::filter(&mut cap, &filter_str).unwrap(); + let linktype = cap.get_datalink(); +// println!("{:?}", &linktype); + let re = Regex::new(®ex_filter).unwrap(); + + while let Ok(packet) = cap.next() { + let mut me = QryData::new(); + match linktype { + Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html + Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks + _ => (), + }; + + me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; + me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data + v.push(me.clone()); +// drop(me); + //std::mem::replace(&mut me, QryData::new()); + } + v +} + +//pub async fn tokio_parse(parse_file: &std::path::Path, filter_str: &str, regex_filter: &str) -> tokio::stream::Iter> { +// //let mut v: Vec = Vec::new(); +// let mut cap = Capture::from_file(parse_file).unwrap(); +// Capture::filter(&mut cap, &filter_str).unwrap(); +// let linktype = cap.get_datalink(); +// let re = Regex::new(regex_filter).unwrap(); +// while let Ok(packet) = cap.next() { +// let mut me = QryData::new(); +// match linktype { +// Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html +// Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks +// _ => (), +// }; +// +// me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; +// me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data +// //v.push(me.clone()); +// tx.send +// } +// let x = stream::iter(&mut v).collect().await +//} + +//pub async fn tokio_parse (parse_file: std::path::PathBuf, filter_str: &'static str, regex_filter: &'static str) { +// //let mut v: Vec = Vec::new(); +// let (mut tx, rx) = mpsc::channel(1000); +// +// tokio::spawn (async move { +// let mut cap = Capture::from_file(parse_file).unwrap(); +// Capture::filter(&mut cap, &filter_str).unwrap(); +// let linktype = cap.get_datalink(); +// let re = Regex::new(regex_filter).unwrap(); +// while let Ok(packet) = cap.next() { +// let mut me = QryData::new(); +// match linktype { +// Linktype(1) => me.encap_en10mb(packet.data).unwrap(), // I reversed encapsulation/linktype bytes in pcap/pcapng file by looking at https://www.tcpdump.org/linktypes.html +// Linktype(12) => me.encap_raw(packet.data).unwrap(), // Either this source + my implementation is wrong or pcap crate sucks +// _ => (), +// }; +// +// me.time = (packet.header.ts.tv_usec as f64 / 1000000.0) + packet.header.ts.tv_sec as f64; +// me.reg_res = Some(flag_carnage(&re, packet.data)).unwrap(); // Regex parser on complete packet [u8] data +// //v.push(me.clone()); +// tx.send(me.clone()).await.unwrap(); +// } +// +// }); +// +//} diff --git a/src/serializer/mod.rs b/src/serializer/mod.rs index 7ba6dd2..b76ec42 100644 --- a/src/serializer/mod.rs +++ b/src/serializer/mod.rs @@ -1,8 +1,12 @@ extern crate serde_json; use crate::parser; use rayon::prelude::*; -//use serde::ser::{Serialize, SerializeStruct, Serializer}; +use std::thread::{spawn, JoinHandle}; +use std::sync::mpsc::{channel, Receiver}; + +// This is not needed atm +//use serde::ser::{Serialize, SerializeStruct, Serializer}; //impl Serialize for parser::QryData { // fn serialize(&self, serializer: S) -> Result // where @@ -30,7 +34,6 @@ pub fn serialize_packets(v: Vec) -> Vec { .map(|x| serde_json::to_value(x).unwrap()) .collect(); // let packets_serialized: Vec = v.par_iter().map(|x| json!(x)).collect(); - packets_serialized } @@ -46,3 +49,32 @@ pub fn serialize_packets_as_string(v: Vec) -> Vec) -> (Receiver, JoinHandle<()>) { + let (sender, receiver) = channel(); + let handle = spawn( move || { + for p in packet{ + let serialized = serde_json::to_value(p).unwrap(); + if sender.send(serialized).is_err(){ + return; + } + } + }); + (receiver, handle) +} + +#[allow(dead_code)] +pub fn mpsc_collect_serialized( packet: Receiver ) -> Vec { + let mut packets_serialized: Vec = Vec::new(); + for p in packet { + packets_serialized.push(p); + } + packets_serialized +} + + +pub async fn tokio_serialize(packet: parser::QryData) -> serde_json::Value { + serde_json::to_value(packet).unwrap() +} +