From 80e2d5a731176d3489d943537bc20b0cabb99afc Mon Sep 17 00:00:00 2001 From: Nisarg Thakkar Date: Thu, 5 Oct 2023 13:49:15 -0700 Subject: [PATCH] [vpj] VPJ should not close and release resources on its own --- .../linkedin/venice/hadoop/VenicePushJob.java | 19 +- .../venice/hadoop/VenicePushJobTest.java | 272 ++++++++++-------- 2 files changed, 152 insertions(+), 139 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java index fed25c5408..fd6ac364a4 100755 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java @@ -1213,7 +1213,7 @@ public void run() { ex); } finally { try { - killJobAndCleanup(pushJobSetting, controllerClient, kafkaTopicInfo); + killJob(pushJobSetting, controllerClient, kafkaTopicInfo); LOGGER.info("Successfully killed the failed push job."); } catch (Exception ex) { LOGGER.info("Failed to stop and cleanup the job. New pushes might be blocked.", ex); @@ -2273,9 +2273,8 @@ void validateKeySchema( if (!canonicalizedServerSchema.equals(canonicalizedClientSchema)) { String briefErrorMessage = "Key schema mis-match for store " + setting.storeName; LOGGER.error( - "{}\n\t\tController URLs: {}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}", + "{}\n\t\tschema defined in HDFS: \t{}\n\t\tschema defined in Venice: \t{}", briefErrorMessage, - controllerClient.getControllerDiscoveryUrls(), pushJobSchemaInfo.getKeySchemaString(), serverSchema.toString()); throw new VeniceException(briefErrorMessage); @@ -3384,7 +3383,7 @@ private String pushJobPropertiesToString( * @throws Exception */ public void cancel() { - killJobAndCleanup(pushJobSetting, controllerClient, kafkaTopicInfo); + killJob(pushJobSetting, controllerClient, kafkaTopicInfo); if (kafkaTopicInfo != null && StringUtils.isEmpty(kafkaTopicInfo.topic)) { pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue())); } else { @@ -3395,12 +3394,9 @@ public void cancel() { sendPushJobDetailsToController(); } - private void killJobAndCleanup( - PushJobSetting pushJobSetting, - ControllerClient controllerClient, - TopicInfo topicInfo) { + private void killJob(PushJobSetting pushJobSetting, ControllerClient controllerClient, TopicInfo topicInfo) { // Attempting to kill job. There's a race condition, but meh. Better kill when you know it's running - killJob(); + killComputeJob(); if (!pushJobSetting.isIncrementalPush && topicInfo != null) { final int maxRetryAttempt = 10; int currentRetryAttempt = 0; @@ -3421,10 +3417,9 @@ private void killJobAndCleanup( LOGGER.info("Offline push job has been killed, topic: {}", topicInfo.topic); } } - close(); } - private void killJob() { + private void killComputeJob() { if (runningJob == null) { LOGGER.warn("No op to kill a null running job"); return; @@ -3555,7 +3550,7 @@ public static void main(String[] args) { Utils.exit("Venice Push Job Completed"); } - public static void runPushJob(String jobId, Properties props) { + private static void runPushJob(String jobId, Properties props) { try (VenicePushJob job = new VenicePushJob(jobId, props)) { job.run(); } diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java index f7707c0b71..8c8f21fe91 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java @@ -109,8 +109,9 @@ public void testVPJcheckInputUpdateSchema() { public void testRepushTTLJobWithNonKafkaInput() { Properties repushProps = new Properties(); repushProps.setProperty(REPUSH_TTL_ENABLE, "true"); - VenicePushJob pushJob = getSpyVenicePushJob(repushProps, null); - pushJob.run(); + try (VenicePushJob pushJob = getSpyVenicePushJob(repushProps, null)) { + pushJob.run(); + } } @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Repush TTL is only supported for real-time only store.*") @@ -126,8 +127,9 @@ public void testRepushTTLJobWithBatchStore() { } }); }); - VenicePushJob pushJob = getSpyVenicePushJob(repushProps, client); - pushJob.run(); + try (VenicePushJob pushJob = getSpyVenicePushJob(repushProps, client)) { + pushJob.run(); + } } @Test @@ -138,17 +140,19 @@ public void testPushJobSettingWithD2Routing() { storeInfo.setVersions(Collections.singletonList(version)); storeInfo.setHybridStoreConfig(new HybridStoreConfigImpl(0, 0, 0, null, null)); }); - VenicePushJob pushJob = getSpyVenicePushJobWithD2Routing(new Properties(), client); - VenicePushJob.PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); - Assert.assertTrue(pushJobSetting.d2Routing); - Assert.assertEquals(pushJobSetting.controllerD2ServiceName, TEST_CHILD_CONTROLLER_D2_SERVICE); - Assert.assertEquals(pushJobSetting.childControllerRegionD2ZkHosts, TEST_ZK_ADDRESS); + try (VenicePushJob pushJob = getSpyVenicePushJobWithD2Routing(new Properties(), client)) { + VenicePushJob.PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); + Assert.assertTrue(pushJobSetting.d2Routing); + Assert.assertEquals(pushJobSetting.controllerD2ServiceName, TEST_CHILD_CONTROLLER_D2_SERVICE); + Assert.assertEquals(pushJobSetting.childControllerRegionD2ZkHosts, TEST_ZK_ADDRESS); + } - VenicePushJob multiRegionPushJob = getSpyVenicePushJobWithMultiRegionD2Routing(new Properties(), client); - VenicePushJob.PushJobSetting multiRegionPushJobSetting = multiRegionPushJob.getPushJobSetting(); - Assert.assertTrue(multiRegionPushJobSetting.d2Routing); - Assert.assertEquals(multiRegionPushJobSetting.controllerD2ServiceName, TEST_PARENT_CONTROLLER_D2_SERVICE); - Assert.assertEquals(multiRegionPushJobSetting.parentControllerRegionD2ZkHosts, TEST_PARENT_ZK_ADDRESS); + try (VenicePushJob multiRegionPushJob = getSpyVenicePushJobWithMultiRegionD2Routing(new Properties(), client)) { + VenicePushJob.PushJobSetting multiRegionPushJobSetting = multiRegionPushJob.getPushJobSetting(); + Assert.assertTrue(multiRegionPushJobSetting.d2Routing); + Assert.assertEquals(multiRegionPushJobSetting.controllerD2ServiceName, TEST_PARENT_CONTROLLER_D2_SERVICE); + Assert.assertEquals(multiRegionPushJobSetting.parentControllerRegionD2ZkHosts, TEST_PARENT_ZK_ADDRESS); + } } @Test @@ -161,9 +165,10 @@ public void testPushJobSettingWithLivenessHeartbeat() { storeInfo.setVersions(Collections.singletonList(version)); storeInfo.setHybridStoreConfig(new HybridStoreConfigImpl(0, 0, 0, null, null)); }); - VenicePushJob pushJob = getSpyVenicePushJob(vpjProps, client); - VenicePushJob.PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); - Assert.assertTrue(pushJobSetting.livenessHeartbeatEnabled); + try (VenicePushJob pushJob = getSpyVenicePushJob(vpjProps, client)) { + VenicePushJob.PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); + Assert.assertTrue(pushJobSetting.livenessHeartbeatEnabled); + } } @Test @@ -174,23 +179,25 @@ public void testPushJobPollStatus() { JobStatusQueryResponse response = mock(JobStatusQueryResponse.class); doReturn("UNKNOWN").when(response).getStatus(); doReturn(response).when(client).queryOverallJobStatus(anyString(), eq(Optional.empty()), eq(null)); - VenicePushJob pushJob = getSpyVenicePushJob(vpjProps, client); - VenicePushJob.PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); - pushJobSetting.jobStatusInUnknownStateTimeoutMs = 10; - Assert.assertTrue(pushJobSetting.livenessHeartbeatEnabled); - VenicePushJob.TopicInfo topicInfo = new VenicePushJob.TopicInfo(); - topicInfo.version = 1; - topicInfo.topic = "abc"; - pushJob.storeSetting = new VenicePushJob.StoreSetting(); - pushJob.storeSetting.storeResponse = new StoreResponse(); - pushJob.storeSetting.storeResponse.setName("abc"); - StoreInfo storeInfo = new StoreInfo(); - storeInfo.setBootstrapToOnlineTimeoutInHours(0); - pushJob.storeSetting.storeResponse.setStore(storeInfo); - VeniceException exception = Assert.expectThrows( - VeniceException.class, - () -> pushJob.pollStatusUntilComplete(Optional.empty(), client, pushJobSetting, topicInfo, null, false)); - Assert.assertEquals(exception.getMessage(), "Failing push-job for store abc which is still running after 0 hours."); + try (VenicePushJob pushJob = getSpyVenicePushJob(vpjProps, client)) { + VenicePushJob.PushJobSetting pushJobSetting = pushJob.getPushJobSetting(); + pushJobSetting.jobStatusInUnknownStateTimeoutMs = 10; + Assert.assertTrue(pushJobSetting.livenessHeartbeatEnabled); + VenicePushJob.TopicInfo topicInfo = new VenicePushJob.TopicInfo(); + topicInfo.version = 1; + topicInfo.topic = "abc"; + pushJob.storeSetting = new VenicePushJob.StoreSetting(); + pushJob.storeSetting.storeResponse = new StoreResponse(); + pushJob.storeSetting.storeResponse.setName("abc"); + StoreInfo storeInfo = new StoreInfo(); + storeInfo.setBootstrapToOnlineTimeoutInHours(0); + pushJob.storeSetting.storeResponse.setStore(storeInfo); + VeniceException exception = Assert.expectThrows( + VeniceException.class, + () -> pushJob.pollStatusUntilComplete(Optional.empty(), client, pushJobSetting, topicInfo, null, false)); + Assert + .assertEquals(exception.getMessage(), "Failing push-job for store abc which is still running after 0 hours."); + } } private Properties getRepushReadyProps() { @@ -391,30 +398,33 @@ public void testVenicePushJobCanHandleLegacyFields() { Properties props = getVpjRequiredProperties(); props.put(LEGACY_AVRO_KEY_FIELD_PROP, "id"); props.put(LEGACY_AVRO_VALUE_FIELD_PROP, "message"); - VenicePushJob vpj = new VenicePushJob(PUSH_JOB_ID, props); - VeniceProperties veniceProperties = vpj.getVeniceProperties(); - assertNotNull(veniceProperties); - assertEquals(veniceProperties.getString(KEY_FIELD_PROP), "id"); - assertEquals(veniceProperties.getString(VALUE_FIELD_PROP), "message"); + try (VenicePushJob vpj = new VenicePushJob(PUSH_JOB_ID, props)) { + VeniceProperties veniceProperties = vpj.getVeniceProperties(); + assertNotNull(veniceProperties); + assertEquals(veniceProperties.getString(KEY_FIELD_PROP), "id"); + assertEquals(veniceProperties.getString(VALUE_FIELD_PROP), "message"); + } } @Test public void testGetPushJobSetting() { Properties props = getVpjRequiredProperties(); - VenicePushJob vpj = new VenicePushJob(PUSH_JOB_ID, props); - VenicePushJob.PushJobSetting pushJobSetting = vpj.getPushJobSetting(); - assertNotNull(pushJobSetting); - assertTrue(pushJobSetting.d2Routing); + try (VenicePushJob vpj = new VenicePushJob(PUSH_JOB_ID, props)) { + VenicePushJob.PushJobSetting pushJobSetting = vpj.getPushJobSetting(); + assertNotNull(pushJobSetting); + assertTrue(pushJobSetting.d2Routing); + } } @Test public void testGetPushJobSettingShouldNotUseD2RoutingIfControllerUrlDoesNotStartWithD2() { Properties props = getVpjRequiredProperties(); props.put(VENICE_DISCOVER_URL_PROP, "http://venice.db:9898"); - VenicePushJob vpj = new VenicePushJob(PUSH_JOB_ID, props); - VenicePushJob.PushJobSetting pushJobSetting = vpj.getPushJobSetting(); - assertNotNull(pushJobSetting); - assertFalse(pushJobSetting.d2Routing); + try (VenicePushJob vpj = new VenicePushJob(PUSH_JOB_ID, props)) { + VenicePushJob.PushJobSetting pushJobSetting = vpj.getPushJobSetting(); + assertNotNull(pushJobSetting); + assertFalse(pushJobSetting.d2Routing); + } } @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Incremental push is not supported while using Kafka Input Format") @@ -526,27 +536,30 @@ public void testTargetedRegionPushConfigOverride() throws Exception { ControllerClient client = getClient(store -> { store.setNativeReplicationSourceFabric(""); }); - VenicePushJob pushJob = getSpyVenicePushJob(props, client); JobStatusQueryResponse response = mockJobStatusQuery(); - doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); - skipVPJValidation(pushJob); - try { - pushJob.run(); - Assert.fail("Test should fail, but doesn't."); - } catch (VeniceException e) { - Assert.assertTrue( - e.getMessage() - .contains( - "The store either does not have native replication mode enabled or set up default source fabric.")); + + try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) { + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); + skipVPJValidation(pushJob); + try { + pushJob.run(); + Assert.fail("Test should fail, but doesn't."); + } catch (VeniceException e) { + Assert.assertTrue( + e.getMessage() + .contains( + "The store either does not have native replication mode enabled or set up default source fabric.")); + } } props.put(TARGETED_REGION_PUSH_LIST, "dc-0, dc-1"); client = getClient(); - pushJob = getSpyVenicePushJob(props, client); - doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); - skipVPJValidation(pushJob); - pushJob.run(); - Assert.assertEquals(pushJob.pushJobSetting.targetedRegions, "dc-0, dc-1"); + try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) { + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); + skipVPJValidation(pushJob); + pushJob.run(); + Assert.assertEquals(pushJob.pushJobSetting.targetedRegions, "dc-0, dc-1"); + } } @Test @@ -556,25 +569,26 @@ public void testTargetedRegionPushReporting() throws Exception { props.put(POST_VALIDATION_CONSUMPTION_ENABLED, false); props.put(TARGETED_REGION_PUSH_LIST, "dc-0, dc-1"); ControllerClient client = getClient(); - VenicePushJob pushJob = getSpyVenicePushJob(props, client); - skipVPJValidation(pushJob); + try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) { + skipVPJValidation(pushJob); + + JobStatusQueryResponse response = mockJobStatusQuery(); + Map extraInfo = response.getExtraInfo(); + // one of the regions failed, so should fail + extraInfo.put("dc-0", ExecutionStatus.NOT_STARTED.toString()); + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); + try { + pushJob.run(); + Assert.fail("Test should fail, but doesn't."); + } catch (VeniceException e) { + assertTrue(e.getMessage().contains("Push job error")); + } - JobStatusQueryResponse response = mockJobStatusQuery(); - Map extraInfo = response.getExtraInfo(); - // one of the regions failed, so should fail - extraInfo.put("dc-0", ExecutionStatus.NOT_STARTED.toString()); - doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); - try { + extraInfo.put("dc-0", ExecutionStatus.COMPLETED.toString()); + extraInfo.put("dc-1", ExecutionStatus.COMPLETED.toString()); + // both regions completed, so should succeed pushJob.run(); - Assert.fail("Test should fail, but doesn't."); - } catch (VeniceException e) { - assertTrue(e.getMessage().contains("Push job error")); } - - extraInfo.put("dc-0", ExecutionStatus.COMPLETED.toString()); - extraInfo.put("dc-1", ExecutionStatus.COMPLETED.toString()); - // both regions completed, so should succeed - pushJob.run(); } @Test @@ -583,42 +597,44 @@ public void testTargetedRegionPushPostValidationConsumptionForBatchStore() throw props.put(TARGETED_REGION_PUSH_ENABLED, true); props.put(POST_VALIDATION_CONSUMPTION_ENABLED, true); ControllerClient client = getClient(); - VenicePushJob pushJob = getSpyVenicePushJob(props, client); - skipVPJValidation(pushJob); + try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) { + skipVPJValidation(pushJob); - JobStatusQueryResponse response = mockJobStatusQuery(); - doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); + JobStatusQueryResponse response = mockJobStatusQuery(); + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); - VersionCreationResponse mockVersionCreationResponse = mockVersionCreationResponse(client); - mockVersionCreationResponse.setKafkaSourceRegion(null); + VersionCreationResponse mockVersionCreationResponse = mockVersionCreationResponse(client); + mockVersionCreationResponse.setKafkaSourceRegion(null); - // verify the kafka source region must be present when kick off post-validation consumption - try { - pushJob.run(); - Assert.fail("Test should fail, but doesn't."); - } catch (VeniceException e) { - assertTrue(e.getMessage().contains("Post-validation consumption halted due to no available source region found")); - } - mockVersionCreationResponse.setKafkaSourceRegion("dc-0"); - verify(pushJob, times(1)).postPushValidation(); - - ControllerResponse badDataRecoveryResponse = new ControllerResponse(); - badDataRecoveryResponse.setError("error"); - doReturn(badDataRecoveryResponse).when(client) - .dataRecovery(anyString(), anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), any()); - // verify failure of data recovery will fail the push job - try { - pushJob.run(); - } catch (VeniceException e) { - assertTrue(e.getMessage().contains("Can't push data for region")); - } + // verify the kafka source region must be present when kick off post-validation consumption + try { + pushJob.run(); + Assert.fail("Test should fail, but doesn't."); + } catch (VeniceException e) { + assertTrue( + e.getMessage().contains("Post-validation consumption halted due to no available source region found")); + } + mockVersionCreationResponse.setKafkaSourceRegion("dc-0"); + verify(pushJob, times(1)).postPushValidation(); + + ControllerResponse badDataRecoveryResponse = new ControllerResponse(); + badDataRecoveryResponse.setError("error"); + doReturn(badDataRecoveryResponse).when(client) + .dataRecovery(anyString(), anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), any()); + // verify failure of data recovery will fail the push job + try { + pushJob.run(); + } catch (VeniceException e) { + assertTrue(e.getMessage().contains("Can't push data for region")); + } - ControllerResponse goodDataRecoveryResponse = new ControllerResponse(); - doReturn(goodDataRecoveryResponse).when(client) - .dataRecovery(anyString(), anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), any()); + ControllerResponse goodDataRecoveryResponse = new ControllerResponse(); + doReturn(goodDataRecoveryResponse).when(client) + .dataRecovery(anyString(), anyString(), anyString(), anyInt(), anyBoolean(), anyBoolean(), any()); - // the job should succeed - pushJob.run(); + // the job should succeed + pushJob.run(); + } } @Test @@ -629,17 +645,18 @@ public void testTargetedRegionPushPostValidationConsumptionForHybridStore() thro ControllerClient client = getClient(storeInfo -> { storeInfo.setHybridStoreConfig(new HybridStoreConfigImpl(0, 0, 0, null, null)); }, true); - VenicePushJob pushJob = getSpyVenicePushJob(props, client); - skipVPJValidation(pushJob); + try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) { + skipVPJValidation(pushJob); - JobStatusQueryResponse response = mockJobStatusQuery(); - doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); - // skip the mocking for repush - doCallRealMethod().doNothing().when(pushJob).run(); - pushJob.run(); + JobStatusQueryResponse response = mockJobStatusQuery(); + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); + // skip the mocking for repush + doCallRealMethod().doNothing().when(pushJob).run(); + pushJob.run(); - // for hybrid store, the job is supposed to ran twice, one for targeted region push and another is for repush - verify(pushJob, times(2)).run(); + // for hybrid store, the job is supposed to ran twice, one for targeted region push and another is for repush + verify(pushJob, times(2)).run(); + } } @Test @@ -648,17 +665,18 @@ public void testTargetedRegionPushPostValidationFailedForValidation() throws Exc props.put(TARGETED_REGION_PUSH_ENABLED, true); props.put(POST_VALIDATION_CONSUMPTION_ENABLED, true); ControllerClient client = getClient(); - VenicePushJob pushJob = getSpyVenicePushJob(props, client); - skipVPJValidation(pushJob); + try (VenicePushJob pushJob = getSpyVenicePushJob(props, client)) { + skipVPJValidation(pushJob); - JobStatusQueryResponse response = mockJobStatusQuery(); - doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); - mockVersionCreationResponse(client); + JobStatusQueryResponse response = mockJobStatusQuery(); + doReturn(response).when(client).queryOverallJobStatus(anyString(), any(), anyString()); + mockVersionCreationResponse(client); - doThrow(new VeniceValidationException("error")).when(pushJob).postPushValidation(); + doThrow(new VeniceValidationException("error")).when(pushJob).postPushValidation(); - assertThrows(VeniceValidationException.class, pushJob::run); - verify(pushJob, never()).postValidationConsumption(any()); + assertThrows(VeniceValidationException.class, pushJob::run); + verify(pushJob, never()).postValidationConsumption(any()); + } } private JobStatusQueryResponse mockJobStatusQuery() {