diff --git a/CHANGELOG.md b/CHANGELOG.md index 897ba624f..585f2bff0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,32 @@ # Changelog +## 2.7.6 +- Fix getVersion override when added new version +- Add async signal to untypedstub +- Fix RetryOptions.addDoNotRetry +- Add missing metrics from go client +- Fix a bug in setting retry expiration while getting history +- Fix start async return + +## 2.7.5 +- Added supports contextPropagators for localActivity + +## v2.7.4 +- Fix prometheus reporting issue +- Fix Promise.allOf should not block on empty input +- Misc: Added project directory to sourceItems path +- Add async start to untype stub + +## v2.7.3 +- Add wf type tag in decider metrics scope +- Fix WorkflowStub.fromTyped method +- Added missing fields to local activity task +- Honor user timeout for get workflow result + +## v2.7.2 +- Fix leak in Async GetWorkflowExecutionHistory +- Fix context timeout in execute workflow + ## v2.7.1 - Fix a bug in build.gradle that prevented javadoc and sources from being published diff --git a/README.md b/README.md index 25ac7c381..2b5de6621 100644 --- a/README.md +++ b/README.md @@ -31,12 +31,12 @@ Add *cadence-client* as a dependency to your *pom.xml*: com.uber.cadence cadence-client - 2.7.1 + 2.7.6 or to *build.gradle*: - compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.7.1' + compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.7.6' ## Documentation diff --git a/build.gradle b/build.gradle index 0415313ae..e7d03f6f2 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,7 @@ googleJavaFormat { } group = 'com.uber.cadence' -version = '2.7.1' +version = '2.7.6' description = '''Uber Cadence Java Client''' diff --git a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java index d24b99a1e..144e3fce4 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java @@ -296,12 +296,6 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m }; decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals)); - Integer version = versionMap.get(changeId); - if (version != null) { - validateVersion(changeId, version, minSupported, maxSupported); - return version; - } - Optional result = versionHandler.handle( changeId, @@ -313,6 +307,12 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m return Optional.of(converter.toData(maxSupported)); }); + Integer version = versionMap.get(changeId); + if (version != null) { + validateVersion(changeId, version, minSupported, maxSupported); + return version; + } + if (!result.isPresent()) { return WorkflowInternal.DEFAULT_VERSION; } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 2b76badc2..5c1c94a5e 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.base.Strings; import com.google.common.util.concurrent.UncheckedExecutionException; @@ -411,6 +413,27 @@ public interface TestWorkflow2 { List getTrace(); } + public interface TestWorkflow3 { + + @WorkflowMethod + String execute(String taskList); + + @SignalMethod(name = "testSignal") + void signal1(String arg); + + @QueryMethod(name = "getState") + String getState(); + } + + public interface TestWorkflowQuery { + + @WorkflowMethod() + String execute(String taskList); + + @QueryMethod() + String query(); + } + public static class TestSyncWorkflowImpl implements TestWorkflow1 { @Override @@ -4334,13 +4357,13 @@ public void testGetVersion2() { static CompletableFuture executionStarted = new CompletableFuture<>(); - public static class TestGetVersionWithoutDecisionEventWorkflowImpl - implements TestWorkflowSignaled { + public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflow3 { CompletablePromise signalReceived = Workflow.newPromise(); + String result = ""; @Override - public String execute() { + public String execute(String taskList) { try { if (!getVersionExecuted.contains("getVersionWithoutDecisionEvent")) { // Execute getVersion in non-replay mode. @@ -4353,10 +4376,11 @@ public String execute() { int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); if (version == Workflow.DEFAULT_VERSION) { signalReceived.get(); - return "result 1"; + result = "result 1"; } else { - return "result 2"; + result = "result 2"; } + return result; } } catch (Exception e) { throw new RuntimeException("failed to get from signal"); @@ -4369,6 +4393,11 @@ public String execute() { public void signal1(String arg) { signalReceived.complete(true); } + + @Override + public String getState() { + return result; + } } @Test @@ -4377,25 +4406,26 @@ public void testGetVersionWithoutDecisionEvent() throws Exception { executionStarted = new CompletableFuture<>(); getVersionExecuted.remove("getVersionWithoutDecisionEvent"); startWorkerFor(TestGetVersionWithoutDecisionEventWorkflowImpl.class); - TestWorkflowSignaled workflowStub = + TestWorkflow3 workflowStub = workflowClient.newWorkflowStub( - TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build()); - WorkflowClient.start(workflowStub::execute); + TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build()); + WorkflowClient.start(workflowStub::execute, taskList); executionStarted.get(); workflowStub.signal1("test signal"); - String result = workflowStub.execute(); + String result = workflowStub.execute(taskList); assertEquals("result 1", result); + assertEquals("result 1", workflowStub.getState()); } // The following test covers the scenario where getVersion call is removed before a // non-version-marker decision. - public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 { + public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflowQuery { + String result = ""; @Override public String execute(String taskList) { TestActivities testActivities = Workflow.newActivityStub(TestActivities.class, newActivityOptions1(taskList)); - String result; // Test removing a version check in replay code. if (!getVersionExecuted.contains(taskList)) { int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); @@ -4412,25 +4442,33 @@ public String execute(String taskList) { result += testActivities.activity(); return result; } + + @Override + public String query() { + return result; + } } @Test public void testGetVersionRemovedInReplay() { startWorkerFor(TestGetVersionRemovedInReplayWorkflowImpl.class); - TestWorkflow1 workflowStub = + TestWorkflowQuery workflowStub = workflowClient.newWorkflowStub( - TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); + TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build()); String result = workflowStub.execute(taskList); assertEquals("activity22activity", result); tracer.setExpected( + "registerQuery TestWorkflowQuery::query", "getVersion", "executeActivity TestActivities::activity2", "executeActivity TestActivities::activity"); + assertEquals("activity22activity", workflowStub.query()); } // The following test covers the scenario where getVersion call is removed before another // version-marker decision. - public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflow1 { + public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflowQuery { + String result = ""; @Override public String execute(String taskList) { @@ -4445,19 +4483,30 @@ public String execute(String taskList) { Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 2); } - return testActivities.activity(); + result = testActivities.activity(); + return result; + } + + @Override + public String query() { + return result; } } @Test public void testGetVersionRemovedInReplay2() { startWorkerFor(TestGetVersionRemovedInReplay2WorkflowImpl.class); - TestWorkflow1 workflowStub = + TestWorkflowQuery workflowStub = workflowClient.newWorkflowStub( - TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); + TestWorkflowQuery.class, newWorkflowOptionsBuilder(taskList).build()); String result = workflowStub.execute(taskList); assertEquals("activity", result); - tracer.setExpected("getVersion", "getVersion", "executeActivity TestActivities::activity"); + tracer.setExpected( + "registerQuery TestWorkflowQuery::query", + "getVersion", + "getVersion", + "executeActivity TestActivities::activity"); + assertEquals("activity", workflowStub.query()); } public static class TestVersionNotSupportedWorkflowImpl implements TestWorkflow1 { @@ -5162,15 +5211,6 @@ public void testParallelLocalActivityExecutionWorkflow() { result); } - public interface TestWorkflowQuery { - - @WorkflowMethod() - String execute(String taskList); - - @QueryMethod() - String query(); - } - public static final class TestLocalActivityAndQueryWorkflow implements TestWorkflowQuery { String message = "initial value"; @@ -5846,4 +5886,73 @@ public void upsertSearchAttributes(Map searchAttributes) { next.upsertSearchAttributes(searchAttributes); } } + + public static class TestGetVersionWorkflowRetryImpl implements TestWorkflow3 { + private String result = ""; + + @Override + public String execute(String taskList) { + int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1); + int act = 0; + if (version == 1) { + ActivityOptions options = + new ActivityOptions.Builder() + .setTaskList(taskList) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions( + new RetryOptions.Builder() + .setMaximumAttempts(3) + .setInitialInterval(Duration.ofSeconds(1)) + .build()) + .build(); + + TestActivities testActivities = Workflow.newActivityStub(TestActivities.class, options); + act = testActivities.activity1(1); + } + + result += "activity" + act; + return result; + } + + @Override + public void signal1(String arg) { + Workflow.sleep(1000); + } + + @Override + public String getState() { + return result; + } + } + + @Test + public void testGetVersionRetry() throws ExecutionException, InterruptedException { + TestActivities activity = mock(TestActivities.class); + when(activity.activity1(1)).thenReturn(1); + worker.registerActivitiesImplementations(activity); + + startWorkerFor(TestGetVersionWorkflowRetryImpl.class); + TestWorkflow3 workflowStub = + workflowClient.newWorkflowStub( + TestWorkflow3.class, newWorkflowOptionsBuilder(taskList).build()); + CompletableFuture result = WorkflowClient.execute(workflowStub::execute, taskList); + workflowStub.signal1("test"); + assertEquals("activity1", result.get()); + + // test replay + assertEquals("activity1", workflowStub.getState()); + } + + @Test + public void testGetVersionWithRetryReplay() throws Exception { + // Avoid executing 4 times + Assume.assumeFalse("skipping for docker tests", useExternalService); + Assume.assumeFalse("skipping for sticky off", disableStickyExecution); + + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class); + } } diff --git a/src/test/resources/testGetVersionWithRetryHistory.json b/src/test/resources/testGetVersionWithRetryHistory.json new file mode 100644 index 000000000..7ac7af29b --- /dev/null +++ b/src/test/resources/testGetVersionWithRetryHistory.json @@ -0,0 +1,247 @@ +[ + { + "eventId":1, + "timestamp":1600908280230646000, + "eventType":"WorkflowExecutionStarted", + "version":-24, + "taskId":12582912, + "workflowExecutionStartedEventAttributes":{ + "workflowType":{ + "name":"TestWorkflow3::execute" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionNew[Docker Sticky ON]-017b6e2b-89c8-41b6-8c08-7bd8e9a4b31f" + }, + "input":"IldvcmtmbG93VGVzdC10ZXN0R2V0VmVyc2lvbk5ld1tEb2NrZXIgU3RpY2t5IE9OXS0wMTdiNmUyYi04OWM4LTQxYjYtOGMwOC03YmQ4ZTlhNGIzMWYi", + "executionStartToCloseTimeoutSeconds":30, + "taskStartToCloseTimeoutSeconds":5, + "originalExecutionRunId":"fab6e1da-7f25-472d-95ca-f9c5f59e6181", + "identity":"", + "firstExecutionRunId":"fab6e1da-7f25-472d-95ca-f9c5f59e6181", + "attempt":0, + "firstDecisionTaskBackoffSeconds":0 + } + }, + { + "eventId":2, + "timestamp":1600908280230662000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":12582913, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"WorkflowTest-testGetVersionNew[Docker Sticky ON]-017b6e2b-89c8-41b6-8c08-7bd8e9a4b31f" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":3, + "timestamp":1600908280245759000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":12582918, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":2, + "identity":"13730@boweixu-C02V61JZHTDG", + "requestId":"23fed7ea-8bda-4b40-aacf-f674cf466ea3" + } + }, + { + "eventId":4, + "timestamp":1600908280579150000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":12582921, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":2, + "startedEventId":3, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":5, + "timestamp":1600908280579167000, + "eventType":"MarkerRecorded", + "version":-24, + "taskId":12582922, + "markerRecordedEventAttributes":{ + "markerName":"Version", + "details":"MQ==", + "decisionTaskCompletedEventId":4, + "header":{ + "fields":{ + "MutableMarkerHeader":"eyJpZCI6InRlc3RfY2hhbmdlIiwiZXZlbnRJZCI6NSwiYWNjZXNzQ291bnQiOjB9" + } + } + } + }, + { + "eventId":6, + "timestamp":1600908280579177000, + "eventType":"ActivityTaskScheduled", + "version":-24, + "taskId":12582923, + "activityTaskScheduledEventAttributes":{ + "activityId":"0", + "activityType":{ + "name":"customActivity1" + }, + "taskList":{ + "name":"WorkflowTest-testGetVersionNew[Docker Sticky ON]-017b6e2b-89c8-41b6-8c08-7bd8e9a4b31f" + }, + "input":"MQ==", + "scheduleToCloseTimeoutSeconds":30, + "scheduleToStartTimeoutSeconds":30, + "startToCloseTimeoutSeconds":10, + "heartbeatTimeoutSeconds":5, + "decisionTaskCompletedEventId":4, + "retryPolicy":{ + "initialIntervalInSeconds":1, + "backoffCoefficient":2, + "maximumIntervalInSeconds":0, + "maximumAttempts":3, + "expirationIntervalInSeconds":0 + } + } + }, + { + "eventId":7, + "timestamp":1600908280391225000, + "eventType":"WorkflowExecutionSignaled", + "version":-24, + "taskId":12582924, + "workflowExecutionSignaledEventAttributes":{ + "signalName":"testSignal", + "input":"InRlc3Qi", + "identity":"" + } + }, + { + "eventId":8, + "timestamp":1600908280579193000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":12582929, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"boweixu-C02V61JZHTDG:aef436a5-3ef4-4ccb-b7b9-5fbedca492be" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":9, + "timestamp":1600908280588213000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":12582936, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":8, + "identity":"aef436a5-3ef4-4ccb-b7b9-5fbedca492be", + "requestId":"a0ee4248-4230-4561-ac2f-912f731b39a1" + } + }, + { + "eventId":10, + "timestamp":1600908280624228000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":12582939, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":8, + "startedEventId":9, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":11, + "timestamp":1600908280624277000, + "eventType":"TimerStarted", + "version":-24, + "taskId":12582940, + "timerStartedEventAttributes":{ + "timerId":"1", + "startToFireTimeoutSeconds":1, + "decisionTaskCompletedEventId":10 + } + }, + { + "eventId":12, + "timestamp":1600908280584857000, + "eventType":"ActivityTaskStarted", + "version":-24, + "taskId":12582941, + "activityTaskStartedEventAttributes":{ + "scheduledEventId":6, + "identity":"13730@boweixu-C02V61JZHTDG", + "requestId":"8e553cb5-a137-4a23-a26a-6c957582a227", + "attempt":0, + "lastFailureReason":"" + } + }, + { + "eventId":13, + "timestamp":1600908280614589000, + "eventType":"ActivityTaskCompleted", + "version":-24, + "taskId":12582942, + "activityTaskCompletedEventAttributes":{ + "result":"MQ==", + "scheduledEventId":6, + "startedEventId":12, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":14, + "timestamp":1600908280624342000, + "eventType":"DecisionTaskScheduled", + "version":-24, + "taskId":12582947, + "decisionTaskScheduledEventAttributes":{ + "taskList":{ + "name":"boweixu-C02V61JZHTDG:aef436a5-3ef4-4ccb-b7b9-5fbedca492be" + }, + "startToCloseTimeoutSeconds":5, + "attempt":0 + } + }, + { + "eventId":15, + "timestamp":1600908280633848000, + "eventType":"DecisionTaskStarted", + "version":-24, + "taskId":12582952, + "decisionTaskStartedEventAttributes":{ + "scheduledEventId":14, + "identity":"aef436a5-3ef4-4ccb-b7b9-5fbedca492be", + "requestId":"aa7d5ff5-4566-485b-a325-4b9b788f13e5" + } + }, + { + "eventId":16, + "timestamp":1600908280661966000, + "eventType":"DecisionTaskCompleted", + "version":-24, + "taskId":12582955, + "decisionTaskCompletedEventAttributes":{ + "scheduledEventId":14, + "startedEventId":15, + "identity":"13730@boweixu-C02V61JZHTDG" + } + }, + { + "eventId":17, + "timestamp":1600908280661988000, + "eventType":"WorkflowExecutionCompleted", + "version":-24, + "taskId":12582956, + "workflowExecutionCompletedEventAttributes":{ + "result":"ImFjdGl2aXR5MSI=", + "decisionTaskCompletedEventId":16 + } + } +] \ No newline at end of file