Skip to content

Commit

Permalink
[vpj] VPJ should not close and release resources on its own (#724)
Browse files Browse the repository at this point in the history
VPJ with D2 fails to send final push job details to controller because VPJ internally closes the D2 client and the subsequent "ControllerClient" requests fail. The URL based "ControllerClient" doesn't have this issue because the close method is essentially a no-op
  • Loading branch information
nisargthakkar authored Nov 16, 2023
1 parent c1cd5b8 commit aed7bfb
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ public void run() {
ex);
} finally {
try {
killJobAndCleanup(pushJobSetting, controllerClient, kafkaTopicInfo);
killJob(pushJobSetting, controllerClient, kafkaTopicInfo);
LOGGER.info("Successfully killed the failed push job.");
} catch (Exception ex) {
LOGGER.info("Failed to stop and cleanup the job. New pushes might be blocked.", ex);
Expand Down Expand Up @@ -2289,9 +2289,8 @@ void validateKeySchema(
if (!canonicalizedServerSchema.equals(canonicalizedClientSchema)) {
String briefErrorMessage = "Key schema mis-match for store " + setting.storeName;
LOGGER.error(
"{}\n\t\tController URLs: {}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}",
"{}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}",
briefErrorMessage,
controllerClient.getControllerDiscoveryUrls(),
pushJobSchemaInfo.getKeySchemaString(),
serverSchema.toString());
throw new VeniceException(briefErrorMessage);
Expand Down Expand Up @@ -3406,7 +3405,7 @@ private String pushJobPropertiesToString(
* @throws Exception
*/
public void cancel() {
killJobAndCleanup(pushJobSetting, controllerClient, kafkaTopicInfo);
killJob(pushJobSetting, controllerClient, kafkaTopicInfo);
if (kafkaTopicInfo != null && StringUtils.isEmpty(kafkaTopicInfo.topic)) {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue()));
} else {
Expand All @@ -3417,12 +3416,9 @@ public void cancel() {
sendPushJobDetailsToController();
}

private void killJobAndCleanup(
PushJobSetting pushJobSetting,
ControllerClient controllerClient,
TopicInfo topicInfo) {
private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient, TopicInfo topicInfo) {
// Attempting to kill job. There's a race condition, but meh. Better kill when you know it's running
killJob();
killComputeJob();
if (!pushJobSetting.isIncrementalPush && topicInfo != null) {
final int maxRetryAttempt = 10;
int currentRetryAttempt = 0;
Expand All @@ -3443,10 +3439,9 @@ private void killJobAndCleanup(
LOGGER.info("Offline push job has been killed, topic: {}", topicInfo.topic);
}
}
close();
}

private void killJob() {
private void killComputeJob() {
if (runningJob == null) {
LOGGER.warn("No op to kill a null running job");
return;
Expand Down Expand Up @@ -3577,7 +3572,7 @@ public static void main(String[] args) {
Utils.exit("Venice Push Job Completed");
}

public static void runPushJob(String jobId, Properties props) {
private static void runPushJob(String jobId, Properties props) {
try (VenicePushJob job = new VenicePushJob(jobId, props)) {
job.run();
}
Expand Down
Loading

0 comments on commit aed7bfb

Please sign in to comment.