Skip to content

Commit

Permalink
Merge pull request #84 from uber/nond2
Browse files Browse the repository at this point in the history
Added replayer test for continue as new
  • Loading branch information
abhishekj720 authored May 22, 2024
2 parents 2bce1c5 + 3fd8caf commit 1a2c896
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.uber.cadence.samples.replaytests;

import com.uber.cadence.samples.hello.HelloActivity;
import com.uber.cadence.samples.hello.HelloPeriodic;
import com.uber.cadence.testing.WorkflowReplayer;
import org.junit.Test;

Expand All @@ -38,11 +37,4 @@ public void testReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloActivity.json", HelloActivity.GreetingWorkflowImpl.class);
}

// continue-as-new case for replayer tests
@Test
public void testReplay_continueAsNew() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import com.uber.cadence.samples.hello.HelloPeriodic;
import com.uber.cadence.testing.WorkflowReplayer;
import org.junit.Test;

public class HelloPeriodicReplayTest {

/* Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done for continue as new case by replayWorkflowHistory(). This should not have any error because it's a valid continue as new case. */
@Test
public void testReplay_continueAsNew() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
}

// Continue as new case: change in frequency compared to original workflow definition by
// increasing number of times greet is hit. It should
// fail. BUT it is currently passing.
@Test
public void testReplay_continueAsNew_moreFrequency() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloPeriodic.json", HelloPeriodic_moreFrequency.GreetingWorkflowImpl.class);
}

// Continue as new case: If frequency is changed to lesser number.
// FAIL As expected: It should hit non-determinism case and it is hitting properly.
// @Test
// public void testReplay_continueAsNew_lessFrequency() throws Exception {
// WorkflowReplayer.replayWorkflowExecutionFromResource(
// "replaytests/HelloPeriodic.json", HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;

import com.google.common.base.Throwables;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowIdReusePolicy;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.client.DuplicateWorkflowException;
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.client.WorkflowException;
import com.uber.cadence.client.WorkflowStub;
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
import com.uber.cadence.worker.Worker;
import com.uber.cadence.worker.WorkerFactory;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Optional;

public class HelloPeriodic_lessFrequency {

static final String TASK_LIST = "HelloPeriodic";
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";

public interface GreetingWorkflow {
@WorkflowMethod(
// At most one instance.
workflowId = PERIODIC_WORKFLOW_ID,
// To allow starting workflow with the same ID after the previous one has terminated.
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
// Adjust this value to the maximum time workflow is expected to run.
// It usually depends on the number of repetitions and interval between them.
executionStartToCloseTimeoutSeconds = 300,
taskList = TASK_LIST
)
void greetPeriodically(String name, Duration delay);
}

public interface GreetingActivities {
void greet(String greeting);
}

public static class GreetingWorkflowImpl implements GreetingWorkflow {

// If we change the value to 10 (compared to 1000 in original case), then non-determinism case
// is not hitting.
private final int CONTINUE_AS_NEW_FREQUENCEY = 1;

private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

/**
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
*/
private final GreetingWorkflow continueAsNew =
Workflow.newContinueAsNewStub(GreetingWorkflow.class);

@Override
public void greetPeriodically(String name, Duration delay) {
// Loop the predefined number of times then continue this workflow as new.
// This is needed to periodically truncate the history size.
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
activities.greet("Hello " + name + "!");
Workflow.sleep(delay);
}
// Current workflow run stops executing after this call.
continueAsNew.greetPeriodically(name, delay);
// unreachable line
}
}

static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public void greet(String greeting) {
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
}
}

public static void main(String[] args) throws InterruptedException {
// Get a new client
// NOTE: to set a different options, you can do like this:
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
// Get worker to poll the task list.
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
Worker worker = factory.newWorker(TASK_LIST);
// Workflows are stateful. So you need a type to create instances.
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Activities are stateless and thread safe. So a shared instance is used.
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
// Start listening to the workflow and activity task lists.
factory.start();

// Start a workflow execution. Usually this is done from another program.
// To ensure that this daemon type workflow is always running try to start it periodically
// ignoring the duplicated exception.
// It is only to protect from application level failures.
// Failures of a workflow worker don't lead to workflow failures.
WorkflowExecution execution = null;
while (true) {
// Print reason of failure of the previous run, before restarting.
if (execution != null) {
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
try {
workflow.getResult(Void.class); //
} catch (WorkflowException e) {
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
}
}
// New stub instance should be created for each new workflow start.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
try {
execution =
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
System.out.println("Started " + execution);
} catch (DuplicateWorkflowException e) {
System.out.println("Still running as " + e.getExecution());
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
// This value is so low just for the sample purpose. In production workflow
// it is usually much higher.
Thread.sleep(10000);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;

import com.google.common.base.Throwables;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowIdReusePolicy;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.client.DuplicateWorkflowException;
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.client.WorkflowException;
import com.uber.cadence.client.WorkflowStub;
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
import com.uber.cadence.worker.Worker;
import com.uber.cadence.worker.WorkerFactory;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Optional;

public class HelloPeriodic_moreFrequency {

static final String TASK_LIST = "HelloPeriodic";
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";

public interface GreetingWorkflow {
@WorkflowMethod(
// At most one instance.
workflowId = PERIODIC_WORKFLOW_ID,
// To allow starting workflow with the same ID after the previous one has terminated.
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
// Adjust this value to the maximum time workflow is expected to run.
// It usually depends on the number of repetitions and interval between them.
executionStartToCloseTimeoutSeconds = 300,
taskList = TASK_LIST
)
void greetPeriodically(String name, Duration delay);
}

public interface GreetingActivities {
void greet(String greeting);
}

public static class GreetingWorkflowImpl implements GreetingWorkflow {

// If we change the value to 1, then non-determinism case will hit.
private final int CONTINUE_AS_NEW_FREQUENCEY = 1000;

private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

/**
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
*/
private final GreetingWorkflow continueAsNew =
Workflow.newContinueAsNewStub(GreetingWorkflow.class);

@Override
public void greetPeriodically(String name, Duration delay) {
// Loop the predefined number of times then continue this workflow as new.
// This is needed to periodically truncate the history size.
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
activities.greet("Hello " + name + "!");
Workflow.sleep(delay);
}
// Current workflow run stops executing after this call.
continueAsNew.greetPeriodically(name, delay);
// unreachable line
}
}

static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public void greet(String greeting) {
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
}
}

public static void main(String[] args) throws InterruptedException {
// Get a new client
// NOTE: to set a different options, you can do like this:
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
// Get worker to poll the task list.
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
Worker worker = factory.newWorker(TASK_LIST);
// Workflows are stateful. So you need a type to create instances.
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Activities are stateless and thread safe. So a shared instance is used.
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
// Start listening to the workflow and activity task lists.
factory.start();

// Start a workflow execution. Usually this is done from another program.
// To ensure that this daemon type workflow is always running try to start it periodically
// ignoring the duplicated exception.
// It is only to protect from application level failures.
// Failures of a workflow worker don't lead to workflow failures.
WorkflowExecution execution = null;
while (true) {
// Print reason of failure of the previous run, before restarting.
if (execution != null) {
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
try {
workflow.getResult(Void.class); //
} catch (WorkflowException e) {
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
}
}
// New stub instance should be created for each new workflow start.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
try {
execution =
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
System.out.println("Started " + execution);
} catch (DuplicateWorkflowException e) {
System.out.println("Still running as " + e.getExecution());
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
// This value is so low just for the sample purpose. In production workflow
// it is usually much higher.
Thread.sleep(10000);
}
}
}

0 comments on commit 1a2c896

Please sign in to comment.