Skip to content

Commit

Permalink
[vpj] VPJ should not close and release resources on its own
Browse files Browse the repository at this point in the history
  • Loading branch information
nisargthakkar committed Oct 31, 2023
1 parent 52fde32 commit 80e2d5a
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ public void run() {
ex);
} finally {
try {
killJobAndCleanup(pushJobSetting, controllerClient, kafkaTopicInfo);
killJob(pushJobSetting, controllerClient, kafkaTopicInfo);
LOGGER.info("Successfully killed the failed push job.");
} catch (Exception ex) {
LOGGER.info("Failed to stop and cleanup the job. New pushes might be blocked.", ex);
Expand Down Expand Up @@ -2273,9 +2273,8 @@ void validateKeySchema(
if (!canonicalizedServerSchema.equals(canonicalizedClientSchema)) {
String briefErrorMessage = "Key schema mis-match for store " + setting.storeName;
LOGGER.error(
"{}\n\t\tController URLs: {}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}",
"{}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}",
briefErrorMessage,
controllerClient.getControllerDiscoveryUrls(),
pushJobSchemaInfo.getKeySchemaString(),
serverSchema.toString());
throw new VeniceException(briefErrorMessage);
Expand Down Expand Up @@ -3384,7 +3383,7 @@ private String pushJobPropertiesToString(
* @throws Exception
*/
public void cancel() {
killJobAndCleanup(pushJobSetting, controllerClient, kafkaTopicInfo);
killJob(pushJobSetting, controllerClient, kafkaTopicInfo);
if (kafkaTopicInfo != null && StringUtils.isEmpty(kafkaTopicInfo.topic)) {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue()));
} else {
Expand All @@ -3395,12 +3394,9 @@ public void cancel() {
sendPushJobDetailsToController();
}

private void killJobAndCleanup(
PushJobSetting pushJobSetting,
ControllerClient controllerClient,
TopicInfo topicInfo) {
private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient, TopicInfo topicInfo) {
// Attempt to kill the job. There is a known race condition here, but it is better to kill the job while we know it is running.
killJob();
killComputeJob();
if (!pushJobSetting.isIncrementalPush && topicInfo != null) {
final int maxRetryAttempt = 10;
int currentRetryAttempt = 0;
Expand All @@ -3421,10 +3417,9 @@ private void killJobAndCleanup(
LOGGER.info("Offline push job has been killed, topic: {}", topicInfo.topic);
}
}
close();
}

private void killJob() {
private void killComputeJob() {
if (runningJob == null) {
LOGGER.warn("No op to kill a null running job");
return;
Expand Down Expand Up @@ -3555,7 +3550,7 @@ public static void main(String[] args) {
Utils.exit("Venice Push Job Completed");
}

public static void runPushJob(String jobId, Properties props) {
private static void runPushJob(String jobId, Properties props) {
try (VenicePushJob job = new VenicePushJob(jobId, props)) {
job.run();
}
Expand Down
Loading

0 comments on commit 80e2d5a

Please sign in to comment.