Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Add a python walrus event tracker example #15

Merged
merged 5 commits into from
Jun 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions examples/python/track_walrus_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Track Walrus storage related events on the Sui blockchain

import datetime

# Std lib imports
import requests
import re

from utils import num_to_blob_id

PATH_TO_WALRUS_CONFIG = "../CONFIG/config_dir/client_config.yaml"

system_object_id = re.findall(
r"system_object:[ ]*(.*)", open(PATH_TO_WALRUS_CONFIG).read()
)[0]
print(f"System object ID: {system_object_id}")

# Query the Walrus system object on Sui
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "sui_getObject",
"params": [
system_object_id,
{
"showType": True,
"showOwner": False,
"showPreviousTransaction": False,
"showDisplay": False,
"showContent": False,
"showBcs": False,
"showStorageRebate": False,
},
],
}
response = requests.post("https://fullnode.testnet.sui.io:443", json=request)
assert response.status_code == 200

system_object_content = response.json()["result"]["data"]
walrus_package = re.findall("(0x[0-9a-f]+)::system", system_object_content["type"])[0]
print(f"Walrus type: {walrus_package}")

# Query events for the appropriate Walrus type
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "suix_queryEvents",
"params": [
# Query by module type
{"MoveModule": {"package": walrus_package, "module": "blob"}},
None,
# Query the latest 100 events
100,
True, # Indicates descending order
],
}
response = requests.post("https://fullnode.testnet.sui.io:443", json=request)
assert response.status_code == 200

events = response.json()["result"]["data"]
for event in events:
# Parse the Walrus event
tx_digest = event["id"]["txDigest"]
event_type = event["type"][68 + 13 :] # Skip the package & module prefix
parsed_event = event["parsedJson"]
blob_id = num_to_blob_id(int(parsed_event["blob_id"]))
timestamp_ms = int(event["timestampMs"])
time_date = datetime.datetime.fromtimestamp(timestamp_ms / 1000.0)

# For registered blobs get their size in bytes
if event_type == "BlobRegistered":
size = f"{parsed_event['size']}B"
else:
size = ""

print(f"{time_date} {event_type:<15} {size:>10} {blob_id} Tx:{tx_digest:<48}")