Skip to content

Commit

Permalink
remove gateway_indexer_fees source
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 5, 2024
1 parent e684e55 commit 674ae61
Showing 1 changed file with 2 additions and 39 deletions.
41 changes: 2 additions & 39 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,7 @@ async fn run() -> anyhow::Result<()> {
.collect()
};

let assignment = assign_partitions(
&consumer,
&["gateway_queries", "gateway_indexer_fees"],
start_timestamp,
)
.await?;
let assignment = assign_partitions(&consumer, &["gateway_queries"], start_timestamp).await?;

let (source_msg_tx, mut source_msg_rx) = mpsc::channel::<SourceMsg>(1024);
let mut partition_consumers: Vec<JoinHandle<()>> = assignment
Expand Down Expand Up @@ -180,18 +175,6 @@ async fn handle_source_msg(
tracing::info!(timestamp = print_unix_millis(t).unwrap(), "flushed");
}
}
SourceMsg::IndexerFees {
aggregation_timestamp,
signer,
receiver,
fees_grt,
} => {
if aggregation_timestamp >= start_timestamp {
let key = IndexerFeesKey { signer, receiver };
let agg = aggregations.entry(aggregation_timestamp).or_default();
*agg.indexer_fees.entry(key).or_default() += fees_grt;
}
}
SourceMsg::ClientQuery {
timestamp,
aggregation_timestamp,
Expand Down Expand Up @@ -266,11 +249,7 @@ fn spawn_partition_consumer(
let msg = SourceMsg::decode(msg, &legacy_source_offsets)?;
let aggregation_timestamp = match &msg {
SourceMsg::Flush { .. } => unreachable!(), // unreachable
SourceMsg::IndexerFees {
aggregation_timestamp,
..
}
| SourceMsg::ClientQuery {
SourceMsg::ClientQuery {
aggregation_timestamp,
..
} => *aggregation_timestamp,
Expand Down Expand Up @@ -366,13 +345,6 @@ enum SourceMsg {
partition_id: String,
aggregation_timestamp: i64,
},
// TODO: remove after migration
IndexerFees {
aggregation_timestamp: i64,
signer: Address,
receiver: Address,
fees_grt: f64,
},
ClientQuery {
timestamp: i64,
aggregation_timestamp: i64,
Expand Down Expand Up @@ -404,15 +376,6 @@ impl SourceMsg {
data: decoded,
})
}
"gateway_indexer_fees" => {
let decoded = IndexerFeesProtobuf::decode(payload).context("decode protobuf")?;
Ok(SourceMsg::IndexerFees {
aggregation_timestamp,
signer: Address::from_slice(&decoded.signer)?,
receiver: Address::from_slice(&decoded.receiver)?,
fees_grt: decoded.fees_grt,
})
}
topic => anyhow::bail!("unexpected topic: {topic}"),
}
}
Expand Down

0 comments on commit 674ae61

Please sign in to comment.