Skip to content

Commit

Permalink
feat(flow): flush_flow function (#4416)
Browse files Browse the repository at this point in the history
* refactor: df err variant

* WIP

* chore: update proto version

* chore: revert mistaken rust-toolchain

* feat(WIP): added FlowService to QueryEngine

* refactor: move flow service to operator

* refactor: flush use flow name not id

* refactor: use full path in macro

* feat: flush flow

* feat: impl flush flow

* chore: remove unused

* chore: meaninful response

* chore: remove unused

* chore: clippy

* fix: flush_flow with proper blocking

* test: sqlness tests added back for flow

* test: better predicate for flush_flow

* refactor: rwlock

* fix: flush lock

* fix: flush lock write then drop

* test: add a new flow sqlness test

* fix: sqlness testcase

* chore: style

---------

Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
discord9 and killme2008 authored Jul 26, 2024
1 parent 0710e6f commit 021ec7b
Show file tree
Hide file tree
Showing 51 changed files with 1,399 additions and 145 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5c801650435d464891114502539b701c77a1b914" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7ca323090b3ae8faf2c15036b7f41b7c5225cf5f" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
None,
None,
None,
None,
false,
plugins.clone(),
));
Expand Down
1 change: 1 addition & 0 deletions src/common/function/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
sql.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true
Expand Down
164 changes: 164 additions & 0 deletions src/common/function/src/flush_flow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType;

use crate::handlers::FlowServiceHandlerRef;

fn flush_signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}

#[admin_fn(
name = FlushFlowFunction,
display_name = flush_flow,
sig_fn = flush_signature,
ret = uint64
)]
pub(crate) async fn flush_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;

let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;

Ok(Value::from(affected_rows))
}

fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);

let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;

let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use datatypes::scalars::ScalarVector;
use datatypes::vectors::StringVector;
use session::context::QueryContext;

use super::*;
use crate::function::{Function, FunctionContext};

#[test]
fn test_flush_flow_metadata() {
let f = FlushFlowFunction;
assert_eq!("flush_flow", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert_eq!(
f.signature(),
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
);
}

#[test]
fn test_missing_flow_service() {
let f = FlushFlowFunction;

let args = vec!["flow_name"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();

let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing FlowServiceHandler, not expected",
result.to_string()
);
}

#[test]
fn test_parse_flow_args() {
let testcases = [
("flow_name", ("greptime", "flow_name")),
("catalog.flow_name", ("catalog", "flow_name")),
];
for (input, expected) in testcases.iter() {
let args = vec![*input];
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();

let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
}
}
}
13 changes: 13 additions & 0 deletions src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ pub trait ProcedureServiceHandler: Send + Sync {
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}

/// This flow service handler is only use for flush flow for now.
#[async_trait]
pub trait FlowServiceHandler: Send + Sync {
async fn flush(
&self,
catalog: &str,
flow: &str,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
}

pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;

pub type FlowServiceHandlerRef = Arc<dyn FlowServiceHandler>;
1 change: 1 addition & 0 deletions src/common/function/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![feature(let_chains)]
#![feature(try_blocks)]

mod flush_flow;
mod macros;
pub mod scalars;
mod system;
Expand Down
20 changes: 18 additions & 2 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef};

/// Shared state for SQL functions.
/// The handlers in state may be `None` in cli command-line or test cases.
Expand All @@ -22,6 +22,8 @@ pub struct FunctionState {
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The procedure service handler
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
// The flownode handler
pub flow_service_handler: Option<FlowServiceHandlerRef>,
}

impl FunctionState {
Expand All @@ -42,9 +44,10 @@ impl FunctionState {
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};

use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
use crate::handlers::{FlowServiceHandler, ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
struct MockFlowServiceHandler;
const ROWS: usize = 42;

#[async_trait]
Expand Down Expand Up @@ -116,9 +119,22 @@ impl FunctionState {
}
}

#[async_trait]
impl FlowServiceHandler for MockFlowServiceHandler {
async fn flush(
&self,
_catalog: &str,
_flow: &str,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
}

Self {
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
flow_service_handler: Some(Arc::new(MockFlowServiceHandler)),
}
}
}
10 changes: 2 additions & 8 deletions src/common/function/src/system/procedure_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use api::v1::meta::ProcedureStatus;
use common_macro::admin_fn;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use serde::Serialize;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use snafu::ensure;

use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;

#[derive(Serialize)]
Expand Down Expand Up @@ -103,6 +96,7 @@ mod tests {
use datatypes::vectors::StringVector;

use super::*;
use crate::function::{Function, FunctionContext};

#[test]
fn test_procedure_state_misc() {
Expand Down
2 changes: 2 additions & 0 deletions src/common/function/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;

use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;

/// Table functions
Expand All @@ -35,5 +36,6 @@ impl TableFunction {
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
registry.register(Arc::new(FlushFlowFunction));
}
}
10 changes: 2 additions & 8 deletions src/common/function/src/table/flush_compact_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use snafu::ensure;
use store_api::storage::RegionId;

use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
use crate::helper::cast_u64;

Expand Down Expand Up @@ -84,6 +77,7 @@ mod tests {
use datatypes::vectors::UInt64Vector;

use super::*;
use crate::function::{Function, FunctionContext};

macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
Expand Down
Loading

0 comments on commit 021ec7b

Please sign in to comment.