Skip to content

Commit

Permalink
Refactoring and addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vsinghal85 committed Nov 19, 2024
1 parent 30929f8 commit a782202
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.runtime.api.LeaseUnavailableException;
import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
Expand Down Expand Up @@ -257,10 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch(LeaseUnavailableException e){
throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
}
catch (Throwable e) {
} catch(TooSoonToRerunSameFlowException e) {
return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
"FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken"));
} catch (Throwable e) {
// TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
/**
* An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
*/
public class LeaseUnavailableException extends RuntimeException {
public LeaseUnavailableException(String message) {
public class TooSoonToRerunSameFlowException extends RuntimeException {
private final FlowSpec flowSpec;

public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) {
super(message);
this.flowSpec = flowSpec;
}

public FlowSpec getFlowSpec() {
return flowSpec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,13 @@ public interface DagManagementStateStore {

/**
* Returns true if lease can be acquired on entity provided in leaseParams.
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
* and if the dag action event we're checking on is a reminder event
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @throws IOException
*/
boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;

/**
* Returns the requested {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link JobStatus}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
}

@Override
public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
return decoratedMultiActiveLeaseArbiter.isLeaseAcquirable(leaseParams);
public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException {
return decoratedMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole

/**
* This method checks if lease can be acquired on provided flow in lease params
* returns true if entry for the same flow does not exists within epsilon time
* returns true if entry for the same flow does not exists within Lease Consolidation Period
* in leaseArbiterStore, else returns false
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
* was triggered, and if the dag action event we're checking on is a reminder event
* @return true if lease can be acquired on the flow passed in the lease params, false otherwise
*/
boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)
boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.flow.FlowUtils;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
Expand Down Expand Up @@ -171,8 +172,11 @@ public synchronized void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode)
}

@Override
public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
return multiActiveLeaseArbiter.isLeaseAcquirable(leaseParams);
public boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
flowExecutionId, DagActionStore.DagActionType.LAUNCH);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
return multiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(leaseParams);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ else if (leaseValidityStatus == 2) {
no existing lease record exists in arbiter table or the record is older then epsilon time
*/
@Override
public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
public boolean existsSimilarLeaseWithinConsolidationPeriod(DagActionStore.LeaseParams leaseParams) throws IOException {
Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;
return infoResult.isPresent() ? infoResult.get().isWithinEpsilon() : false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.gobblin.runtime.api.LeaseUnavailableException;
import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,7 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
_log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
} else if (addedSpec instanceof FlowSpec) {
validateAdhocFlowLeasability((FlowSpec) addedSpec);
enforceSimilarAdhocFlowExistence((FlowSpec) addedSpec);
_log.info("Orchestrator - onAdd[Flow]Spec: " + addedSpec);
return this.specCompiler.onAddSpec(addedSpec);
} else {
Expand All @@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}

/*
validates if lease can be acquired on the provided flowSpec,
else throw LeaseUnavailableException
enforces that a similar flow is not launching,
else throw TooSoonToRerunSameFlowException
*/
private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);

DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
_log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
_log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);
try {
if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);
}
} catch (IOException exception) {
_log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception);
throw new RuntimeException("Error querying leaseArbiterTable", exception);
_log.error("unable to check whether similar flow exists " + flowGroup + "." + flowName);
throw new RuntimeException("unable to check whether similar flow exists " + flowGroup + "." + flowName, exception);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.CompletedFuture;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -96,13 +95,19 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
}

@Test
public void testcanAcquireLeaseOnEntity() throws Exception{
Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{
Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
String flowName = "testFlow";
String flowGroup = "testGroup";
DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));
}

@Test
public void testExistsCurrentlyLaunchingSimilarFlowGivesFalse() throws Exception{
Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
String flowName = "testFlow";
String flowGroup = "testGroup";
Assert.assertFalse(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final long flowExecutionId = 12345677L;
private static final long flowExecutionId1 = 12345996L;
private static final long eventTimeMillis = 1710451837L;
// Dag actions with the same flow info but different flow action types are considered unique
private static final DagActionStore.DagAction launchDagAction =
Expand All @@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis);
private static final DagActionStore.DagAction launchDagAction3_similar =
new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams3_similar = new DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis);
private static final DagActionStore.DagAction launchDagAction4_similar =
new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams4_similar = new DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis);
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private ITestMetastoreDatabase testDb;
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
Expand Down Expand Up @@ -217,26 +226,26 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
to account for clock drift
*/
@Test
public void testWhenLeasableEntityUnavailable() throws Exception{
public void testExistsSimilarLeaseWithinConsolidationPeriod() throws Exception{
LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
completeLeaseHelper(launchLeaseParams3);
Thread.sleep(LESS_THAN_EPSILON);
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams3_similar));
}

/*
test to verify if leasable entity exists post epsilon time
*/
@Test
public void testWhenLeasableEntityAvailable() throws Exception{
public void testDoesNotExistsSimilarLeaseWithinConsolidationPeriod() throws Exception{
LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
completeLeaseHelper(launchLeaseParams4);
Thread.sleep(MORE_THAN_EPSILON);
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4));
Assert.assertFalse(mysqlMultiActiveLeaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(launchLeaseParams4_similar));
}

/*
Expand Down
Loading

0 comments on commit a782202

Please sign in to comment.