feat!: add input status to GraphQL API
Add a new field to the input object in the GraphQL API and a new column
in the Postgres input table.
Node runners should clean their database before upgrading and update the
GraphQL schema.
gligneul committed Sep 25, 2023
1 parent 895d9f7 commit f87562d
Showing 19 changed files with 278 additions and 233 deletions.
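Note: the GraphQL schema file itself is among the changed files not shown below. As a rough illustration of what the change enables, a client could ask for the new field once the node is upgraded. This is a hedged sketch, not part of the commit: the endpoint URL and the exact field name (`status` on the input object) are assumptions drawn from the commit message and the migration, and the `query_input_status` helper is hypothetical.

// Hedged sketch (Rust, using the `reqwest` blocking client with the
// `json` feature plus `serde_json`): read an input's completion status
// over GraphQL. Endpoint URL and field names are assumptions.
use serde_json::json;

fn query_input_status(index: u64) -> Result<serde_json::Value, reqwest::Error> {
    let body = json!({
        "query": format!("{{ input(index: {}) {{ index status }} }}", index)
    });
    reqwest::blocking::Client::new()
        .post("http://localhost:4000/graphql") // assumed endpoint
        .json(&body)
        .send()?
        .json()
}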
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support to `POST` *inspect state* requests
- Added snapshot validation. The node will now check whether the snapshot's template hash matches the one stored in the blockchain
- Added `cartesi/rollups-node` docker image with all node binaries
- Added completion status to GraphQL API

### Changed

30 changes: 27 additions & 3 deletions offchain/advance-runner/src/server_manager/conversions.rs
@@ -5,11 +5,12 @@
//! rollups-events types
use grpc_interfaces::cartesi_machine::Hash;
use grpc_interfaces::cartesi_server_manager::{
-    Address, OutputEnum, OutputValidityProof, Proof,
+    Address, CompletionStatus, OutputEnum, OutputValidityProof, Proof,
};
use rollups_events::{
-    Address as RollupsAddress, Hash as RollupsHash, Payload, RollupsOutputEnum,
-    RollupsOutputValidityProof, RollupsProof, ADDRESS_SIZE, HASH_SIZE,
+    Address as RollupsAddress, Hash as RollupsHash, Payload,
+    RollupsCompletionStatus, RollupsOutputEnum, RollupsOutputValidityProof,
+    RollupsProof, ADDRESS_SIZE, HASH_SIZE,
};

use super::error::ServerManagerError;
@@ -33,6 +34,29 @@ macro_rules! get_field {
// Export the get_field macro for other modules to use
pub(super) use get_field;

/// Convert gRPC completion status to broker equivalent
pub fn convert_completion_status(
status: CompletionStatus,
) -> RollupsCompletionStatus {
match status {
CompletionStatus::Accepted => RollupsCompletionStatus::Accepted,
CompletionStatus::Rejected => RollupsCompletionStatus::Rejected,
CompletionStatus::Exception => RollupsCompletionStatus::Exception,
CompletionStatus::MachineHalted => {
RollupsCompletionStatus::MachineHalted
}
CompletionStatus::CycleLimitExceeded => {
RollupsCompletionStatus::CycleLimitExceeded
}
CompletionStatus::TimeLimitExceeded => {
RollupsCompletionStatus::TimeLimitExceeded
}
CompletionStatus::PayloadLengthLimitExceeded => {
RollupsCompletionStatus::PayloadLengthLimitExceeded
}
}
}

/// Convert gRPC hash to broker equivalent
pub fn convert_hash(hash: Hash) -> Result<RollupsHash, ServerManagerError> {
hash.data.try_into().map(RollupsHash::new).map_err(|data| {
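
The match above is exhaustive without an `Unprocessed` arm because the gRPC `CompletionStatus` has no such variant; `Unprocessed` exists only as the database default for inputs the server manager has not yet processed (see the migration below). A minimal sanity check could look like this sketch, which is not part of the commit and assumes `RollupsCompletionStatus` derives `PartialEq` and `Debug`:

#[cfg(test)]
mod status_conversion_tests {
    use super::*;

    // Hedged sketch: spot-check one arm of convert_completion_status.
    // Assumes RollupsCompletionStatus derives PartialEq and Debug.
    #[test]
    fn accepted_maps_to_accepted() {
        assert_eq!(
            convert_completion_status(CompletionStatus::Accepted),
            RollupsCompletionStatus::Accepted
        );
    }
}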
14 changes: 11 additions & 3 deletions offchain/advance-runner/src/server_manager/facade.rs
@@ -3,8 +3,8 @@

use backoff::{future::retry, Error, ExponentialBackoff};
use rollups_events::{
-    InputMetadata as RollupsInputMetadata, Payload, RollupsClaim,
-    RollupsNotice, RollupsOutput, RollupsReport, RollupsVoucher,
+    InputMetadata as RollupsInputMetadata, Payload, RollupsAdvanceResult,
+    RollupsClaim, RollupsNotice, RollupsOutput, RollupsReport, RollupsVoucher,
};
use snafu::{OptionExt, ResultExt};
use std::path::Path;
@@ -23,7 +23,8 @@ use grpc_interfaces::cartesi_server_manager::{
use super::claim::compute_epoch_hash;
use super::config::ServerManagerConfig;
use super::conversions::{
-    convert_address, convert_hash, convert_proof, get_field,
+    convert_address, convert_completion_status, convert_hash, convert_proof,
+    get_field,
};
use super::error::{
ConnectionSnafu, EmptyEpochSnafu, InvalidProcessedInputSnafu,
@@ -223,6 +224,13 @@ impl ServerManagerFacade {

let mut outputs = vec![];

let status = convert_completion_status(processed_input.status());
let result = RollupsAdvanceResult {
input_index: current_input_index,
status,
};
outputs.push(RollupsOutput::AdvanceResult(result));

for (index, report) in processed_input.reports.into_iter().enumerate() {
let report = RollupsReport {
index: index as u64,
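
Because the advance result is pushed into `outputs` before the input's reports (and before the vouchers and notices handled later in this function), downstream broker consumers see an input's status before its outputs. A hedged sketch of consumer-side handling, not part of the commit: the `handle_output` function is hypothetical, the types are the ones this diff adds to `rollups_events`, and `RollupsCompletionStatus` is assumed to derive `Debug`.

use rollups_events::RollupsOutput;

// Hedged sketch: branch on the new output variant on the consumer side.
fn handle_output(output: RollupsOutput) {
    match output {
        RollupsOutput::AdvanceResult(result) => {
            // Emitted once per processed input, ahead of its outputs.
            println!(
                "input {} finished with status {:?}",
                result.input_index, result.status
            );
        }
        // Vouchers, notices, and reports handled elsewhere.
        _ => {}
    }
}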
5 changes: 5 additions & 0 deletions offchain/data/migrations/20230921143147_completion_status/down.sql
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`

ALTER TABLE "inputs" DROP "status";

DROP TYPE "CompletionStatus";
14 changes: 14 additions & 0 deletions offchain/data/migrations/20230921143147_completion_status/up.sql
@@ -0,0 +1,14 @@
-- Your SQL goes here

CREATE TYPE "CompletionStatus" AS ENUM (
'Unprocessed',
'Accepted',
'Rejected',
'Exception',
'MachineHalted',
'CycleLimitExceeded',
'TimeLimitExceeded',
'PayloadLengthLimitExceeded'
);

ALTER TABLE "inputs" ADD "status" "CompletionStatus" NOT NULL DEFAULT 'Unprocessed';
4 changes: 2 additions & 2 deletions offchain/data/src/lib.rs
@@ -15,6 +15,6 @@ pub use migrations::{run_migrations, MigrationError};
pub use pagination::{Connection, Cursor, Edge, PageInfo};
pub use repository::Repository;
pub use types::{
-    Input, InputQueryFilter, Notice, NoticeQueryFilter, OutputEnum, Proof,
-    Report, ReportQueryFilter, Voucher, VoucherQueryFilter,
+    CompletionStatus, Input, InputQueryFilter, Notice, NoticeQueryFilter,
+    OutputEnum, Proof, Report, ReportQueryFilter, Voucher, VoucherQueryFilter,
};
25 changes: 22 additions & 3 deletions offchain/data/src/repository.rs
@@ -4,7 +4,7 @@
use backoff::ExponentialBackoff;
use diesel::pg::{Pg, PgConnection};
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
-use diesel::{insert_into, prelude::*};
+use diesel::{insert_into, prelude::*, update};
use snafu::ResultExt;
use std::sync::Arc;

@@ -13,8 +13,8 @@ use super::error::{DatabaseConnectionSnafu, DatabaseSnafu, Error};
use super::pagination::{Connection, Pagination};
use super::schema;
use super::types::{
-    Input, InputQueryFilter, Notice, NoticeQueryFilter, OutputEnum, Proof,
-    Report, ReportQueryFilter, Voucher, VoucherQueryFilter,
+    CompletionStatus, Input, InputQueryFilter, Notice, NoticeQueryFilter,
+    OutputEnum, Proof, Report, ReportQueryFilter, Voucher, VoucherQueryFilter,
};

pub const POOL_CONNECTION_SIZE: u32 = 3;
@@ -223,6 +223,25 @@ impl Repository {
}
}

/// Update operations
impl Repository {
pub fn update_input_status(
&self,
input_index: i32,
status: CompletionStatus,
) -> Result<(), Error> {
use schema::inputs;
let mut conn = self.conn()?;
update(inputs::table)
.filter(inputs::dsl::index.eq(input_index))
.set(inputs::status.eq(status))
.execute(&mut conn)
.context(DatabaseSnafu)?;
tracing::trace!("Set {:?} status to input {}", status, input_index);
Ok(())
}
}

/// Generate a boxed query from an input query filter
impl InputQueryFilter {
fn to_query(&self) -> schema::inputs::BoxedQuery<'_, Pg> {
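
The new update path is exercised end to end in the repository tests further below. As a quick illustration of the intended call site: the wrapper function here is hypothetical, while `Repository`, `CompletionStatus`, and `Error` are the `rollups_data` re-exports added in this change.

use rollups_data::{CompletionStatus, Error, Repository};

// Hedged sketch: persist the final status of a processed input.
// Repository construction and configuration are elided.
fn mark_input_accepted(
    repo: &Repository,
    input_index: i32,
) -> Result<(), Error> {
    repo.update_input_status(input_index, CompletionStatus::Accepted)
}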
8 changes: 8 additions & 0 deletions offchain/data/src/schema.rs
@@ -1,19 +1,27 @@
// @generated automatically by Diesel CLI.

pub mod sql_types {
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "CompletionStatus"))]
pub struct CompletionStatus;

#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "OutputEnum"))]
pub struct OutputEnum;
}

diesel::table! {
use diesel::sql_types::*;
use super::sql_types::CompletionStatus;

inputs (index) {
index -> Int4,
msg_sender -> Bytea,
tx_hash -> Bytea,
block_number -> Int8,
timestamp -> Timestamp,
payload -> Bytea,
status -> CompletionStatus,
}
}

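
With the column mapped in the `table!` macro above, Diesel can filter on it directly. A minimal sketch, not part of the commit; it assumes code living inside the `rollups_data` crate, since the `schema` and `types` modules are crate-internal and only the types are re-exported.

use diesel::pg::PgConnection;
use diesel::prelude::*;

// Hedged sketch: count inputs that still await processing, using the
// new status column. Connection setup is elided.
fn count_unprocessed(conn: &mut PgConnection) -> QueryResult<i64> {
    use crate::schema::inputs::dsl::{inputs, status};
    use crate::types::CompletionStatus;

    inputs
        .filter(status.eq(CompletionStatus::Unprocessed))
        .count()
        .get_result(conn)
}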
61 changes: 59 additions & 2 deletions offchain/data/src/types.rs
@@ -8,10 +8,66 @@ use diesel::{AsExpression, Insertable, Queryable, QueryableByName};
use std::io::Write;

use super::schema::{
-    inputs, notices, proofs, reports, sql_types::OutputEnum as SQLOutputEnum,
-    vouchers,
+    inputs, notices, proofs, reports,
+    sql_types::CompletionStatus as SQLCompletionStatus,
+    sql_types::OutputEnum as SQLOutputEnum, vouchers,
};

#[derive(Debug, PartialEq, Eq, Clone, Copy, FromSqlRow, AsExpression)]
#[diesel(sql_type = SQLCompletionStatus)]
pub enum CompletionStatus {
Unprocessed,
Accepted,
Rejected,
Exception,
MachineHalted,
CycleLimitExceeded,
TimeLimitExceeded,
PayloadLengthLimitExceeded,
}

impl ToSql<SQLCompletionStatus, Pg> for CompletionStatus {
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
match *self {
CompletionStatus::Unprocessed => out.write_all(b"Unprocessed")?,
CompletionStatus::Accepted => out.write_all(b"Accepted")?,
CompletionStatus::Rejected => out.write_all(b"Rejected")?,
CompletionStatus::Exception => out.write_all(b"Exception")?,
CompletionStatus::MachineHalted => {
out.write_all(b"MachineHalted")?
}
CompletionStatus::CycleLimitExceeded => {
out.write_all(b"CycleLimitExceeded")?
}
CompletionStatus::TimeLimitExceeded => {
out.write_all(b"TimeLimitExceeded")?
}
CompletionStatus::PayloadLengthLimitExceeded => {
out.write_all(b"PayloadLengthLimitExceeded")?
}
}
Ok(IsNull::No)
}
}

impl FromSql<SQLCompletionStatus, Pg> for CompletionStatus {
fn from_sql(bytes: PgValue<'_>) -> deserialize::Result<Self> {
match bytes.as_bytes() {
b"Unprocessed" => Ok(CompletionStatus::Unprocessed),
b"Accepted" => Ok(CompletionStatus::Accepted),
b"Rejected" => Ok(CompletionStatus::Rejected),
b"Exception" => Ok(CompletionStatus::Exception),
b"MachineHalted" => Ok(CompletionStatus::MachineHalted),
b"CycleLimitExceeded" => Ok(CompletionStatus::CycleLimitExceeded),
b"TimeLimitExceeded" => Ok(CompletionStatus::TimeLimitExceeded),
b"PayloadLengthLimitExceeded" => {
Ok(CompletionStatus::PayloadLengthLimitExceeded)
}
_ => Err("Unrecognized enum variant".into()),
}
}
}

#[derive(Clone, Debug, Insertable, PartialEq, Queryable, QueryableByName)]
#[diesel(table_name = inputs)]
pub struct Input {
@@ -21,6 +77,7 @@ pub struct Input {
pub block_number: i64,
pub timestamp: std::time::SystemTime,
pub payload: Vec<u8>,
pub status: CompletionStatus,
}

#[derive(Clone, Debug, Insertable, PartialEq, Queryable, QueryableByName)]
27 changes: 25 additions & 2 deletions offchain/data/tests/repository.rs
@@ -9,8 +9,8 @@ use diesel::{
use redacted::Redacted;
use rollups_data::Connection as PaginationConnection;
use rollups_data::{
-    Cursor, Edge, Error, Input, InputQueryFilter, Notice, PageInfo, Proof,
-    Report, Repository, RepositoryConfig, Voucher,
+    CompletionStatus, Cursor, Edge, Error, Input, InputQueryFilter, Notice,
+    PageInfo, Proof, Report, Repository, RepositoryConfig, Voucher,
};
use serial_test::serial;
use std::time::{Duration, UNIX_EPOCH};
@@ -70,6 +70,7 @@ pub fn insert_test_input(repo: &Repository) {
block_number: 0,
timestamp: UNIX_EPOCH + Duration::from_secs(1676489717),
payload: "input-0".as_bytes().to_vec(),
status: CompletionStatus::Accepted,
};

repo.insert_input(input)
@@ -84,6 +85,7 @@ pub fn create_input() -> Input {
block_number: 0,
timestamp: UNIX_EPOCH + Duration::from_secs(1676489717),
payload: "input-0".as_bytes().to_vec(),
status: CompletionStatus::Accepted,
}
}

@@ -176,6 +178,26 @@ fn test_get_input_error() {
));
}

#[test]
#[serial]
fn test_update_input_status() {
let docker = Cli::default();
let test = TestState::setup(&docker);
let repo = test.get_repository();

let mut input = create_input();
input.status = CompletionStatus::Unprocessed;

repo.insert_input(input.clone())
.expect("Failed to insert input");
repo.update_input_status(0, CompletionStatus::Accepted)
.expect("Failed to update input status");

let get_input = repo.get_input(0).expect("Failed to get input");

assert_eq!(get_input.status, CompletionStatus::Accepted);
}

#[test]
#[serial]
fn test_insert_notice() {
@@ -583,6 +605,7 @@ fn test_pagination_macro() {
block_number: 0,
timestamp: UNIX_EPOCH + Duration::from_secs(1676489717),
payload: "input-1".as_bytes().to_vec(),
status: CompletionStatus::Accepted,
};

repo.insert_input(input0.clone())