From d20f024e85529108328858318204d24a2dda287e Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Thu, 21 Nov 2024 21:21:32 +0800 Subject: [PATCH 1/5] [INLONG-11531][Manager] Fix bug in DolphinScheduler engine --- .../schedule/dolphinscheduler/DolphinScheduleEngine.java | 3 +++ .../schedule/dolphinscheduler/DolphinScheduleEngineTest.java | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java index 7b09481cea..5095f1f0a1 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -132,6 +132,7 @@ public void start() { @Override @VisibleForTesting public boolean handleRegister(ScheduleInfo scheduleInfo) { + start(); String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; @@ -191,6 +192,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) { @Override @VisibleForTesting public boolean handleUnregister(String groupId) { + start(); String processName = groupId + DS_DEFAULT_PROCESS_NAME; String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; @@ -224,6 +226,7 @@ public boolean handleUnregister(String groupId) { @Override @VisibleForTesting public boolean handleUpdate(ScheduleInfo scheduleInfo) { + start(); LOGGER.info("Update dolphin schedule info for {}", scheduleInfo.getInlongGroupId()); try { return handleUnregister(scheduleInfo.getInlongGroupId()) && handleRegister(scheduleInfo); diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java index f95a5268ee..b63b04e736 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java @@ -53,7 +53,6 @@ public void beforeAll() { String token = accessToken(); dolphinScheduleEngine.setToken(token); - dolphinScheduleEngine.start(); } @AfterAll From 104d8f3418955d4da1f687f913c7c14a0cd87454 Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Fri, 22 Nov 2024 16:57:41 +0800 Subject: [PATCH 2/5] fix: delete useless code --- .../manager/schedule/dolphinscheduler/DolphinScheduleEngine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java index 5095f1f0a1..7e064c20f0 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -226,7 +226,6 @@ public boolean handleUnregister(String groupId) { @Override @VisibleForTesting public boolean handleUpdate(ScheduleInfo scheduleInfo) { - start(); LOGGER.info("Update dolphin schedule info for {}", scheduleInfo.getInlongGroupId()); try { return handleUnregister(scheduleInfo.getInlongGroupId()) && handleRegister(scheduleInfo); From f817b22130560811490763da73ec17a9ff362597 Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Fri, 22 Nov 2024 17:57:21 +0800 Subject: [PATCH 3/5] fix: add retry mechanism while deleting project or process --- .../DolphinScheduleConstants.java | 3 +++ .../DolphinScheduleUtils.java | 26 +++++++++++++++++-- .../exception/DolphinScheduleException.java | 1 + 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java index 89dcda5b77..1488ca1fe8 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java @@ -22,6 +22,7 @@ public class DolphinScheduleConstants { // DS public constants public static final String DS_ID = "id"; public static final String DS_CODE = "code"; + public static final String DS_SUCCESS = "success"; public static final String DS_TOKEN = "token"; public static final String DS_PAGE_SIZE = "pageSize"; public static final String DS_PAGE_NO = "pageNo"; @@ -29,6 +30,8 @@ public class DolphinScheduleConstants { public static final String DS_RESPONSE_DATA = "data"; public static final String DS_RESPONSE_NAME = "name"; public static final String DS_RESPONSE_TOTAL_LIST = "totalList"; + public static final int DS_DEFAULT_RETRY_TIMES = 3; + public static final int DS_DEFAULT_WAIT_MILLS = 1000; public static final String DS_DEFAULT_PAGE_SIZE = "10"; public static final String DS_DEFAULT_PAGE_NO = "1"; public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai"; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java index 87cb1c5127..ac6c783c24 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -57,11 +57,13 @@ import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_RETRY_TIMES; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_WAIT_MILLS; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO; @@ -78,6 +80,7 @@ import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SUCCESS; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM; import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION; @@ -89,6 +92,7 @@ import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR; import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR; import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_IN_USED_ERROR; import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED; import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED; import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED; @@ -489,10 +493,27 @@ public static void delete(String url, String token, long code) { Map header = buildHeader(token); String requestUrl = url + "/" + code; + for (int attempt = 1; attempt <= DS_DEFAULT_RETRY_TIMES; attempt++) { + JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header); - JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header); - LOGGER.info("delete process or project success, response data: {}", response); + if (response.get(DS_SUCCESS).getAsBoolean()) { + LOGGER.info("Delete process or project success, response data: {}", response); + return; + } + + if (response.get(DS_CODE).getAsInt() == PROCESS_DEFINITION_IN_USED_ERROR) { + LOGGER.warn("Attempt {} of {}, retrying after {} ms...", attempt, DS_DEFAULT_RETRY_TIMES, + DS_DEFAULT_WAIT_MILLS); + Thread.sleep(DS_DEFAULT_WAIT_MILLS); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread interrupted while retrying delete process or project: ", e); + throw new DolphinScheduleException( + DELETION_FAILED, + String.format("Thread interrupted while retrying delete for code: %d at URL: %s", code, url), e); } catch (JsonParseException e) { LOGGER.error("JsonParseException during deleting process or project", e); throw new DolphinScheduleException( @@ -500,6 +521,7 @@ public static void delete(String url, String token, long code) { String.format("Error deleting process or project with code: %d at URL: %s", code, url), e); } catch (DolphinScheduleException e) { + LOGGER.error("Error deleting process or project: ", e); throw new DolphinScheduleException( DELETION_FAILED, String.format("Error deleting process or project with code: %d at URL: %s", code, url), e); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java index 348697b672..b5238a3a35 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java @@ -42,6 +42,7 @@ public class DolphinScheduleException extends RuntimeException { public static final String GEN_TASK_CODE_FAILED = "GEN_TASK_CODE_FAILED"; // Process-related error codes + public static final int PROCESS_DEFINITION_IN_USED_ERROR = 10163; public static final String PROCESS_DEFINITION_QUERY_FAILED = "PROCESS_DEFINITION_QUERY_FAILED"; public static final String PROCESS_DEFINITION_CREATION_FAILED = "PROCESS_DEFINITION_CREATION_FAILED"; public static final String PROCESS_DEFINITION_RELEASE_FAILED = "PROCESS_DEFINITION_RELEASE_FAILED"; From d6ac43ddcb23efe9b0ce86b7726295c24058391f Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Mon, 25 Nov 2024 13:38:48 +0800 Subject: [PATCH 4/5] fix: exception handle when retry time exceeded --- .../DolphinScheduleUtils.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java index ac6c783c24..b1a9596cc6 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -493,7 +493,7 @@ public static void delete(String url, String token, long code) { Map header = buildHeader(token); String requestUrl = url + "/" + code; - for (int attempt = 1; attempt <= DS_DEFAULT_RETRY_TIMES; attempt++) { + for (int retryTime = 1; retryTime <= DS_DEFAULT_RETRY_TIMES; retryTime++) { JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header); if (response.get(DS_SUCCESS).getAsBoolean()) { @@ -502,7 +502,18 @@ public static void delete(String url, String token, long code) { } if (response.get(DS_CODE).getAsInt() == PROCESS_DEFINITION_IN_USED_ERROR) { - LOGGER.warn("Attempt {} of {}, retrying after {} ms...", attempt, DS_DEFAULT_RETRY_TIMES, + + if (retryTime == DS_DEFAULT_RETRY_TIMES) { + LOGGER.error( + "Maximum retry attempts reached for deleting process or project. URL: {}, Code: {}", + url, code); + throw new DolphinScheduleException( + DELETION_FAILED, + String.format("Failed to delete after %d retries. Code: %d at URL: %s", + DS_DEFAULT_RETRY_TIMES, code, url)); + } + + LOGGER.warn("Attempt {} of {}, retrying after {} ms...", retryTime, DS_DEFAULT_RETRY_TIMES, DS_DEFAULT_WAIT_MILLS); Thread.sleep(DS_DEFAULT_WAIT_MILLS); } @@ -513,18 +524,18 @@ public static void delete(String url, String token, long code) { LOGGER.error("Thread interrupted while retrying delete process or project: ", e); throw new DolphinScheduleException( DELETION_FAILED, - String.format("Thread interrupted while retrying delete for code: %d at URL: %s", code, url), e); + String.format("Thread interrupted while retrying delete for code: %d at URL: %s", code, url)); } catch (JsonParseException e) { LOGGER.error("JsonParseException during deleting process or project", e); throw new DolphinScheduleException( JSON_PARSE_ERROR, - String.format("Error deleting process or project with code: %d at URL: %s", code, url), e); + String.format("Error deleting process or project with code: %d at URL: %s", code, url)); } catch (DolphinScheduleException e) { LOGGER.error("Error deleting process or project: ", e); throw new DolphinScheduleException( DELETION_FAILED, - String.format("Error deleting process or project with code: %d at URL: %s", code, url), e); + String.format("Error deleting process or project with code: %d at URL: %s", code, url)); } } From 8c89339410cbe3f179b8eadd20e0dfab2c28ed9c Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Mon, 25 Nov 2024 14:49:36 +0800 Subject: [PATCH 5/5] fix: handle delete failed logs --- .../DolphinScheduleUtils.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java index b1a9596cc6..5fd6dd3629 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -495,29 +495,28 @@ public static void delete(String url, String token, long code) { String requestUrl = url + "/" + code; for (int retryTime = 1; retryTime <= DS_DEFAULT_RETRY_TIMES; retryTime++) { JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header); + if (response.get(DS_CODE).getAsInt() == PROCESS_DEFINITION_IN_USED_ERROR) { + + LOGGER.warn( + "Retrying for current retry time ={}, maximum retry count={}, code={}, url={}, after {} ms...", + retryTime, DS_DEFAULT_RETRY_TIMES, code, url, DS_DEFAULT_WAIT_MILLS); + Thread.sleep(DS_DEFAULT_WAIT_MILLS); - if (response.get(DS_SUCCESS).getAsBoolean()) { + } else if (response.get(DS_SUCCESS).getAsBoolean()) { LOGGER.info("Delete process or project success, response data: {}", response); return; + } else { + LOGGER.warn("Delete process or project failed, response data: {}", response); } - if (response.get(DS_CODE).getAsInt() == PROCESS_DEFINITION_IN_USED_ERROR) { - - if (retryTime == DS_DEFAULT_RETRY_TIMES) { - LOGGER.error( - "Maximum retry attempts reached for deleting process or project. URL: {}, Code: {}", - url, code); - throw new DolphinScheduleException( - DELETION_FAILED, - String.format("Failed to delete after %d retries. Code: %d at URL: %s", - DS_DEFAULT_RETRY_TIMES, code, url)); - } - - LOGGER.warn("Attempt {} of {}, retrying after {} ms...", retryTime, DS_DEFAULT_RETRY_TIMES, - DS_DEFAULT_WAIT_MILLS); - Thread.sleep(DS_DEFAULT_WAIT_MILLS); - } } + LOGGER.error( + "Maximum retry attempts reached for deleting process or project. URL: {}, Code: {}", + url, code); + throw new DolphinScheduleException( + DELETION_FAILED, + String.format("Failed to delete after %d retries. Code: %d at URL: %s", + DS_DEFAULT_RETRY_TIMES, code, url)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -532,7 +531,7 @@ public static void delete(String url, String token, long code) { String.format("Error deleting process or project with code: %d at URL: %s", code, url)); } catch (DolphinScheduleException e) { - LOGGER.error("Error deleting process or project: ", e); + LOGGER.error("Error deleting process or project for code={}, url={} ", code, url, e); throw new DolphinScheduleException( DELETION_FAILED, String.format("Error deleting process or project with code: %d at URL: %s", code, url));