Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11531][Manager] Fix bug in DolphinScheduler engine #11532

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ public class DolphinScheduleConstants {
// DS public constants
public static final String DS_ID = "id";
public static final String DS_CODE = "code";
public static final String DS_SUCCESS = "success";
public static final String DS_TOKEN = "token";
public static final String DS_PAGE_SIZE = "pageSize";
public static final String DS_PAGE_NO = "pageNo";
public static final String DS_SEARCH_VAL = "searchVal";
public static final String DS_RESPONSE_DATA = "data";
public static final String DS_RESPONSE_NAME = "name";
public static final String DS_RESPONSE_TOTAL_LIST = "totalList";
public static final int DS_DEFAULT_RETRY_TIMES = 3;
public static final int DS_DEFAULT_WAIT_MILLS = 1000;
public static final String DS_DEFAULT_PAGE_SIZE = "10";
public static final String DS_DEFAULT_PAGE_NO = "1";
public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void start() {
@Override
@VisibleForTesting
public boolean handleRegister(ScheduleInfo scheduleInfo) {
start();
String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL;
String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL;
String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME;
Expand Down Expand Up @@ -191,6 +192,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
@Override
@VisibleForTesting
public boolean handleUnregister(String groupId) {
start();
String processName = groupId + DS_DEFAULT_PROCESS_NAME;
String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_RETRY_TIMES;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_WAIT_MILLS;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO;
Expand All @@ -78,6 +80,7 @@
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SUCCESS;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION;
Expand All @@ -89,6 +92,7 @@
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_IN_USED_ERROR;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED;
Expand Down Expand Up @@ -489,17 +493,35 @@ public static void delete(String url, String token, long code) {
Map<String, String> header = buildHeader(token);

String requestUrl = url + "/" + code;
for (int attempt = 1; attempt <= DS_DEFAULT_RETRY_TIMES; attempt++) {
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved
JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header);

JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header);
LOGGER.info("delete process or project success, response data: {}", response);
if (response.get(DS_SUCCESS).getAsBoolean()) {
LOGGER.info("Delete process or project success, response data: {}", response);
return;
}

if (response.get(DS_CODE).getAsInt() == PROCESS_DEFINITION_IN_USED_ERROR) {
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.warn("Attempt {} of {}, retrying after {} ms...", attempt, DS_DEFAULT_RETRY_TIMES,
DS_DEFAULT_WAIT_MILLS);
Thread.sleep(DS_DEFAULT_WAIT_MILLS);
}
}

} catch (InterruptedException e) {
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
Thread.currentThread().interrupt();
LOGGER.error("Thread interrupted while retrying delete process or project: ", e);
throw new DolphinScheduleException(
DELETION_FAILED,
String.format("Thread interrupted while retrying delete for code: %d at URL: %s", code, url), e);
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved
} catch (JsonParseException e) {
LOGGER.error("JsonParseException during deleting process or project", e);
throw new DolphinScheduleException(
JSON_PARSE_ERROR,
String.format("Error deleting process or project with code: %d at URL: %s", code, url), e);
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved

} catch (DolphinScheduleException e) {
LOGGER.error("Error deleting process or project: ", e);
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved
throw new DolphinScheduleException(
DELETION_FAILED,
String.format("Error deleting process or project with code: %d at URL: %s", code, url), e);
emptyOVO marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class DolphinScheduleException extends RuntimeException {
public static final String GEN_TASK_CODE_FAILED = "GEN_TASK_CODE_FAILED";

// Process-related error codes
public static final int PROCESS_DEFINITION_IN_USED_ERROR = 10163;
public static final String PROCESS_DEFINITION_QUERY_FAILED = "PROCESS_DEFINITION_QUERY_FAILED";
public static final String PROCESS_DEFINITION_CREATION_FAILED = "PROCESS_DEFINITION_CREATION_FAILED";
public static final String PROCESS_DEFINITION_RELEASE_FAILED = "PROCESS_DEFINITION_RELEASE_FAILED";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public void beforeAll() {

String token = accessToken();
dolphinScheduleEngine.setToken(token);
dolphinScheduleEngine.start();
}

@AfterAll
Expand Down
Loading