From 6d5e54a66b96ab504eeb6cca03499fb03761dcb6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Sun, 18 Dec 2022 06:37:03 +0100
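Subject: [PATCH] fix: jaeger support

Export spans to Jaeger through a tracing-opentelemetry layer registered on
the global tracing subscriber, instead of wrapping startup in a single
manually started "conduit" span. The agent pipeline now sets the service
name and enables automatic batch splitting, and the log filter falls back
to "warn" when the configured filter string is invalid.

Also return HashMap instead of BTreeMap from state_full_ids (callers are
updated accordingly), add #[tracing::instrument] to
Service::state_full_ids, and remove it from get_cached_eventid_authchain
and state_get.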

---
 Cargo.lock                                    | 31 +++++++++++++++++++
 Cargo.toml                                    |  1 +
 src/api/client_server/sync.rs                 |  2 +-
 .../key_value/rooms/state_accessor.rs         |  9 ++----
 src/main.rs                                   | 24 +++++++++++---
 src/service/rooms/auth_chain/mod.rs           |  1 -
 src/service/rooms/event_handler/mod.rs        | 10 +++---
 src/service/rooms/state_accessor/data.rs      |  7 ++---
 src/service/rooms/state_accessor/mod.rs       |  9 ++----
 9 files changed, 65 insertions(+), 29 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a659dec0..bb5943a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -419,6 +419,7 @@ dependencies = [
  "tower-http",
  "tracing",
  "tracing-flame",
+ "tracing-opentelemetry",
  "tracing-subscriber",
  "trust-dns-resolver",
 ]
@@ -574,6 +575,19 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "dashmap"
+version = "5.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
+dependencies = [
+ "cfg-if",
+ "hashbrown",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "data-encoding"
 version = "2.3.2"
@@ -1573,6 +1587,7 @@ version = "0.18.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
 dependencies = [
+ "fnv",
  "futures-channel",
  "futures-util",
  "indexmap",
@@ -1590,6 +1605,8 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
 dependencies = [
  "async-trait",
  "crossbeam-channel",
+ "dashmap",
+ "fnv",
  "futures-channel",
  "futures-executor",
  "futures-util",
@@ -2891,6 +2908,20 @@ dependencies = [
  "tracing-core",
 ]
 
+[[package]]
+name = "tracing-opentelemetry"
+version = "0.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de"
+dependencies = [
+ "once_cell",
+ "opentelemetry",
+ "tracing",
+ "tracing-core",
+ "tracing-log",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "tracing-subscriber"
 version = "0.3.16"
diff --git a/Cargo.toml b/Cargo.toml
index 87102c0c..737799d3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,6 +69,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
 tracing-flame = "0.2.0"
 opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
 opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
+tracing-opentelemetry = "0.18.0"
 lru-cache = "0.1.2"
 rusqlite = { version = "0.28.0", optional = true, features = ["bundled"] }
 parking_lot = { version = "0.12.1", optional = true }
diff --git a/src/api/client_server/sync.rs b/src/api/client_server/sync.rs
index 43ca238a..568a23ce 100644
--- a/src/api/client_server/sync.rs
+++ b/src/api/client_server/sync.rs
@@ -873,7 +873,7 @@ async fn sync_helper(
 
         let since_state_ids = match since_shortstatehash {
             Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
-            None => BTreeMap::new(),
+            None => HashMap::new(),
         };
 
         let left_event_id = match services().rooms.state_accessor.room_state_get_id(
diff --git a/src/database/key_value/rooms/state_accessor.rs b/src/database/key_value/rooms/state_accessor.rs
index 70e59acb..0f0c0dc7 100644
--- a/src/database/key_value/rooms/state_accessor.rs
+++ b/src/database/key_value/rooms/state_accessor.rs
@@ -1,7 +1,4 @@
-use std::{
-    collections::{BTreeMap, HashMap},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 
 use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
 use async_trait::async_trait;
@@ -9,7 +6,7 @@ use ruma::{events::StateEventType, EventId, RoomId};
 
 #[async_trait]
 impl service::rooms::state_accessor::Data for KeyValueDatabase {
-    async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
+    async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
         let full_state = services()
             .rooms
             .state_compressor
@@ -17,7 +14,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
             .pop()
             .expect("there is always one layer")
             .1;
-        let mut result = BTreeMap::new();
+        let mut result = HashMap::new();
         let mut i = 0;
         for compressed in full_state.into_iter() {
             let parsed = services()
diff --git a/src/main.rs b/src/main.rs
index 013c4de5..fa33c094 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -26,7 +26,6 @@ use http::{
     header::{self, HeaderName},
     Method, StatusCode, Uri,
 };
-use opentelemetry::trace::{FutureExt, Tracer};
 use ruma::api::{
     client::{
         error::{Error as RumaError, ErrorBody, ErrorKind},
@@ -93,14 +92,29 @@ async fn main() {
     if config.allow_jaeger {
         opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
         let tracer = opentelemetry_jaeger::new_agent_pipeline()
+            .with_auto_split_batch(true)
+            .with_service_name("conduit")
             .install_batch(opentelemetry::runtime::Tokio)
             .unwrap();
+        let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
 
-        let span = tracer.start("conduit");
-        start.with_current_context().await;
-        drop(span);
+        let filter_layer = match EnvFilter::try_new(&config.log) {
+            Ok(s) => s,
+            Err(e) => {
+                eprintln!(
+                    "It looks like your log config is invalid. The following error occurred: {}",
+                    e
+                );
+                EnvFilter::try_new("warn").unwrap()
+            }
+        };
 
-        println!("exporting");
+        let subscriber = tracing_subscriber::Registry::default()
+            .with(filter_layer)
+            .with(telemetry);
+        tracing::subscriber::set_global_default(subscriber).unwrap();
+        start.await;
+        println!("exporting remaining spans");
         opentelemetry::global::shutdown_tracer_provider();
     } else {
         let registry = tracing_subscriber::Registry::default();
diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs
index d3b6e401..39636045 100644
--- a/src/service/rooms/auth_chain/mod.rs
+++ b/src/service/rooms/auth_chain/mod.rs
@@ -15,7 +15,6 @@ pub struct Service {
 }
 
 impl Service {
-    #[tracing::instrument(skip(self))]
     pub fn get_cached_eventid_authchain<'a>(
         &'a self,
         key: &[u64],
diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs
index 3c49349f..0bba61c6 100644
--- a/src/service/rooms/event_handler/mod.rs
+++ b/src/service/rooms/event_handler/mod.rs
@@ -7,7 +7,7 @@ use ruma::{
     RoomVersionId,
 };
 use std::{
-    collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet},
+    collections::{hash_map, BTreeMap, HashMap, HashSet},
     pin::Pin,
     sync::{Arc, RwLock, RwLockWriteGuard},
     time::{Duration, Instant, SystemTime},
@@ -553,7 +553,7 @@ impl Service {
                 let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
 
                 for (sstatehash, prev_event) in extremity_sstatehashes {
-                    let mut leaf_state: BTreeMap<_, _> = services()
+                    let mut leaf_state: HashMap<_, _> = services()
                         .rooms
                         .state_accessor
                         .state_full_ids(sstatehash)
@@ -660,7 +660,7 @@ impl Service {
                         )
                         .await;
 
-                    let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new();
+                    let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
                     for (pdu, _) in state_vec {
                         let state_key = pdu.state_key.clone().ok_or_else(|| {
                             Error::bad_database("Found non-state pdu in state events.")
@@ -672,10 +672,10 @@ impl Service {
                         )?;
 
                         match state.entry(shortstatekey) {
-                            btree_map::Entry::Vacant(v) => {
+                            hash_map::Entry::Vacant(v) => {
                                 v.insert(Arc::from(&*pdu.event_id));
                             }
-                            btree_map::Entry::Occupied(_) => return Err(
+                            hash_map::Entry::Occupied(_) => return Err(
                                 Error::bad_database("State event's type and state_key combination exists multiple times."),
                             ),
                         }
diff --git a/src/service/rooms/state_accessor/data.rs b/src/service/rooms/state_accessor/data.rs
index 340b19c3..f3ae3c21 100644
--- a/src/service/rooms/state_accessor/data.rs
+++ b/src/service/rooms/state_accessor/data.rs
@@ -1,7 +1,4 @@
-use std::{
-    collections::{BTreeMap, HashMap},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 
 use async_trait::async_trait;
 use ruma::{events::StateEventType, EventId, RoomId};
@@ -12,7 +9,7 @@ use crate::{PduEvent, Result};
 pub trait Data: Send + Sync {
     /// Builds a StateMap by iterating over all keys that start
     /// with state_hash, this gives the full state for the given state_hash.
-    async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>>;
+    async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>>;
 
     async fn state_full(
         &self,
diff --git a/src/service/rooms/state_accessor/mod.rs b/src/service/rooms/state_accessor/mod.rs
index 1a9c4a9e..87d99368 100644
--- a/src/service/rooms/state_accessor/mod.rs
+++ b/src/service/rooms/state_accessor/mod.rs
@@ -1,8 +1,5 @@
 mod data;
-use std::{
-    collections::{BTreeMap, HashMap},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 
 pub use data::Data;
 use ruma::{events::StateEventType, EventId, RoomId};
@@ -16,7 +13,8 @@ pub struct Service {
 impl Service {
     /// Builds a StateMap by iterating over all keys that start
     /// with state_hash, this gives the full state for the given state_hash.
-    pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
+    #[tracing::instrument(skip(self))]
+    pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
         self.db.state_full_ids(shortstatehash).await
     }
 
@@ -39,7 +37,6 @@ impl Service {
     }
 
     /// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
-    #[tracing::instrument(skip(self))]
     pub fn state_get(
         &self,
         shortstatehash: u64,