Skip to content

Commit

Permalink
Showcasing direct entity output from mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
maoueh committed Jul 5, 2023
1 parent a525fac commit 922e633
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 392 deletions.
229 changes: 108 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ anyhow = "1"
prost = "0.11"
prost-types = "0.11"
substreams = "0.5"
substreams-database-change = "1.1"
substreams-entity-change = "1.2"
substreams-database-change = "1.2"
substreams-entity-change = "1.3"
substreams-sink-kv = "0.1.1"
substreams-ethereum = "0.9"
chrono = { version = "0.4", features = [ "std" ], default-features = false }
Expand All @@ -24,3 +24,4 @@ chrono = { version = "0.4", features = [ "std" ], default-features = false }
lto = true
opt-level = 's'
strip = "debuginfo"

12 changes: 0 additions & 12 deletions proto/block_meta.proto

This file was deleted.

46 changes: 0 additions & 46 deletions src/db_out.rs

This file was deleted.

44 changes: 0 additions & 44 deletions src/graph_out.rs

This file was deleted.

20 changes: 0 additions & 20 deletions src/kv_out.rs

This file was deleted.

98 changes: 21 additions & 77 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,30 @@
mod block_timestamp;
#[path = "db_out.rs"]
mod db;
#[path = "graph_out.rs"]
mod graph;
#[path = "kv_out.rs"]
mod kv;
mod pb;
mod schema;

use block_timestamp::BlockTimestamp;
use pb::eth::block_meta::v1::BlockMeta;
use substreams::errors::Error;
use substreams::store::{DeltaProto, StoreSetIfNotExistsProto, StoreSetProto};
use substreams::{prelude::*, store};
use substreams::Hex;
use substreams_database_change::pb::database::DatabaseChanges;
use substreams_entity_change::pb::entity::EntityChanges;
use substreams_ethereum::pb::eth::v2::{self as eth};
use substreams_sink_kv::pb::sf::substreams::sink::kv::v1::KvOperations;

#[substreams::handlers::store]
fn store_block_meta_start(blk: eth::Block, s: StoreSetIfNotExistsProto<BlockMeta>) {
let (timestamp, meta) = block_to_block_meta(blk);

s.set_if_not_exists(meta.number, timestamp.start_of_day_key(), &meta);
s.set_if_not_exists(meta.number, timestamp.start_of_month_key(), &meta);
}

#[substreams::handlers::store]
fn store_block_meta_end(blk: eth::Block, s: StoreSetProto<BlockMeta>) {
let (timestamp, meta) = block_to_block_meta(blk);

s.set(meta.number, timestamp.end_of_day_key(), &meta);
s.set(meta.number, timestamp.end_of_month_key(), &meta);
}
use substreams_database_change::tables::Tables;
use substreams_ethereum::pb::eth::v2::Block;

#[substreams::handlers::map]
pub fn db_out(
block_meta_start: store::Deltas<DeltaProto<BlockMeta>>,
block_meta_end: store::Deltas<DeltaProto<BlockMeta>>,
) -> Result<DatabaseChanges, Error> {
let mut tables = substreams_database_change::tables::Tables::new();
db::block_meta_to_database_changes(&mut tables, block_meta_start);
db::block_meta_to_database_changes(&mut tables, block_meta_end);
pub fn db_out(blk: Block) -> Result<DatabaseChanges, Error> {
let mut tables = Tables::new();
add_block_entity(&mut tables, blk);

Ok(tables.to_database_changes())
}

#[substreams::handlers::map]
pub fn kv_out(
block_meta_start: store::Deltas<DeltaProto<BlockMeta>>,
block_meta_end: store::Deltas<DeltaProto<BlockMeta>>,
) -> Result<KvOperations, Error> {
let mut kv_ops: KvOperations = Default::default();
kv::block_meta_to_kv_ops(&mut kv_ops, block_meta_start);
kv::block_meta_to_kv_ops(&mut kv_ops, block_meta_end);

Ok(kv_ops)
}

#[substreams::handlers::map]
pub fn graph_out(
block_meta_start: store::Deltas<DeltaProto<BlockMeta>>,
block_meta_end: store::Deltas<DeltaProto<BlockMeta>>,
) -> Result<EntityChanges, Error> {
let mut tables = substreams_entity_change::tables::Tables::new();
graph::block_meta_to_entities_changes(&mut tables, block_meta_start);
graph::block_meta_to_entities_changes(&mut tables, block_meta_end);

Ok(tables.to_entity_changes())
}

fn block_to_block_meta(blk: eth::Block) -> (BlockTimestamp, BlockMeta) {
let timestamp = BlockTimestamp::from_block(&blk);
let header = blk.header.unwrap();

(
timestamp,
BlockMeta {
number: blk.number,
hash: blk.hash,
parent_hash: header.parent_hash,
timestamp: Some(header.timestamp.unwrap()),
},
)
fn add_block_entity(tables: &mut Tables, blk: Block) {
let block_hash = Hex(&blk.hash).to_string();

tables
.create_row("block_meta", &block_hash)
.set("number", blk.number)
.set("hash", &block_hash)
.set(
"parent_hash",
Hex(&blk.header.as_ref().unwrap().parent_hash).to_string(),
)
.set(
"timestamp",
blk.header.as_ref().unwrap().timestamp.as_ref().unwrap(),
);
}
14 changes: 0 additions & 14 deletions src/pb/eth.block_meta.v1.rs

This file was deleted.

10 changes: 0 additions & 10 deletions src/pb/mod.rs

This file was deleted.

Empty file removed src/schema.rs
Empty file.
49 changes: 3 additions & 46 deletions substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,19 @@ package:
version: v0.4.3

imports:
database_change: https://github.com/streamingfast/substreams-database-change/releases/download/v1.1.3/substreams-database-change-v1.1.3.spkg
entities_change: https://github.com/streamingfast/substreams-entity-change/releases/download/v1.2.1/substreams-entity-change-v1.2.1.spkg
database_change: https://github.com/streamingfast/substreams-sink-database-changes/releases/download/v1.2.0/substreams-database-change-v1.2.0.spkg
entities_change: https://github.com/streamingfast/substreams-sink-entity-changes/releases/download/v1.3.0/substreams-sink-entity-changes-v1.3.0.spkg
kv_operations: https://github.com/streamingfast/substreams-sink-kv/releases/download/v2.1.5/substreams-sink-kv-v2.1.5.spkg

protobuf:
files:
- block_meta.proto
importPaths:
- ./proto

binaries:
default:
type: wasm/rust-v1
file: ./target/wasm32-unknown-unknown/release/substreams.wasm

modules:
- name: store_block_meta_start
kind: store
updatePolicy: set_if_not_exists
valueType: proto:eth.block_meta.v1.BlockMeta
inputs:
- source: sf.ethereum.type.v2.Block

- name: store_block_meta_end
kind: store
updatePolicy: set
valueType: proto:eth.block_meta.v1.BlockMeta
inputs:
- source: sf.ethereum.type.v2.Block

- name: db_out
kind: map
inputs:
- store: store_block_meta_start
mode: deltas
- store: store_block_meta_end
mode: deltas
- source: sf.ethereum.type.v2.Block
output:
type: proto:sf.substreams.sink.database.v1.DatabaseChanges

- name: kv_out
kind: map
inputs:
- store: store_block_meta_start
mode: deltas
- store: store_block_meta_end
mode: deltas
output:
type: proto:sf.substreams.sink.kv.v1.KVOperations

- name: graph_out
kind: map
inputs:
- store: store_block_meta_start
mode: deltas
- store: store_block_meta_end
mode: deltas
output:
type: proto:sf.substreams.entity.v1.EntityChanges

0 comments on commit 922e633

Please sign in to comment.