Skip to content

Commit

Permalink
Fix wrong state of previous commit and now preparing for 0.5.0
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/db_out.rs
#	src/graph_out.rs
#	src/lib.rs
#	substreams.yaml
  • Loading branch information
maoueh committed Jul 5, 2023
2 parents 922e633 + 7a3503d commit 7854dea
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v0.5.0

* Updated to latest version of crates `substreams-entity-change`, `substreams-database-change`, `substreams` and `substreams-ethereum`.

## v0.4.3

* Updated `graph_out` to use new `substreams_entity_change::tables:Tables` abstraction (output format stays the same).
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "substreams-eth-block-meta"
version = "0.4.3"
version = "0.5.0"
description = "Substream Ethereum Block Meta Substreams tracking block at day/month boundaries"
edition = "2021"
repository = "https://github.com/streamingfast/substreams-eth-block-meta"
Expand Down
12 changes: 12 additions & 0 deletions proto/block_meta.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";

package eth.block_meta.v1;

import "google/protobuf/timestamp.proto";

message BlockMeta {
uint64 number = 1;
bytes hash = 2;
bytes parent_hash = 3;
google.protobuf.Timestamp timestamp = 4;
}
43 changes: 43 additions & 0 deletions src/db_out.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::{block_timestamp::BlockTimestamp, pb::eth::block_meta::v1::BlockMeta};
use substreams::{
store::{DeltaProto, Deltas},
Hex,
};
use substreams_database_change::tables::Tables;

pub fn add_block_meta_to_tables(tables: &mut Tables, deltas: Deltas<DeltaProto<BlockMeta>>) {
use substreams::pb::substreams::store_delta::Operation;

for delta in deltas {
match delta.operation {
Operation::Create => push_create(
tables,
&delta.key,
BlockTimestamp::from_key(&delta.key),
delta.new_value,
),
Operation::Update => push_update(tables, &delta.key, delta.new_value),
Operation::Delete => panic!("delete should not happen"),
x => panic!("unsupported opeation {:?}", x),
}
}
}

fn push_create(tables: &mut Tables, key: &str, timestamp: BlockTimestamp, value: BlockMeta) {
tables
.create_row("block_meta", key)
.set("at", timestamp)
.set("number", value.number)
.set("hash", Hex(value.hash))
.set("parent_hash", Hex(value.parent_hash))
.set("timestamp", value.timestamp.unwrap());
}

fn push_update(tables: &mut Tables, key: &str, value: BlockMeta) {
tables
.update_row("block_meta", key)
.set("number", value.number)
.set("hash", Hex(value.hash))
.set("parent_hash", Hex(value.parent_hash))
.set("timestamp", value.timestamp.unwrap());
}
41 changes: 41 additions & 0 deletions src/graph_out.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use substreams::store::{DeltaProto, Deltas};
use substreams_entity_change::tables::Tables;

use crate::{block_timestamp::BlockTimestamp, pb::eth::block_meta::v1::BlockMeta};

pub fn add_block_meta_to_tables(tables: &mut Tables, deltas: Deltas<DeltaProto<BlockMeta>>) {
use substreams::pb::substreams::store_delta::Operation;

for delta in deltas {
match delta.operation {
Operation::Create => push_create(
tables,
&delta.key,
BlockTimestamp::from_key(&delta.key),
delta.new_value,
),
Operation::Update => push_update(tables, &delta.key, delta.new_value),
Operation::Delete => todo!(),
x => panic!("unsupported opeation {:?}", x),
}
}
}

fn push_create(tables: &mut Tables, key: &str, timestamp: BlockTimestamp, value: BlockMeta) {
tables
.create_row("block_meta", key)
.set("at", timestamp)
.set("number", value.number)
.set("hash", value.hash)
.set("parent_hash", value.parent_hash)
.set("timestamp", value.timestamp.unwrap());
}

fn push_update(tables: &mut Tables, key: &str, value: BlockMeta) {
tables
.update_row("block_meta", key)
.set("number", value.number)
.set("hash", value.hash)
.set("parent_hash", value.parent_hash)
.set("timestamp", value.timestamp.unwrap());
}
20 changes: 20 additions & 0 deletions src/kv_out.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use substreams::proto;
use substreams::store::{DeltaProto, Deltas};
use substreams_sink_kv::pb::sf::substreams::sink::kv::v1::KvOperations;

use crate::pb::eth::block_meta::v1::BlockMeta;

pub fn block_meta_to_kv_ops(ops: &mut KvOperations, deltas: Deltas<DeltaProto<BlockMeta>>) {
use substreams::pb::substreams::store_delta::Operation;

for delta in deltas {
match delta.operation {
Operation::Create | Operation::Update => {
let val = proto::encode(&delta.new_value).unwrap();
ops.push_new(delta.key, val, delta.ordinal);
}
Operation::Delete => ops.push_delete(&delta.key, delta.ordinal),
x => panic!("unsupported opeation {:?}", x),
}
}
}
97 changes: 77 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,87 @@
mod block_timestamp;
#[path = "db_out.rs"]
mod db;
#[path = "graph_out.rs"]
mod graph;
#[path = "kv_out.rs"]
mod kv;
mod pb;

use block_timestamp::BlockTimestamp;
use pb::eth::block_meta::v1::BlockMeta;
use substreams::errors::Error;
use substreams::Hex;
use substreams::store::{
DeltaProto, Deltas, StoreNew, StoreSet, StoreSetIfNotExists, StoreSetIfNotExistsProto,
StoreSetProto,
};
use substreams_database_change::pb::database::DatabaseChanges;
use substreams_database_change::tables::Tables;
use substreams_entity_change::pb::entity::EntityChanges;
use substreams_ethereum::pb::eth::v2::Block;
use substreams_sink_kv::pb::sf::substreams::sink::kv::v1::KvOperations;

#[substreams::handlers::store]
fn store_block_meta_start(blk: Block, s: StoreSetIfNotExistsProto<BlockMeta>) {
let (timestamp, meta) = block_to_block_meta(blk);

s.set_if_not_exists(meta.number, timestamp.start_of_day_key(), &meta);
s.set_if_not_exists(meta.number, timestamp.start_of_month_key(), &meta);
}

#[substreams::handlers::store]
fn store_block_meta_end(blk: Block, s: StoreSetProto<BlockMeta>) {
let (timestamp, meta) = block_to_block_meta(blk);

s.set(meta.number, timestamp.end_of_day_key(), &meta);
s.set(meta.number, timestamp.end_of_month_key(), &meta);
}

#[substreams::handlers::map]
pub fn db_out(blk: Block) -> Result<DatabaseChanges, Error> {
let mut tables = Tables::new();
add_block_entity(&mut tables, blk);
pub fn db_out(
block_meta_start: Deltas<DeltaProto<BlockMeta>>,
block_meta_end: Deltas<DeltaProto<BlockMeta>>,
) -> Result<DatabaseChanges, Error> {
let mut tables = substreams_database_change::tables::Tables::new();
db::add_block_meta_to_tables(&mut tables, block_meta_start);
db::add_block_meta_to_tables(&mut tables, block_meta_end);

Ok(tables.to_database_changes())
}

fn add_block_entity(tables: &mut Tables, blk: Block) {
let block_hash = Hex(&blk.hash).to_string();

tables
.create_row("block_meta", &block_hash)
.set("number", blk.number)
.set("hash", &block_hash)
.set(
"parent_hash",
Hex(&blk.header.as_ref().unwrap().parent_hash).to_string(),
)
.set(
"timestamp",
blk.header.as_ref().unwrap().timestamp.as_ref().unwrap(),
);
#[substreams::handlers::map]
pub fn graph_out(
block_meta_start: Deltas<DeltaProto<BlockMeta>>,
block_meta_end: Deltas<DeltaProto<BlockMeta>>,
) -> Result<EntityChanges, Error> {
let mut tables = substreams_entity_change::tables::Tables::new();
graph::add_block_meta_to_tables(&mut tables, block_meta_start);
graph::add_block_meta_to_tables(&mut tables, block_meta_end);

Ok(tables.to_entity_changes())
}

#[substreams::handlers::map]
pub fn kv_out(
block_meta_start: Deltas<DeltaProto<BlockMeta>>,
block_meta_end: Deltas<DeltaProto<BlockMeta>>,
) -> Result<KvOperations, Error> {
let mut kv_ops: KvOperations = Default::default();
kv::block_meta_to_kv_ops(&mut kv_ops, block_meta_start);
kv::block_meta_to_kv_ops(&mut kv_ops, block_meta_end);

Ok(kv_ops)
}

fn block_to_block_meta(blk: Block) -> (BlockTimestamp, BlockMeta) {
let timestamp = BlockTimestamp::from_block(&blk);
let header = blk.header.unwrap();

(
timestamp,
BlockMeta {
number: blk.number,
hash: blk.hash,
parent_hash: header.parent_hash,
timestamp: Some(header.timestamp.unwrap()),
},
)
}
14 changes: 14 additions & 0 deletions src/pb/eth.block_meta.v1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// @generated
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockMeta {
#[prost(uint64, tag="1")]
pub number: u64,
#[prost(bytes="vec", tag="2")]
pub hash: ::prost::alloc::vec::Vec<u8>,
#[prost(bytes="vec", tag="3")]
pub parent_hash: ::prost::alloc::vec::Vec<u8>,
#[prost(message, optional, tag="4")]
pub timestamp: ::core::option::Option<::prost_types::Timestamp>,
}
// @@protoc_insertion_point(module)
10 changes: 10 additions & 0 deletions src/pb/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// @generated
pub mod eth {
pub mod block_meta {
// @@protoc_insertion_point(attribute:eth.block_meta.v1)
pub mod v1 {
include!("eth.block_meta.v1.rs");
// @@protoc_insertion_point(eth.block_meta.v1)
}
}
}
45 changes: 44 additions & 1 deletion substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,58 @@ imports:
entities_change: https://github.com/streamingfast/substreams-sink-entity-changes/releases/download/v1.3.0/substreams-sink-entity-changes-v1.3.0.spkg
kv_operations: https://github.com/streamingfast/substreams-sink-kv/releases/download/v2.1.5/substreams-sink-kv-v2.1.5.spkg

protobuf:
files:
- block_meta.proto
importPaths:
- ./proto

binaries:
default:
type: wasm/rust-v1
file: ./target/wasm32-unknown-unknown/release/substreams.wasm

modules:
- name: store_block_meta_start
kind: store
updatePolicy: set_if_not_exists
valueType: proto:eth.block_meta.v1.BlockMeta
inputs:
- source: sf.ethereum.type.v2.Block

- name: store_block_meta_end
kind: store
updatePolicy: set
valueType: proto:eth.block_meta.v1.BlockMeta
inputs:
- source: sf.ethereum.type.v2.Block

- name: db_out
kind: map
inputs:
- source: sf.ethereum.type.v2.Block
- store: store_block_meta_start
mode: deltas
- store: store_block_meta_end
mode: deltas
output:
type: proto:sf.substreams.sink.database.v1.DatabaseChanges

- name: kv_out
kind: map
inputs:
- store: store_block_meta_start
mode: deltas
- store: store_block_meta_end
mode: deltas
output:
type: proto:sf.substreams.sink.kv.v1.KVOperations

- name: graph_out
kind: map
inputs:
- store: store_block_meta_start
mode: deltas
- store: store_block_meta_end
mode: deltas
output:
type: proto:sf.substreams.sink.entity.v1.EntityChanges

0 comments on commit 7854dea

Please sign in to comment.