Skip to content

Commit

Permalink
pw_transfer: Always terminate transfers on stream reopen
Browse files Browse the repository at this point in the history
Previously the transfer client code attempted to retry ongoing transfers
when the underlying RPC stream was closed (only initial packets in Java,
all packets in Python). This removes the retry code and always
terminates the streams, as the server no longer supports automatic
recovery.

Clients will have to manually restart transfers which failed in this
case, which should be uncommon.

Change-Id: Id6f65affc8a1892dbcb871196ba9bfa8012c0289
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/212953
Presubmit-Verified: CQ Bot Account <[email protected]>
Lint: Lint 🤖 <[email protected]>
Reviewed-by: Jordan Brauer <[email protected]>
Commit-Queue: Alexei Frolov <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed May 31, 2024
1 parent 66c06bc commit 962d662
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 362 deletions.
2 changes: 1 addition & 1 deletion pw_transfer/integration_test/python_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
def _perform_transfer_action(
action: config_pb2.TransferAction, transfer_manager: pw_transfer.Manager
) -> bool:
"""Performs the transfer action and returns Truen on success."""
"""Performs the transfer action and returns True on success."""
protocol_version = pw_transfer.ProtocolVersion(int(action.protocol_version))

# Default to the latest protocol version if none is specified.
Expand Down
33 changes: 0 additions & 33 deletions pw_transfer/java/main/dev/pigweed/pw_transfer/Transfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ abstract class Transfer<T> extends AbstractFuture<T> {
private State state;
private VersionedChunk lastChunkSent;

// The number of times this transfer has retried due to an RPC disconnection.
// Limit this to
// maxRetries to prevent repeated crashes if reading to / writing from a
// particular transfer is
// causing crashes.
private int disconnectionRetries = 0;
private int lifetimeRetries = 0;

/**
Expand Down Expand Up @@ -182,10 +176,6 @@ final void start() {

/** Processes an incoming chunk from the server. */
final void handleChunk(VersionedChunk chunk) {
// Since a packet has been received, don't allow retries on disconnection; abort
// instead.
disconnectionRetries = Integer.MAX_VALUE;

try {
if (chunk.type() == Chunk.Type.COMPLETION) {
state.handleFinalChunk(chunk.status().orElseGet(() -> {
Expand Down Expand Up @@ -218,29 +208,6 @@ final void handleCancellation() {
state.handleCancellation();
}

/** Restarts a transfer after an RPC disconnection. */
final void handleDisconnection() {
// disconnectionRetries is set to Int.MAX_VALUE when a packet is received to
// prevent retries
// after the initial packet.
if (disconnectionRetries++ < timeoutSettings.maxRetries()) {
logger.atFine().log("Restarting the pw_transfer RPC for %s (attempt %d/%d)",
this,
disconnectionRetries,
timeoutSettings.maxRetries());
try {
sendChunk(getChunkForRetry());
} catch (TransferAbortedException e) {
return; // Transfer is aborted; nothing else to do.
}
setInitialTimeout();
} else {
changeState(new Completed(new TransferError("Transfer " + sessionId + " restarted "
+ timeoutSettings.maxRetries() + " times, aborting",
Status.INTERNAL)));
}
}

/** Returns the State to enter immediately after sending the first packet. */
abstract State getWaitingForDataState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ class TransferEventHandler {

// Map session ID to transfer.
private final Map<Integer, Transfer<?>> sessionIdToTransfer = new HashMap<>();
// Legacy transfers only use the resource ID. The client assigns an arbitrary session ID that
// legacy servers ignore. The client then maps from the legacy ID to its local session ID.
// Legacy transfers only use the resource ID. The client assigns an arbitrary
// session ID that
// legacy servers ignore. The client then maps from the legacy ID to its local
// session ID.
private final Map<Integer, Integer> legacyIdToSessionId = new HashMap<>();

@Nullable private Call.ClientStreaming<Chunk> readStream = null;
Expand Down Expand Up @@ -131,7 +133,8 @@ private void startTransferAsClient(Transfer<?> transfer) {
throw new AssertionError("Cannot start non-zero offset transfer with legacy version");
}

// The v2 protocol supports multiple transfers for a single resource. For simplicity while
// The v2 protocol supports multiple transfers for a single resource. For
// simplicity while
// supporting both protocols, only support a single transfer per resource.
if (legacyIdToSessionId.containsKey(transfer.getResourceId())) {
transfer.terminate(
Expand All @@ -156,15 +159,21 @@ void run() {
}

/**
* Test version of run() that processes all enqueued events before checking for timeouts.
* Test version of run() that processes all enqueued events before checking for
* timeouts.
*
* Tests that need to time out should process all enqueued events first to prevent flaky failures.
* If handling one of several queued packets takes longer than the timeout (which must be short
* Tests that need to time out should process all enqueued events first to
* prevent flaky failures.
* If handling one of several queued packets takes longer than the timeout
* (which must be short
* for a unit test), then the test may fail spuriously.
*
* This run function is not used outside of tests because processing all incoming packets before
* checking for timeouts could delay the transfer client's outgoing write packets if there are
* lots of inbound packets. This could delay transfers and cause unnecessary timeouts.
* This run function is not used outside of tests because processing all
* incoming packets before
* checking for timeouts could delay the transfer client's outgoing write
* packets if there are
* lots of inbound packets. This could delay transfers and cause unnecessary
* timeouts.
*/
void runForTestsThatMustTimeOut() {
while (processEvents) {
Expand All @@ -174,6 +183,7 @@ void runForTestsThatMustTimeOut() {
handleTimeouts();
}
}

/** Stops the transfer event handler from processing events. */
void stop() {
enqueueEvent(() -> {
Expand All @@ -183,7 +193,10 @@ void stop() {
});
}

/** Blocks until all events currently in the queue are processed; for test use only. */
/**
* Blocks until all events currently in the queue are processed; for test use
* only.
*/
void waitUntilEventsAreProcessedForTest() {
Semaphore semaphore = new Semaphore(0);
enqueueEvent(semaphore::release);
Expand Down Expand Up @@ -244,9 +257,9 @@ abstract class TransferInterface {
private TransferInterface() {}

/**
* Sends the provided transfer chunk.
* Sends the provided transfer chunk.
*
* Must be called on the transfer thread.
* Must be called on the transfer thread.
*/
void sendChunk(Chunk chunk) throws TransferError {
try {
Expand All @@ -257,9 +270,9 @@ void sendChunk(Chunk chunk) throws TransferError {
}

/**
* Removes this transfer from the list of active transfers.
* Removes this transfer from the list of active transfers.
*
* Must be called on the transfer thread.
* Must be called on the transfer thread.
*/
// TODO(frolv): Investigate why this is occurring -- there shouldn't be any
// futures here.
Expand Down Expand Up @@ -311,19 +324,13 @@ public final void onError(Status status) {
enqueueEvent(() -> {
resetStream();

// The transfers remove themselves from the Map during cleanup, iterate over a copied list.
// The transfers remove themselves from the Map during cleanup, iterate over a
// copied list.
List<Transfer<?>> activeTransfers = new ArrayList<>(sessionIdToTransfer.values());

// FAILED_PRECONDITION indicates that the stream packet was not recognized as the stream is
// not open. This could occur if the server resets. Notify pending transfers that this has
// occurred so they can restart.
if (status.equals(Status.FAILED_PRECONDITION)) {
activeTransfers.forEach(Transfer::handleDisconnection);
} else {
TransferError error = new TransferError(
"Transfer stream RPC closed unexpectedly with status " + status, Status.INTERNAL);
activeTransfers.forEach(t -> t.terminate(error));
}
TransferError error = new TransferError(
"Transfer stream RPC closed unexpectedly with status " + status, Status.INTERNAL);
activeTransfers.forEach(t -> t.terminate(error));
});
}

Expand Down
Loading

0 comments on commit 962d662

Please sign in to comment.