[Feature Request] Native Request Batching to Prevent "received message larger than max" Errors #1499

Open · dp-dandy opened this issue Aug 16, 2024 · 0 comments
Labels: enhancement (New feature or request)

Is your feature request related to a problem? Please describe.

Context

Our team recently encountered a significant issue when attempting to start a large number of activities (~600) asynchronously within a single Temporal workflow. The SDK batched all of the resulting activity scheduling commands into a single workflow task completion, leading to a gRPC ResourceExhausted error because the total message size (15MB) exceeded gRPC's default 4MB message size limit.

This issue causes the workflow to freeze, as the SDK retries the RespondWorkflowTaskCompleted API call, which continues to fail due to the message size. Currently, the SDK does not provide any built-in mechanism to automatically handle this limitation, nor does it throw a runtime error that could help developers detect and manage the issue proactively.

Underlying Issue

  • The Temporal SDK batches multiple activity scheduling commands into a single gRPC request.
  • When the cumulative size of these commands exceeds 4MB, gRPC rejects the request with a ResourceExhausted error.
  • The SDK does not provide an intuitive way to manage this situation, leading to potential workflow freezes and silent failures.
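To make the failure mode concrete, here is a back-of-the-envelope sketch (the per-command size is a hypothetical figure chosen to mirror our ~600-activity scenario):

```ts
// Hypothetical numbers illustrating how ~600 activity starts blow past the limit.
const GRPC_DEFAULT_LIMIT_BYTES = 4 * 1024 * 1024; // gRPC default max message size (4MB)

const activityCount = 600;
const avgCommandSizeBytes = 25 * 1024; // assume ~25KB of arguments per ScheduleActivityTask command

// All commands produced in one workflow task are flushed in a single
// RespondWorkflowTaskCompleted request, so their sizes are additive.
const totalRequestBytes = activityCount * avgCommandSizeBytes; // 15,360,000 bytes (~15MB)

const exceedsLimit = totalRequestBytes > GRPC_DEFAULT_LIMIT_BYTES; // true → ResourceExhausted
```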

Describe the solution you'd like

Feature Request

To improve the developer experience and robustness of the Temporal SDK, we request the following enhancements:

  1. Automatic Request Splitting:

    • Implement a mechanism in the SDK to detect when the cumulative size of a gRPC request is approaching the 4MB limit.
    • Automatically split the activity scheduling commands into smaller batches that stay within the size limit, sending them in multiple gRPC requests if necessary.
  2. Runtime Error for Exceeding Limits:

    • If automatic splitting is not feasible, the SDK should throw a descriptive runtime error when a request exceeds the gRPC message size limit.
    • This error should include suggestions or documentation references on how to adjust the code to stay within the limit, such as reducing the number of activities started simultaneously or compressing payloads.
  3. Improved Logging and Monitoring:

    • Enhance the logging around gRPC message size limits to make it easier for developers to diagnose issues related to request size.
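As a rough illustration of point 2, a guard like the following could run before commands are flushed and fail fast with an actionable message (all names here are hypothetical, and the JSON byte length is only a stand-in for the real payload-converter encoding):

```ts
const GRPC_MAX_MESSAGE_BYTES = 4 * 1024 * 1024; // gRPC default max message size

class RequestTooLargeError extends Error {
  constructor(actualBytes: number, limitBytes: number) {
    super(
      `Scheduled commands total ${actualBytes} bytes, exceeding the gRPC limit of ` +
        `${limitBytes} bytes. Consider starting fewer activities per workflow task ` +
        `or compressing payloads with a custom payload codec.`,
    );
    this.name = 'RequestTooLargeError';
  }
}

function assertWithinGrpcLimit(argsList: unknown[][], limit: number = GRPC_MAX_MESSAGE_BYTES): void {
  // JSON byte length approximates the size of the converted payloads.
  const totalBytes = argsList.reduce<number>(
    (sum, args) => sum + new TextEncoder().encode(JSON.stringify(args)).length,
    0,
  );
  if (totalBytes > limit) {
    throw new RequestTooLargeError(totalBytes, limit);
  }
}
```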

Additional Considerations

  • Custom Data Converters: Although developers can use custom data converters to compress payloads, this should be a secondary approach. The SDK should still provide safeguards against exceeding gRPC limits.
  • Documentation: Update the SDK documentation to clearly explain the gRPC size limitation, how it interacts with Temporal workflows, and best practices for avoiding issues.
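For reference, the compression approach mentioned above boils down to a byte-level transform. In the TypeScript SDK this would normally be wired into the data converter as a custom payload codec, but the core idea (sketched here with Node's built-in zlib, applied directly to raw bytes) is just:

```ts
import { gzipSync, gunzipSync } from 'node:zlib';

// Sketch of payload compression: in a real setup these transforms would be
// applied to each payload's data by a codec, not called directly.
function compressPayload(data: Uint8Array): Uint8Array {
  return new Uint8Array(gzipSync(data));
}

function decompressPayload(data: Uint8Array): Uint8Array {
  return new Uint8Array(gunzipSync(data));
}
```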

Impact

Implementing these changes would significantly reduce the likelihood of silent workflow failures and improve the overall resilience of systems built using the Temporal SDK. Developers would have better tools to manage gRPC limitations, leading to more reliable and maintainable code.


A rough, incomplete solution that we are piloting is something like:

```ts
import type { ActivityFunction } from '@temporalio/common';
import { DefaultPayloadConverter } from '@temporalio/common';

// 3MB, leaving a buffer for any unknown overhead associated with the activities
const MAX_BATCH_PAYLOAD_SIZE = 3 * 1024 * 1024;

const defaultPayloadConverter = new DefaultPayloadConverter();

function getPayloadSize<PayloadType extends Parameters<ActivityFunction>>(args: PayloadType): number {
    const payload = defaultPayloadConverter.toPayload(args);
    if (payload.data instanceof Uint8Array) {
        return payload.data.length;
    }
    return 0;
}

function groupIntoMaxSizePayloads<PayloadType extends Parameters<ActivityFunction>>(
    activityArgs: PayloadType[],
    maxSize: number = MAX_BATCH_PAYLOAD_SIZE,
): PayloadType[][] {
    const payloadGroups: PayloadType[][] = [];

    let currentGroup: PayloadType[] = [];
    let currentGroupSize = 0;
    for (const args of activityArgs) {
        const argsSize = getPayloadSize(args);
        if (argsSize > maxSize) {
            throw new Error(`Single activity payload (${argsSize} bytes) exceeds the ${maxSize}-byte batch limit`);
        }
        // If the current group size plus the size of the next args would exceed
        // the max size, push the current group and start a new one
        if (currentGroupSize + argsSize > maxSize) {
            payloadGroups.push(currentGroup);
            currentGroup = [];
            currentGroupSize = 0;
        }
        currentGroup.push(args);
        currentGroupSize += argsSize;
    }
    // Push the last group if it's not empty
    if (currentGroup.length > 0) {
        payloadGroups.push(currentGroup);
    }
    return payloadGroups;
}

export async function safeStartActivities<Activity extends ActivityFunction>(
    activity: Activity,
    args: Parameters<Activity>[],
): Promise<PromiseSettledResult<Awaited<ReturnType<Activity>>>[]> {
    const maxRequestSizeBatches: Parameters<Activity>[][] = groupIntoMaxSizePayloads(args);
    const results: PromiseSettledResult<Awaited<ReturnType<Activity>>>[] = [];
    for (const maxRequestSizeBatch of maxRequestSizeBatches) {
        // Spread each tuple of arguments into the activity call
        const promises = maxRequestSizeBatch.map((batchArgs: Parameters<Activity>) =>
            activity(...batchArgs),
        );
        const awaited = await Promise.allSettled(promises);
        results.push(...awaited);
    }
    return results;
}
```
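Stripped of the Temporal types, the grouping strategy above reduces to a greedy bin-packing pass; this standalone version (with hypothetical byte sizes instead of converted payloads) shows the batching behavior we expect:

```ts
// Greedy grouping: pack sizes into batches whose sums stay under the cap.
function groupBySize(sizes: number[], maxSize: number): number[][] {
  const groups: number[][] = [];
  let current: number[] = [];
  let currentSum = 0;
  for (const size of sizes) {
    if (size > maxSize) {
      throw new Error(`Single item of ${size} bytes exceeds the ${maxSize}-byte cap`);
    }
    if (currentSum + size > maxSize) {
      groups.push(current);
      current = [];
      currentSum = 0;
    }
    current.push(size);
    currentSum += size;
  }
  if (current.length > 0) {
    groups.push(current);
  }
  return groups;
}

// groupBySize([3, 3, 2, 4, 1], 5) → [[3], [3, 2], [4, 1]]
```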

Additional context

Cloud Support Thread: https://temporalio.slack.com/archives/C046BRWDV2R/p1723743294696369
