Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added replayer test for continue as new #84

Merged
merged 4 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.uber.cadence.samples.replaytests;

import com.uber.cadence.samples.hello.HelloActivity;
import com.uber.cadence.samples.hello.HelloPeriodic;
import com.uber.cadence.testing.WorkflowReplayer;
import org.junit.Test;

Expand All @@ -38,11 +37,4 @@ public void testReplay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloActivity.json", HelloActivity.GreetingWorkflowImpl.class);
}

// continue-as-new case for replayer tests
@Test
public void testReplay_continueAsNew() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import com.uber.cadence.samples.hello.HelloPeriodic;
import com.uber.cadence.testing.WorkflowReplayer;
import org.junit.Test;

public class HelloPeriodicReplayTest {

/* Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done for continue as new case by replayWorkflowHistory(). This should not have any error because it's a valid continue as new case. */
@Test
public void testReplay_continueAsNew() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
}

// Continue as new case: change in frequency compared to original workflow definition by
// increasing number of times greet is hit. It should
// fail. BUT it is currently passing.
@Test
public void testReplay_continueAsNew_moreFrequency() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"replaytests/HelloPeriodic.json", HelloPeriodic_moreFrequency.GreetingWorkflowImpl.class);
}

// Continue as new case: If frequency is changed to lesser number.
// FAIL As expected: It should hit non-determinism case and it is hitting properly.
// @Test
// public void testReplay_continueAsNew_lessFrequency() throws Exception {
// WorkflowReplayer.replayWorkflowExecutionFromResource(
// "replaytests/HelloPeriodic.json", HelloPeriodic_lessFrequency.GreetingWorkflowImpl.class);
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;

import com.google.common.base.Throwables;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowIdReusePolicy;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.client.DuplicateWorkflowException;
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.client.WorkflowException;
import com.uber.cadence.client.WorkflowStub;
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
import com.uber.cadence.worker.Worker;
import com.uber.cadence.worker.WorkerFactory;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Optional;

public class HelloPeriodic_lessFrequency {

static final String TASK_LIST = "HelloPeriodic";
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";

public interface GreetingWorkflow {
@WorkflowMethod(
// At most one instance.
workflowId = PERIODIC_WORKFLOW_ID,
// To allow starting workflow with the same ID after the previous one has terminated.
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
// Adjust this value to the maximum time workflow is expected to run.
// It usually depends on the number of repetitions and interval between them.
executionStartToCloseTimeoutSeconds = 300,
taskList = TASK_LIST
)
void greetPeriodically(String name, Duration delay);
}

public interface GreetingActivities {
void greet(String greeting);
}

public static class GreetingWorkflowImpl implements GreetingWorkflow {

// If we change the value to 10 (compared to 1000 in original case), then non-determinism case
// is not hitting.
private final int CONTINUE_AS_NEW_FREQUENCEY = 1;

private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

/**
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
*/
private final GreetingWorkflow continueAsNew =
Workflow.newContinueAsNewStub(GreetingWorkflow.class);

@Override
public void greetPeriodically(String name, Duration delay) {
// Loop the predefined number of times then continue this workflow as new.
// This is needed to periodically truncate the history size.
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
activities.greet("Hello " + name + "!");
Workflow.sleep(delay);
}
// Current workflow run stops executing after this call.
continueAsNew.greetPeriodically(name, delay);
// unreachable line
}
}

static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public void greet(String greeting) {
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
}
}

public static void main(String[] args) throws InterruptedException {
// Get a new client
// NOTE: to set a different options, you can do like this:
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
// Get worker to poll the task list.
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
Worker worker = factory.newWorker(TASK_LIST);
// Workflows are stateful. So you need a type to create instances.
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Activities are stateless and thread safe. So a shared instance is used.
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
// Start listening to the workflow and activity task lists.
factory.start();

// Start a workflow execution. Usually this is done from another program.
// To ensure that this daemon type workflow is always running try to start it periodically
// ignoring the duplicated exception.
// It is only to protect from application level failures.
// Failures of a workflow worker don't lead to workflow failures.
WorkflowExecution execution = null;
while (true) {
// Print reason of failure of the previous run, before restarting.
if (execution != null) {
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
try {
workflow.getResult(Void.class); //
} catch (WorkflowException e) {
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
}
}
// New stub instance should be created for each new workflow start.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
try {
execution =
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
System.out.println("Started " + execution);
} catch (DuplicateWorkflowException e) {
System.out.println("Still running as " + e.getExecution());
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
// This value is so low just for the sample purpose. In production workflow
// it is usually much higher.
Thread.sleep(10000);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.samples.replaytests;

import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;

import com.google.common.base.Throwables;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowIdReusePolicy;
import com.uber.cadence.activity.Activity;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.client.DuplicateWorkflowException;
import com.uber.cadence.client.WorkflowClient;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.client.WorkflowException;
import com.uber.cadence.client.WorkflowStub;
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
import com.uber.cadence.worker.Worker;
import com.uber.cadence.worker.WorkerFactory;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.Optional;

public class HelloPeriodic_moreFrequency {

static final String TASK_LIST = "HelloPeriodic";
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";

public interface GreetingWorkflow {
@WorkflowMethod(
// At most one instance.
workflowId = PERIODIC_WORKFLOW_ID,
// To allow starting workflow with the same ID after the previous one has terminated.
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
// Adjust this value to the maximum time workflow is expected to run.
// It usually depends on the number of repetitions and interval between them.
executionStartToCloseTimeoutSeconds = 300,
taskList = TASK_LIST
)
void greetPeriodically(String name, Duration delay);
}

public interface GreetingActivities {
void greet(String greeting);
}

public static class GreetingWorkflowImpl implements GreetingWorkflow {

// If we change the value to 1, then non-determinism case will hit.
private final int CONTINUE_AS_NEW_FREQUENCEY = 1000;

private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
new ActivityOptions.Builder()
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
.build());

/**
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
*/
private final GreetingWorkflow continueAsNew =
Workflow.newContinueAsNewStub(GreetingWorkflow.class);

@Override
public void greetPeriodically(String name, Duration delay) {
// Loop the predefined number of times then continue this workflow as new.
// This is needed to periodically truncate the history size.
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
activities.greet("Hello " + name + "!");
Workflow.sleep(delay);
}
// Current workflow run stops executing after this call.
continueAsNew.greetPeriodically(name, delay);
// unreachable line
}
}

static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public void greet(String greeting) {
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
}
}

public static void main(String[] args) throws InterruptedException {
// Get a new client
// NOTE: to set a different options, you can do like this:
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
WorkflowClient workflowClient =
WorkflowClient.newInstance(
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
// Get worker to poll the task list.
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
Worker worker = factory.newWorker(TASK_LIST);
// Workflows are stateful. So you need a type to create instances.
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
// Activities are stateless and thread safe. So a shared instance is used.
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
// Start listening to the workflow and activity task lists.
factory.start();

// Start a workflow execution. Usually this is done from another program.
// To ensure that this daemon type workflow is always running try to start it periodically
// ignoring the duplicated exception.
// It is only to protect from application level failures.
// Failures of a workflow worker don't lead to workflow failures.
WorkflowExecution execution = null;
while (true) {
// Print reason of failure of the previous run, before restarting.
if (execution != null) {
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
try {
workflow.getResult(Void.class); //
} catch (WorkflowException e) {
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
}
}
// New stub instance should be created for each new workflow start.
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
try {
execution =
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
System.out.println("Started " + execution);
} catch (DuplicateWorkflowException e) {
System.out.println("Still running as " + e.getExecution());
} catch (Throwable e) {
e.printStackTrace();
System.exit(1);
}
// This value is so low just for the sample purpose. In production workflow
// it is usually much higher.
Thread.sleep(10000);
}
}
}
Loading