Skip to content

Commit

Permalink
aggregate client fees
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 6, 2024
1 parent ca61e47 commit abfe4f3
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 13 deletions.
66 changes: 53 additions & 13 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use serde::Deserialize;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use titorelli::{
kafka::{assign_partitions, fetch_partition_ids, latest_messages},
messages::{ClientQueryProtobuf, IndexerFeesHourlyProtobuf, IndexerFeesProtobuf},
messages::{
ClientFeesHourlyProtobuf, ClientFeesProtobuf, ClientQueryProtobuf,
IndexerFeesHourlyProtobuf, IndexerFeesProtobuf,
},
print_unix_millis,
};
use tokio::{sync::mpsc, task::JoinHandle};
Expand Down Expand Up @@ -218,7 +221,11 @@ async fn handle_source_msg(
}

async fn latest_sink_timestamp(consumer: &StreamConsumer) -> anyhow::Result<Option<i64>> {
let latest_messages = latest_messages(consumer, &["gateway_indexer_fees_hourly"]).await?;
let latest_messages = latest_messages(
consumer,
&["gateway_client_fees_hourly", "gateway_indexer_fees_hourly"],
)
.await?;
let timestamp = latest_messages
.into_iter()
.map(|msg| -> anyhow::Result<i64> {
Expand Down Expand Up @@ -290,7 +297,33 @@ async fn record_aggregations(
timestamp: i64,
aggregations: Aggregations,
) -> anyhow::Result<()> {
let Aggregations { indexer_fees } = aggregations;
let record_key = timestamp.to_be_bytes();
let Aggregations {
client_fees,
indexer_fees,
} = aggregations;

let record_payload = ClientFeesHourlyProtobuf {
timestamp,
aggregations: client_fees
.into_iter()
.map(|(k, v)| ClientFeesProtobuf {
api_key: k.api_key,
deployment: k.deployment,
fees_usd: v,
})
.collect(),
}
.encode_to_vec();
let record = rdkafka::producer::FutureRecord::to("gateway_client_fees_hourly")
.key(&record_key)
.payload(&record_payload);
producer
.send(record, Duration::from_secs(30))
.await
.map_err(|(err, _)| err)
.context("send aggregation record")?;

let record_payload = IndexerFeesHourlyProtobuf {
timestamp,
aggregations: indexer_fees
Expand All @@ -303,9 +336,8 @@ async fn record_aggregations(
.collect(),
}
.encode_to_vec();
let key = timestamp.to_be_bytes();
let record = rdkafka::producer::FutureRecord::to("gateway_indexer_fees_hourly")
.key(&key)
.key(&record_key)
.payload(&record_payload);
producer
.send(record, Duration::from_secs(30))
Expand Down Expand Up @@ -342,6 +374,13 @@ impl std::fmt::Debug for Address {
}
}

/// Render a 32-byte subgraph deployment hash as a CIDv0 string.
///
/// Prepends the multihash header (0x12 = sha2-256, 0x20 = 32-byte digest)
/// to `bytes` and base58-encodes the result (CIDv0 / Qm… form).
///
/// Panics if `bytes` is not exactly 32 bytes long (slice length mismatch
/// on `copy_from_slice`), matching the caller contract of the original.
fn deployment_cid(bytes: &[u8]) -> String {
    let mut multihash = [0_u8; 34];
    multihash[0] = 0x12; // multihash code: sha2-256
    multihash[1] = 0x20; // digest length: 32 bytes
    multihash[2..].copy_from_slice(bytes);
    bs58::encode(multihash).into_string()
}

#[derive(Debug)]
enum SourceMsg {
Flush {
Expand Down Expand Up @@ -386,9 +425,16 @@ impl SourceMsg {

/// Accumulated fee totals for the current aggregation window, flushed as one
/// record per sink topic (see the `*_hourly` topic names used elsewhere in
/// this file).
#[derive(Debug, Default)]
struct Aggregations {
    // total fees in USD keyed by (api_key, deployment)
    client_fees: BTreeMap<ClientFeesKey, f64>,
    // total fees in USD keyed by (signer, receiver) — see IndexerFeesKey
    indexer_fees: BTreeMap<IndexerFeesKey, f64>,
}

/// Grouping key for client fee aggregation: one bucket per
/// (api_key, deployment) pair. Ord is derived so the enclosing BTreeMap
/// yields entries in a deterministic order.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ClientFeesKey {
    api_key: String,
    // NOTE(review): presumably the CIDv0 string produced by `deployment_cid`
    // — confirm at the construction site (not visible in this view).
    deployment: String,
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct IndexerFeesKey {
signer: Address,
Expand All @@ -400,12 +446,6 @@ pub fn legacy_messages(
client_query: ClientQueryProtobuf,
) -> (serde_json::Value, Vec<serde_json::Value>) {
let address_hex = |bytes| format!("{:?}", Address::from_slice(bytes).unwrap());
let deployment_str = |bytes| {
let mut buf = [0_u8; 34];
buf[0..2].copy_from_slice(&[0x12, 0x20]);
buf[2..].copy_from_slice(bytes);
bs58::encode(buf).into_string()
};
let first_indexer_query = client_query.indexer_queries.first();
let client_request_payload = serde_json::json!({
"gateway_id": address_hex(&client_query.receipt_signer),
Expand All @@ -416,7 +456,7 @@ pub fn legacy_messages(
"timestamp": timestamp,
"api_key": &client_query.api_key,
"user": "0x0000000000000000000000000000000000000000",
"deployment": first_indexer_query.map(|i| deployment_str(&i.deployment)).unwrap_or_default(),
"deployment": first_indexer_query.map(|i| deployment_cid(&i.deployment)).unwrap_or_default(),
"indexed_chain": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(),
"network": first_indexer_query.map(|i| i.indexed_chain.clone()).unwrap_or_default(),
"response_time_ms": client_query.response_time_ms,
Expand Down Expand Up @@ -461,7 +501,7 @@ pub fn legacy_messages(
"timestamp": timestamp,
"api_key": &client_query.api_key,
"user_address": "0x0000000000000000000000000000000000000000",
"deployment": deployment_str(&i.deployment),
"deployment": deployment_cid(&i.deployment),
"network": &i.indexed_chain,
"indexed_chain": &i.indexed_chain,
"indexer": address_hex(&i.indexer),
Expand Down
19 changes: 19 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ pub struct IndexerQueryProtobuf {
pub legacy_scalar: Option<bool>,
}

/// Payload for the `gateway_client_fees_hourly` sink topic: every client fee
/// aggregation recorded for a single window.
#[derive(prost::Message)]
pub struct ClientFeesHourlyProtobuf {
    /// start timestamp for aggregation, in unix milliseconds
    #[prost(int64, tag = "1")]
    pub timestamp: i64,
    /// one entry per (api_key, deployment) pair seen in the window
    #[prost(message, repeated, tag = "2")]
    pub aggregations: Vec<ClientFeesProtobuf>,
}

/// A single client fee aggregation entry within `ClientFeesHourlyProtobuf`.
#[derive(prost::Message)]
pub struct ClientFeesProtobuf {
    /// client API key the fees are attributed to
    #[prost(string, tag = "1")]
    pub api_key: String,
    /// subgraph deployment identifier (string form)
    #[prost(string, tag = "2")]
    pub deployment: String,
    /// total fees for this (api_key, deployment) bucket, in USD
    #[prost(double, tag = "3")]
    pub fees_usd: f64,
}

#[derive(prost::Message)]
pub struct IndexerFeesHourlyProtobuf {
/// start timestamp for aggregation, in unix milliseconds
Expand Down

0 comments on commit abfe4f3

Please sign in to comment.