mirror of
https://gitlab.com/famedly/conduit.git
synced 2025-01-14 21:46:29 +03:00
fix: jaeger support
This commit is contained in:
parent
2b2bfb91c2
commit
6d5e54a66b
9 changed files with 65 additions and 29 deletions
31
Cargo.lock
generated
31
Cargo.lock
generated
|
@ -419,6 +419,7 @@ dependencies = [
|
||||||
"tower-http",
|
"tower-http",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-flame",
|
"tracing-flame",
|
||||||
|
"tracing-opentelemetry",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"trust-dns-resolver",
|
"trust-dns-resolver",
|
||||||
]
|
]
|
||||||
|
@ -574,6 +575,19 @@ dependencies = [
|
||||||
"zeroize",
|
"zeroize",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "dashmap"
|
||||||
|
version = "5.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"hashbrown",
|
||||||
|
"lock_api",
|
||||||
|
"once_cell",
|
||||||
|
"parking_lot_core",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "data-encoding"
|
name = "data-encoding"
|
||||||
version = "2.3.2"
|
version = "2.3.2"
|
||||||
|
@ -1573,6 +1587,7 @@ version = "0.18.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
|
checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"fnv",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
|
@ -1590,6 +1605,8 @@ checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
|
"dashmap",
|
||||||
|
"fnv",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-executor",
|
"futures-executor",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
@ -2891,6 +2908,20 @@ dependencies = [
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-opentelemetry"
|
||||||
|
version = "0.18.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de"
|
||||||
|
dependencies = [
|
||||||
|
"once_cell",
|
||||||
|
"opentelemetry",
|
||||||
|
"tracing",
|
||||||
|
"tracing-core",
|
||||||
|
"tracing-log",
|
||||||
|
"tracing-subscriber",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing-subscriber"
|
name = "tracing-subscriber"
|
||||||
version = "0.3.16"
|
version = "0.3.16"
|
||||||
|
|
|
@ -69,6 +69,7 @@ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
|
||||||
tracing-flame = "0.2.0"
|
tracing-flame = "0.2.0"
|
||||||
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
|
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
|
||||||
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
|
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
|
||||||
|
tracing-opentelemetry = "0.18.0"
|
||||||
lru-cache = "0.1.2"
|
lru-cache = "0.1.2"
|
||||||
rusqlite = { version = "0.28.0", optional = true, features = ["bundled"] }
|
rusqlite = { version = "0.28.0", optional = true, features = ["bundled"] }
|
||||||
parking_lot = { version = "0.12.1", optional = true }
|
parking_lot = { version = "0.12.1", optional = true }
|
||||||
|
|
|
@ -873,7 +873,7 @@ async fn sync_helper(
|
||||||
|
|
||||||
let since_state_ids = match since_shortstatehash {
|
let since_state_ids = match since_shortstatehash {
|
||||||
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
|
Some(s) => services().rooms.state_accessor.state_full_ids(s).await?,
|
||||||
None => BTreeMap::new(),
|
None => HashMap::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
|
let left_event_id = match services().rooms.state_accessor.room_state_get_id(
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
use std::{
|
use std::{collections::HashMap, sync::Arc};
|
||||||
collections::{BTreeMap, HashMap},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
@ -9,7 +6,7 @@ use ruma::{events::StateEventType, EventId, RoomId};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
|
||||||
let full_state = services()
|
let full_state = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_compressor
|
.state_compressor
|
||||||
|
@ -17,7 +14,7 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||||
.pop()
|
.pop()
|
||||||
.expect("there is always one layer")
|
.expect("there is always one layer")
|
||||||
.1;
|
.1;
|
||||||
let mut result = BTreeMap::new();
|
let mut result = HashMap::new();
|
||||||
let mut i = 0;
|
let mut i = 0;
|
||||||
for compressed in full_state.into_iter() {
|
for compressed in full_state.into_iter() {
|
||||||
let parsed = services()
|
let parsed = services()
|
||||||
|
|
24
src/main.rs
24
src/main.rs
|
@ -26,7 +26,6 @@ use http::{
|
||||||
header::{self, HeaderName},
|
header::{self, HeaderName},
|
||||||
Method, StatusCode, Uri,
|
Method, StatusCode, Uri,
|
||||||
};
|
};
|
||||||
use opentelemetry::trace::{FutureExt, Tracer};
|
|
||||||
use ruma::api::{
|
use ruma::api::{
|
||||||
client::{
|
client::{
|
||||||
error::{Error as RumaError, ErrorBody, ErrorKind},
|
error::{Error as RumaError, ErrorBody, ErrorKind},
|
||||||
|
@ -93,14 +92,29 @@ async fn main() {
|
||||||
if config.allow_jaeger {
|
if config.allow_jaeger {
|
||||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||||
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
||||||
|
.with_auto_split_batch(true)
|
||||||
|
.with_service_name("conduit")
|
||||||
.install_batch(opentelemetry::runtime::Tokio)
|
.install_batch(opentelemetry::runtime::Tokio)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||||
|
|
||||||
let span = tracer.start("conduit");
|
let filter_layer = match EnvFilter::try_new(&config.log) {
|
||||||
start.with_current_context().await;
|
Ok(s) => s,
|
||||||
drop(span);
|
Err(e) => {
|
||||||
|
eprintln!(
|
||||||
|
"It looks like your log config is invalid. The following error occurred: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
EnvFilter::try_new("warn").unwrap()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
println!("exporting");
|
let subscriber = tracing_subscriber::Registry::default()
|
||||||
|
.with(filter_layer)
|
||||||
|
.with(telemetry);
|
||||||
|
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||||
|
start.await;
|
||||||
|
println!("exporting remaining spans");
|
||||||
opentelemetry::global::shutdown_tracer_provider();
|
opentelemetry::global::shutdown_tracer_provider();
|
||||||
} else {
|
} else {
|
||||||
let registry = tracing_subscriber::Registry::default();
|
let registry = tracing_subscriber::Registry::default();
|
||||||
|
|
|
@ -15,7 +15,6 @@ pub struct Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn get_cached_eventid_authchain<'a>(
|
pub fn get_cached_eventid_authchain<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
key: &[u64],
|
key: &[u64],
|
||||||
|
|
|
@ -7,7 +7,7 @@ use ruma::{
|
||||||
RoomVersionId,
|
RoomVersionId,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet},
|
collections::{hash_map, BTreeMap, HashMap, HashSet},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::{Arc, RwLock, RwLockWriteGuard},
|
sync::{Arc, RwLock, RwLockWriteGuard},
|
||||||
time::{Duration, Instant, SystemTime},
|
time::{Duration, Instant, SystemTime},
|
||||||
|
@ -553,7 +553,7 @@ impl Service {
|
||||||
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
|
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
|
||||||
|
|
||||||
for (sstatehash, prev_event) in extremity_sstatehashes {
|
for (sstatehash, prev_event) in extremity_sstatehashes {
|
||||||
let mut leaf_state: BTreeMap<_, _> = services()
|
let mut leaf_state: HashMap<_, _> = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_accessor
|
.state_accessor
|
||||||
.state_full_ids(sstatehash)
|
.state_full_ids(sstatehash)
|
||||||
|
@ -660,7 +660,7 @@ impl Service {
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut state: BTreeMap<_, Arc<EventId>> = BTreeMap::new();
|
let mut state: HashMap<_, Arc<EventId>> = HashMap::new();
|
||||||
for (pdu, _) in state_vec {
|
for (pdu, _) in state_vec {
|
||||||
let state_key = pdu.state_key.clone().ok_or_else(|| {
|
let state_key = pdu.state_key.clone().ok_or_else(|| {
|
||||||
Error::bad_database("Found non-state pdu in state events.")
|
Error::bad_database("Found non-state pdu in state events.")
|
||||||
|
@ -672,10 +672,10 @@ impl Service {
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
match state.entry(shortstatekey) {
|
match state.entry(shortstatekey) {
|
||||||
btree_map::Entry::Vacant(v) => {
|
hash_map::Entry::Vacant(v) => {
|
||||||
v.insert(Arc::from(&*pdu.event_id));
|
v.insert(Arc::from(&*pdu.event_id));
|
||||||
}
|
}
|
||||||
btree_map::Entry::Occupied(_) => return Err(
|
hash_map::Entry::Occupied(_) => return Err(
|
||||||
Error::bad_database("State event's type and state_key combination exists multiple times."),
|
Error::bad_database("State event's type and state_key combination exists multiple times."),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,4 @@
|
||||||
use std::{
|
use std::{collections::HashMap, sync::Arc};
|
||||||
collections::{BTreeMap, HashMap},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use ruma::{events::StateEventType, EventId, RoomId};
|
use ruma::{events::StateEventType, EventId, RoomId};
|
||||||
|
@ -12,7 +9,7 @@ use crate::{PduEvent, Result};
|
||||||
pub trait Data: Send + Sync {
|
pub trait Data: Send + Sync {
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
/// Builds a StateMap by iterating over all keys that start
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
/// with state_hash, this gives the full state for the given state_hash.
|
||||||
async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>>;
|
async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>>;
|
||||||
|
|
||||||
async fn state_full(
|
async fn state_full(
|
||||||
&self,
|
&self,
|
||||||
|
|
|
@ -1,8 +1,5 @@
|
||||||
mod data;
|
mod data;
|
||||||
use std::{
|
use std::{collections::HashMap, sync::Arc};
|
||||||
collections::{BTreeMap, HashMap},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub use data::Data;
|
pub use data::Data;
|
||||||
use ruma::{events::StateEventType, EventId, RoomId};
|
use ruma::{events::StateEventType, EventId, RoomId};
|
||||||
|
@ -16,7 +13,8 @@ pub struct Service {
|
||||||
impl Service {
|
impl Service {
|
||||||
/// Builds a StateMap by iterating over all keys that start
|
/// Builds a StateMap by iterating over all keys that start
|
||||||
/// with state_hash, this gives the full state for the given state_hash.
|
/// with state_hash, this gives the full state for the given state_hash.
|
||||||
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>> {
|
#[tracing::instrument(skip(self))]
|
||||||
|
pub async fn state_full_ids(&self, shortstatehash: u64) -> Result<HashMap<u64, Arc<EventId>>> {
|
||||||
self.db.state_full_ids(shortstatehash).await
|
self.db.state_full_ids(shortstatehash).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +37,6 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
|
||||||
#[tracing::instrument(skip(self))]
|
|
||||||
pub fn state_get(
|
pub fn state_get(
|
||||||
&self,
|
&self,
|
||||||
shortstatehash: u64,
|
shortstatehash: u64,
|
||||||
|
|
Loading…
Reference in a new issue