Skip to content

Commit

Permalink
refactor(sql): adjust UDF sql (#306)
Browse files Browse the repository at this point in the history
New sql syntax:
drop UDF:
DROP FUNCTION <udfName>;
old: DROP PYTHON TASK <udfName>;

list all UDFs:
SHOW FUNCTIONS;
old: SHOW REGISTER PYTHON TASK;
  • Loading branch information
shinyano authored Apr 22, 2024
1 parent 6693b1a commit eed056c
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/scripts/test/cli/test_py_register.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ result=$(bash -c "echo '$COMMAND' | xargs -0 -t -i ${SCRIPT_COMMAND}")

if [[ $result =~ 'success' ]]; then
echo success
COMMAND='DROP PYTHON TASK "'"mock_udf"'";'
COMMAND='DROP FUNCTION "'"mock_udf"'";'
bash -c "echo '$COMMAND' | xargs -0 -t -i ${SCRIPT_COMMAND}"
else
echo 'Error: failed to register udf mock_udf.'
Expand Down
2 changes: 1 addition & 1 deletion .github/scripts/test/cli/test_py_register_macos.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ result=$(sh -c "echo '$COMMAND' | xargs -0 -t -I F sh start_cli.sh -e 'F'")

if [[ $result =~ 'success' ]]; then
echo success
COMMAND='DROP PYTHON TASK "'"mock_udf"'";'
COMMAND='DROP FUNCTION "'"mock_udf"'";'
sh -c "echo '$COMMAND' | xargs -0 -t -I F sh start_cli.sh -e 'F'"
else
echo 'Error: failed to register udf mock_udf.'
Expand Down
2 changes: 1 addition & 1 deletion .github/scripts/test/cli/test_py_register_windows.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ result=$(bash -c "./start_cli.bat -e '$COMMAND'")

if [[ $result =~ 'success' ]]; then
echo success
COMMAND='DROP PYTHON TASK "'"mock_udf"'";'
COMMAND='DROP FUNCTION "'"mock_udf"'";'
bash -c "./start_cli.bat -e '$COMMAND'"
else
echo 'Error: failed to register udf mock_udf.'
Expand Down
26 changes: 9 additions & 17 deletions antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ statement
| SHOW REPLICA NUMBER # showReplicationStatement
| ADD STORAGEENGINE storageEngineSpec # addStorageEngineStatement
| SHOW CLUSTER INFO # showClusterInfoStatement
| SHOW REGISTER PYTHON TASK # showRegisterTaskStatement
| SHOW FUNCTIONS # showRegisterTaskStatement
| CREATE FUNCTION udfType udfClassRef (COMMA (udfType)? udfClassRef)* IN filePath = stringLiteral # registerTaskStatement
| DROP PYTHON TASK name = stringLiteral # dropTaskStatement
| DROP FUNCTION name = stringLiteral # dropTaskStatement
| COMMIT TRANSFORM JOB filePath = stringLiteral # commitTransformJobStatement
| SHOW TRANSFORM JOB STATUS jobId = INT # showJobStatusStatement
| CANCEL TRANSFORM JOB jobId = INT # cancelJobStatement
Expand Down Expand Up @@ -426,9 +426,9 @@ keyWords
| DATA
| REPLICA
| DROP
| REGISTER
| PYTHON
| TASK
| CREATE
| FUNCTION
| FUNCTIONS
| COMMIT
| JOB
| STATUS
Expand Down Expand Up @@ -675,18 +675,6 @@ DROP
: D R O P
;

REGISTER
: R E G I S T E R
;

PYTHON
: P Y T H O N
;

TASK
: T A S K
;

COMMIT
: C O M M I T
;
Expand Down Expand Up @@ -759,6 +747,10 @@ FUNCTION
: F U N C T I O N
;

FUNCTIONS
: F U N C T I O N S
;

CREATED
: C R E A T E D
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ private static Completer buildIginxCompleter() {
Arrays.asList("explain", "select"),
Arrays.asList("add", "storageengine"),
Arrays.asList("create", "function"),
Arrays.asList("drop", "python", "task"),
Arrays.asList("drop", "function"),
Arrays.asList("commit", "transform", "job"),
Arrays.asList("show", "transform", "job", "status"),
Arrays.asList("cancel", "transform", "job"),
Expand All @@ -671,7 +671,7 @@ private static Completer buildIginxCompleter() {
Arrays.asList("clear", "data"),
Arrays.asList("show", "columns"),
Arrays.asList("show", "cluster", "info"),
Arrays.asList("show", "register", "python", "task"),
Arrays.asList("show", "functions"),
Arrays.asList("show", "sessionid"),
Arrays.asList("show", "rules"),
Arrays.asList("remove", "historydatasource"));
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/cn/edu/tsinghua/iginx/IginxWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ public Status registerTask(RegisterTaskReq req) {
for (UDFClassPair p : pairs) {
TransformTaskMeta transformTaskMeta = metaManager.getTransformTask(p.name.trim());
if (transformTaskMeta != null && transformTaskMeta.getIpSet().contains(config.getIp())) {
errorMsg = String.format("Register task %s already exist", transformTaskMeta);
errorMsg = String.format("Function %s already exist", transformTaskMeta);
LOGGER.error(errorMsg);
return RpcUtils.FAILURE.setMessage(errorMsg);
}
Expand Down Expand Up @@ -960,20 +960,20 @@ public Status dropTask(DropTaskReq req) {
TransformTaskMeta transformTaskMeta = metaManager.getTransformTask(name);
String errorMsg = "";
if (transformTaskMeta == null) {
errorMsg = "Register task not exist";
errorMsg = "Function does not exist";
LOGGER.error(errorMsg);
return RpcUtils.FAILURE.setMessage(errorMsg);
}

TransformJobManager manager = TransformJobManager.getInstance();
if (manager.isRegisterTaskRunning(name)) {
errorMsg = "Register task is running";
errorMsg = "Function is running";
LOGGER.error(errorMsg);
return RpcUtils.FAILURE.setMessage(errorMsg);
}

if (!transformTaskMeta.getIpSet().contains(config.getIp())) {
errorMsg = String.format("Register task exists in node: %s", config.getIp());
errorMsg = String.format("Function exists in node: %s", config.getIp());
LOGGER.error(errorMsg);
return RpcUtils.FAILURE.setMessage(errorMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ public class TransformCompare {

private static final List<String> FUNC_LIST = Arrays.asList("min", "max", "sum", "avg", "count");

private static final String SHOW_REGISTER_TASK_SQL = "SHOW REGISTER PYTHON TASK;";
private static final String REGISTER_SQL_FORMATTER =
"REGISTER TRANSFORM PYTHON TASK %s IN %s AS %s";
private static final String DROP_SQL_FORMATTER = "DROP PYTHON TASK %s";
private static final String SHOW_FUNCTION_SQL = "SHOW FUNCTIONS;";
private static final String CREATE_SQL_FORMATTER = "CREATE FUNCTION TRANSFORM %s FROM %s IN %s";
private static final String DROP_SQL_FORMATTER = "DROP FUNCTION %s";

private static final String OUTPUT_DIR_PREFIX =
System.getProperty("user.dir")
Expand Down Expand Up @@ -119,14 +118,14 @@ private static void before() throws SessionException {

registerTask();
// 查询已注册的任务
SessionExecuteSqlResult result = session.executeSql(SHOW_REGISTER_TASK_SQL);
SessionExecuteSqlResult result = session.executeSql(SHOW_FUNCTION_SQL);
result.print(false, "ms");
}

private static void after() throws SessionException {
dropTask();
// 查询已注册的任务
SessionExecuteSqlResult result = session.executeSql(SHOW_REGISTER_TASK_SQL);
SessionExecuteSqlResult result = session.executeSql(SHOW_FUNCTION_SQL);
result.print(false, "ms");

clearData();
Expand All @@ -153,7 +152,7 @@ private static void tearDown() {
private static void registerTask() {
TASK_MAP.forEach(
(k, v) -> {
String registerSQL = String.format(REGISTER_SQL_FORMATTER, k, v, k);
String registerSQL = String.format(CREATE_SQL_FORMATTER, k, k, v);
try {
session.executeSql(registerSQL);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ public class TransformExample {

private static final String QUERY_SQL = "select value1, value2, value3, value4 from transform;";
private static final String SHOW_TIME_SERIES_SQL = "SHOW COLUMNS;";
private static final String SHOW_REGISTER_TASK_SQL = "SHOW REGISTER PYTHON TASK;";
private static final String REGISTER_SQL_FORMATTER =
"REGISTER TRANSFORM PYTHON TASK %s IN %s AS %s";
private static final String DROP_SQL_FORMATTER = "DROP PYTHON TASK %s";
private static final String SHOW_FUNCTION_SQL = "SHOW FUNCTIONS;";
private static final String CREATE_SQL_FORMATTER = "CREATE FUNCTION TRANSFORM %s FROM %s IN %s";
private static final String DROP_SQL_FORMATTER = "DROP FUNCTION %s";

private static final String OUTPUT_DIR_PREFIX =
System.getProperty("user.dir")
Expand Down Expand Up @@ -90,7 +89,7 @@ private static void before() throws SessionException {
registerTask();

// 查询已注册的任务
result = session.executeSql(SHOW_REGISTER_TASK_SQL);
result = session.executeSql(SHOW_FUNCTION_SQL);
result.print(false, "ms");
}

Expand All @@ -99,7 +98,7 @@ private static void after() throws SessionException {
dropTask();

// 查询已注册的任务
SessionExecuteSqlResult result = session.executeSql(SHOW_REGISTER_TASK_SQL);
SessionExecuteSqlResult result = session.executeSql(SHOW_FUNCTION_SQL);
result.print(false, "ms");

// 清除数据
Expand Down Expand Up @@ -134,7 +133,7 @@ private static void runWithSession() throws SessionException, InterruptedExcepti
private static void registerTask() {
TASK_MAP.forEach(
(k, v) -> {
String registerSQL = String.format(REGISTER_SQL_FORMATTER, k, v, k);
String registerSQL = String.format(CREATE_SQL_FORMATTER, k, k, v);
try {
session.executeSql(registerSQL);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public class UDFExample {
private static final String S3 = "udf.value3";
private static final String S4 = "udf.value4";

private static final String REGISTER_SQL_FORMATTER = "CREATE FUNCTION %s %s FROM %s IN %s";
private static final String DROP_SQL_FORMATTER = "DROP PYTHON TASK %s";
private static final String SHOW_REGISTER_TASK_SQL = "SHOW REGISTER PYTHON TASK;";
private static final String CREATE_SQL_FORMATTER = "CREATE FUNCTION %s %s FROM %s IN %s";
private static final String DROP_SQL_FORMATTER = "DROP FUNCTION %s";
private static final String SHOW_FUNCTION_SQL = "SHOW FUNCTIONS;";

private static final String FILE_DIR =
String.join(
Expand All @@ -43,23 +43,23 @@ public static void main(String[] args) throws SessionException {
// 注册UDTF
String registerSQL =
String.format(
REGISTER_SQL_FORMATTER,
CREATE_SQL_FORMATTER,
"UDTF",
"\"sin\"",
"\"UDFSin\"",
"\"" + FILE_DIR + File.separator + "udtf_sin.py" + "\"");
session.executeSql(registerSQL);
registerSQL =
String.format(
REGISTER_SQL_FORMATTER,
CREATE_SQL_FORMATTER,
"UDAF",
"\"py_count\"",
"\"UDFCount\"",
"\"" + FILE_DIR + File.separator + "udaf_count.py" + "\"");
session.executeSql(registerSQL);

// 查询已注册的UDF
result = session.executeSql(SHOW_REGISTER_TASK_SQL);
result = session.executeSql(SHOW_FUNCTION_SQL);
result.print(false, "ms");

// 使用已注册的UDF
Expand All @@ -76,7 +76,7 @@ public static void main(String[] args) throws SessionException {
session.executeSql(dropSQL);

// 查询已注册的UDF
result = session.executeSql(SHOW_REGISTER_TASK_SQL);
result = session.executeSql(SHOW_FUNCTION_SQL);
result.print(false, "ms");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private String buildShowRegisterTaskResult() {
StringBuilder builder = new StringBuilder();

if (registerTaskInfos != null && !registerTaskInfos.isEmpty()) {
builder.append("Register task infos:").append("\n");
builder.append("Functions info:").append("\n");
List<List<String>> cache = new ArrayList<>();
cache.add(
new ArrayList<>(Arrays.asList("NAME", "CLASS_NAME", "FILE_NAME", "IP", "UDF_TYPE")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public class TransformIT {

private static final long END_TIMESTAMP = 15000L;

private static final String SHOW_REGISTER_TASK_SQL = "SHOW REGISTER PYTHON TASK;";
private static final String SHOW_REGISTER_TASK_SQL = "SHOW FUNCTIONS;";

private static final String DROP_SQL_FORMATTER = "DROP PYTHON TASK \"%s\";";
private static final String DROP_SQL_FORMATTER = "DROP FUNCTION \"%s\";";

private static final String REGISTER_SQL_FORMATTER =
private static final String CREATE_SQL_FORMATTER =
"CREATE FUNCTION TRANSFORM \"%s\" FROM \"%s\" IN \"%s\";";

private static final String COMMIT_SQL_FORMATTER = "COMMIT TRANSFORM JOB \"%s\";";
Expand Down Expand Up @@ -188,7 +188,7 @@ private static void dropTask(String task) throws SessionException {

private void registerTask(String task) throws SessionException {
dropTask(task);
session.executeSql(String.format(REGISTER_SQL_FORMATTER, task, task, TASK_MAP.get(task)));
session.executeSql(String.format(CREATE_SQL_FORMATTER, task, task, TASK_MAP.get(task)));
}

private void verifyJobState(long jobId) throws SessionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public class UDFIT {

private static final String MULTI_UDF_REGISTER_SQL = "CREATE FUNCTION %s IN \"%s\";";

private static final String DROP_SQL = "DROP PYTHON TASK \"%s\";";
private static final String DROP_SQL = "DROP FUNCTION \"%s\";";

private static final String SHOW_TASK_SQL = "SHOW REGISTER PYTHON TASK;";
private static final String SHOW_FUNCTION_SQL = "SHOW FUNCTIONS;";

private static final String MODULE_PATH =
String.join(
Expand Down Expand Up @@ -218,7 +218,7 @@ private void executeFail(String statement) {
}

private boolean isUDFRegistered(String udfName) {
SessionExecuteSqlResult ret = execute(SHOW_TASK_SQL);
SessionExecuteSqlResult ret = execute(SHOW_FUNCTION_SQL);
List<String> registerUDFs =
ret.getRegisterTaskInfos().stream()
.map(RegisterTaskInfo::getName)
Expand All @@ -227,7 +227,7 @@ private boolean isUDFRegistered(String udfName) {
}

private boolean isUDFsRegistered(List<String> names) {
SessionExecuteSqlResult ret = execute(SHOW_TASK_SQL);
SessionExecuteSqlResult ret = execute(SHOW_FUNCTION_SQL);
List<String> registerUDFs =
ret.getRegisterTaskInfos().stream()
.map(RegisterTaskInfo::getName)
Expand All @@ -242,7 +242,7 @@ private boolean isUDFsRegistered(List<String> names) {

// all udf shouldn't be registered.
private boolean isUDFsUnregistered(List<String> names) {
SessionExecuteSqlResult ret = execute(SHOW_TASK_SQL);
SessionExecuteSqlResult ret = execute(SHOW_FUNCTION_SQL);
List<String> registerUDFs =
ret.getRegisterTaskInfos().stream()
.map(RegisterTaskInfo::getName)
Expand Down Expand Up @@ -295,7 +295,7 @@ public void baseTests() {
String udafSQLFormat = "SELECT %s(s1) FROM us.d1 OVER (RANGE 50 IN [0, 200));";
String udsfSQLFormat = "SELECT %s(s1) FROM us.d1 WHERE key < 50;";

SessionExecuteSqlResult ret = execute(SHOW_TASK_SQL);
SessionExecuteSqlResult ret = execute(SHOW_FUNCTION_SQL);

List<RegisterTaskInfo> registerUDFs = ret.getRegisterTaskInfos();
for (RegisterTaskInfo info : registerUDFs) {
Expand Down Expand Up @@ -1069,7 +1069,7 @@ public void testMultiUDFRegOmit() {
assertEquals(expected, ret.getResultInString(false, ""));

// make sure "udf_b" is dropped and cannot be used
execute("drop python task \"udf_b\";");
execute(String.format(DROP_SQL, "udf_b"));
assertFalse(isUDFRegistered("udf_b"));
taskToBeRemoved.remove("udf_b");
executeFail(statement);
Expand Down Expand Up @@ -1141,7 +1141,7 @@ public void testMultiUDFRegSep() {
assertEquals(expected, ret.getResultInString(false, ""));

// make sure "udf_b" is dropped and cannot be used
execute("drop python task \"udf_b\";");
execute(String.format(DROP_SQL, "udf_b"));
assertFalse(isUDFRegistered("udf_b"));
taskToBeRemoved.remove("udf_b");
executeFail(statement);
Expand Down Expand Up @@ -1217,7 +1217,7 @@ public void testMultiUDFRegFile() {
assertEquals(expected, ret.getResultInString(false, ""));

// make sure "udf_b" is dropped and cannot be used
execute("drop python task \"udf_b\";");
execute(String.format(DROP_SQL, "udf_b"));
assertFalse(isUDFRegistered("udf_b"));
taskToBeRemoved.remove("udf_b");
executeFail(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public static void tearDown() throws SessionException {

@Test
public void testUDFFuncList() {
String statement = "show register python task;";
String statement = "SHOW FUNCTIONS;";
String expectedRes =
"Register task infos:\n"
"Functions info:\n"
+ "+----------------+---------------+---------------------+-------+--------+\n"
+ "| NAME| CLASS_NAME| FILE_NAME| IP|UDF_TYPE|\n"
+ "+----------------+---------------+---------------------+-------+--------+\n"
Expand Down

0 comments on commit eed056c

Please sign in to comment.