Skip to content

Commit

Permalink
feat(flow): flow worker (#3934)
Browse files Browse the repository at this point in the history
* feat: flow worker

* chore: fix after cherry pick

* refactor: error handling

* refactor: error handling

Signed-off-by: Zhenchi <[email protected]>

* chore: merge origin/main

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
Co-authored-by: Zhenchi <[email protected]>
  • Loading branch information
discord9 and zhongzc authored May 14, 2024
1 parent 72897a2 commit 15d7b97
Show file tree
Hide file tree
Showing 6 changed files with 554 additions and 7 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ futures = "0.3"
# it is the same with upstream repo
async-trait.workspace = true
common-meta.workspace = true
enum-as-inner = "0.6.0"
greptime-proto.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@
pub(crate) mod error;
pub(crate) mod node_context;

pub(crate) use node_context::FlownodeContext;
pub(crate) use node_context::{FlowId, FlownodeContext, TableName};

mod worker;

pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
29 changes: 25 additions & 4 deletions src/flow/src/adapter/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datatypes::value::Value;
use servers::define_into_tonic_status;
use snafu::{Location, Snafu};

use crate::adapter::FlowId;
use crate::expr::EvalError;

/// This error is used to represent all possible errors that can occur in the flow module.
Expand All @@ -39,7 +40,11 @@ pub enum Error {
},

#[snafu(display("Internal error"))]
Internal { location: Location, reason: String },
Internal {
reason: String,
#[snafu(implicit)]
location: Location,
},

/// TODO(discord9): add detailed location of column
#[snafu(display("Failed to eval stream"))]
Expand Down Expand Up @@ -71,6 +76,20 @@ pub enum Error {
location: Location,
},

#[snafu(display("Flow not found, id={id}"))]
FlowNotFound {
id: FlowId,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Flow already exist, id={id}"))]
FlowAlreadyExist {
id: FlowId,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to join task"))]
JoinTask {
#[snafu(source)]
Expand Down Expand Up @@ -168,10 +187,12 @@ impl ErrorExt for Error {
Self::Eval { .. } | &Self::JoinTask { .. } | &Self::Datafusion { .. } => {
StatusCode::Internal
}
&Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
StatusCode::TableNotFound
&Self::TableAlreadyExist { .. } | Self::FlowAlreadyExist { .. } => {
StatusCode::TableAlreadyExists
}
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. } => StatusCode::TableNotFound,
Self::InvalidQueryPlan { .. }
| Self::InvalidQuerySubstrait { .. }
| Self::InvalidQueryProst { .. }
Expand Down
Loading

0 comments on commit 15d7b97

Please sign in to comment.