Skip to content

Commit

Permalink
pw_transfer: Send continue parameters for already recevied chunks
Browse files Browse the repository at this point in the history
This changes the behavior of pw_transfer when a receiver receives a
chunk of data it already has. Instead of entering the recovery state,
slashing its window size, and requesting an immediate retransmission,
it instead simply sends a PARAMETERS_CONTINUE chunk with its current
offset and window end offset.

This is done to minimize back-and-forth between the transfer peers by
not automatically invalidating any in-flight packets, in case the
expected packet is already on the way.

Change-Id: Iae97db413da14061eaf85594446cf83a93cdf803
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/235100
Lint: Lint 🤖 <[email protected]>
Reviewed-by: Jordan Brauer <[email protected]>
Reviewed-by: Wyatt Hepler <[email protected]>
Commit-Queue: Alexei Frolov <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed Sep 12, 2024
1 parent 382fb98 commit 2496aee
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 75 deletions.
31 changes: 21 additions & 10 deletions pw_transfer/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -878,18 +878,29 @@ void Context::HandleReceiveChunk(const Chunk& chunk) {

void Context::HandleReceivedData(const Chunk& chunk) {
if (chunk.offset() != offset_) {
// Bad offset; reset window size to send another parameters chunk.
PW_LOG_DEBUG(
"Transfer %u expected offset %u, received %u; entering recovery "
"state",
static_cast<unsigned>(session_id_),
static_cast<unsigned>(offset_),
static_cast<unsigned>(chunk.offset()));
if (chunk.offset() + chunk.payload().size() <= offset_) {
// If the chunk's data has already been received, don't go through a full
// recovery cycle to avoid shrinking the window size and potentially
// thrashing. The expected data may already be in-flight, so just allow
// the transmitter to keep going with a CONTINUE parameters chunk.
PW_LOG_DEBUG("Transfer %u received duplicate chunk with offset %u",
id_for_log(),
static_cast<unsigned>(chunk.offset()));
SendTransferParameters(TransmitAction::kExtend);
} else {
// Bad offset; reset window size to send another parameters chunk.
PW_LOG_WARN(
"Transfer %u expected offset %u, received %u; entering recovery "
"state",
static_cast<unsigned>(session_id_),
static_cast<unsigned>(offset_),
static_cast<unsigned>(chunk.offset()));

set_transfer_state(TransferState::kRecovery);
SetTimeout(chunk_timeout_);
set_transfer_state(TransferState::kRecovery);
UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
}

UpdateAndSendTransferParameters(TransmitAction::kRetransmit);
SetTimeout(chunk_timeout_);
return;
}

Expand Down
14 changes: 9 additions & 5 deletions pw_transfer/docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ timeouts, data retransmissions, and handshakes.
A non-seekable stream could prematurely terminate a transfer following a
packet drop.

At present, ``pw_transfer`` requires in-order data transmission. If packets are
received out-of-order, the receiver will request that the transmitter re-send
data from the last received position.

Opening handshake
=================
Transfers begin with a three-way handshake, whose purpose is to identify the
Expand Down Expand Up @@ -647,12 +651,12 @@ pw_transfer uses a congestion control algorithm similar to that of TCP
adapted to pw_transfer's mode of operation that tunes parameters per window.

Once a portion of a window has successfully been received, it is acknowledged by
the reciever and the window size is extended. Transfers begin in a "slow start"
the receiver and the window size is extended. Transfers begin in a "slow start"
phase, during which the window is doubled on each ACK. This continues until the
transfer detects a packet loss. Once this occurs, the window size is halved and
the transfer enters a "congestion avoidance" phase for the remainder of its run.
During this phase, successful ACKs increase the window size by a single chunk,
whereas packet loss continues to half it.
transfer detects a packet loss or times out. Once this occurs, the window size
is halved and the transfer enters a "congestion avoidance" phase for the
remainder of its run. During this phase, successful ACKs increase the window
size by a single chunk, whereas packet loss continues to half it.

Transfer completion
===================
Expand Down
68 changes: 44 additions & 24 deletions pw_transfer/java/main/dev/pigweed/pw_transfer/ReadTransfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,26 @@ public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedExceptio
lastReceivedOffset = chunk.offset();

if (chunk.offset() != getOffset()) {
logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
ReadTransfer.this,
getOffset(),
chunk.offset());

// For now, only in-order transfers are supported. If data is received out of
// order,
// discard this data and retransmit from the last received offset.
sendChunk(prepareTransferParameters(TransmitAction.RETRANSMIT));
changeState(new DropRecovery());
// If the chunk's data has already been received, don't go through a full
// recovery cycle to avoid shrinking the window size and potentially
// thrashing. The expected data may already be in-flight, so just allow
// the transmitter to keep going with a CONTINUE parameters chunk.
if (chunk.offset() + chunk.data().size() <= getOffset()) {
logger.atFine().log("%s received duplicate chunk with offset offset %d",
ReadTransfer.this,
chunk.offset());
sendChunk(prepareTransferParameters(TransmitAction.EXTEND, false));
} else {
logger.atFine().log("%s expected offset %d, received %d; resending transfer parameters",
ReadTransfer.this,
getOffset(),
chunk.offset());

// For now, only in-order transfers are supported. If data is received out of
// order, discard this data and retransmit from the last received offset.
sendChunk(prepareTransferParameters(TransmitAction.RETRANSMIT));
changeState(new DropRecovery());
}
setNextChunkTimeout();
return;
}
Expand Down Expand Up @@ -230,6 +240,10 @@ void setFutureResult() {
}

private VersionedChunk prepareTransferParameters(TransmitAction action) {
return prepareTransferParameters(action, true);
}

private VersionedChunk prepareTransferParameters(TransmitAction action, boolean update) {
Chunk.Type type;

switch (action) {
Expand All @@ -250,16 +264,18 @@ private VersionedChunk prepareTransferParameters(TransmitAction action) {
// avoidance.
type = Chunk.Type.PARAMETERS_CONTINUE;

if (transmitPhase == TransmitPhase.SLOW_START) {
windowSizeMultiplier *= 2;
} else {
windowSizeMultiplier += 1;
if (update) {
if (transmitPhase == TransmitPhase.SLOW_START) {
windowSizeMultiplier *= 2;
} else {
windowSizeMultiplier += 1;
}

// The window size can never exceed the user-specified maximum bytes. If it
// does, reduce
// the multiplier to the largest size that fits.
windowSizeMultiplier = min(windowSizeMultiplier, parameters.maxChunksInWindow());
}

// The window size can never exceed the user-specified maximum bytes. If it
// does, reduce
// the multiplier to the largest size that fits.
windowSizeMultiplier = min(windowSizeMultiplier, parameters.maxChunksInWindow());
break;

case RETRANSMIT:
Expand All @@ -268,18 +284,22 @@ private VersionedChunk prepareTransferParameters(TransmitAction action) {
// transition from the slow start to the congestion avoidance phase of the
// transfer.
type = Chunk.Type.PARAMETERS_RETRANSMIT;
windowSizeMultiplier = max(windowSizeMultiplier / 2, 1);
if (transmitPhase == TransmitPhase.SLOW_START) {
transmitPhase = TransmitPhase.CONGESTION_AVOIDANCE;
if (update) {
windowSizeMultiplier = max(windowSizeMultiplier / 2, 1);
if (transmitPhase == TransmitPhase.SLOW_START) {
transmitPhase = TransmitPhase.CONGESTION_AVOIDANCE;
}
}
break;

default:
throw new AssertionError("Invalid transmit action");
}

windowSize = windowSizeMultiplier * parameters.maxChunkSizeBytes();
windowEndOffset = getOffset() + windowSize;
if (update) {
windowSize = windowSizeMultiplier * parameters.maxChunkSizeBytes();
windowEndOffset = getOffset() + windowSize;
}

return setTransferParameters(newChunk(type)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1425,12 +1425,12 @@ public void read_onlySendsOneUpdateAfterDrops() throws Exception {
.setWindowEndOffset(110)
.setMaxChunkSizeBytes(10)
.build(),
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent due to repeated paket
newChunk(Chunk.Type.PARAMETERS_CONTINUE, id) // Sent due to repeated packet
.setOffset(100)
.setWindowEndOffset(110)
.setMaxChunkSizeBytes(10)
.build(),
newChunk(Chunk.Type.PARAMETERS_RETRANSMIT, id) // Sent due to repeated paket
newChunk(Chunk.Type.PARAMETERS_CONTINUE, id) // Sent due to repeated packet
.setOffset(100)
.setWindowEndOffset(110)
.setMaxChunkSizeBytes(10)
Expand Down
77 changes: 53 additions & 24 deletions pw_transfer/py/pw_transfer/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,23 +778,41 @@ async def _handle_data_chunk(self, chunk: Chunk) -> None:
assert self._state is Transfer._State.WAITING

if chunk.offset != self._offset:
# Initially, the transfer service only supports in-order transfers.
# If data is received out of order, request that the server
# retransmit from the previous offset.
_LOG.debug(
'Transfer %d expected offset %d, received %d: '
'entering recovery state',
self.id,
self._offset,
chunk.offset,
)
self._state = Transfer._State.RECOVERY
if chunk.offset + len(chunk.data) <= self._offset:
# If the chunk's data has already been received, don't go
# through a full recovery cycle to avoid shrinking the window
# size and potentially thrashing. The expected data may already
# be in-flight, so just allow the transmitter to keep going with
# a CONTINUE parameters chunk.
_LOG.debug(
'Transfer %d received duplicate chunk with offset %d',
self.id,
chunk.offset,
)
self._send_chunk(
self._transfer_parameters(
ReadTransfer._TransmitAction.EXTEND,
update=False,
)
)
else:
# Initially, the transfer service only supports in-order
# transfers. If data is received out of order, request that the
# server retransmit from the previous offset.
_LOG.debug(
'Transfer %d expected offset %d, received %d: '
'entering recovery state',
self.id,
self._offset,
chunk.offset,
)
self._state = Transfer._State.RECOVERY

self._send_chunk(
self._transfer_parameters(
ReadTransfer._TransmitAction.RETRANSMIT
self._send_chunk(
self._transfer_parameters(
ReadTransfer._TransmitAction.RETRANSMIT
)
)
)
return

self._data += chunk.data
Expand Down Expand Up @@ -881,6 +899,17 @@ def _retry_after_data_timeout(self) -> None:
)
)

def _set_transfer_parameters(
self,
chunk: Chunk,
) -> None:
chunk.offset = self._offset
chunk.window_end_offset = self._window_end_offset
chunk.max_chunk_size_bytes = self._max_chunk_size

if self._chunk_delay_us:
chunk.min_delay_microseconds = self._chunk_delay_us

def _update_and_set_transfer_parameters(
self, chunk: Chunk, action: 'ReadTransfer._TransmitAction'
) -> None:
Expand Down Expand Up @@ -921,16 +950,12 @@ def _update_and_set_transfer_parameters(
)

self._window_end_offset = self._offset + self._window_size

chunk.offset = self._offset
chunk.window_end_offset = self._window_end_offset
chunk.max_chunk_size_bytes = self._max_chunk_size

if self._chunk_delay_us:
chunk.min_delay_microseconds = self._chunk_delay_us
self._set_transfer_parameters(chunk)

def _transfer_parameters(
self, action: 'ReadTransfer._TransmitAction'
self,
action: 'ReadTransfer._TransmitAction',
update: bool = True,
) -> Chunk:
"""Returns an updated transfer parameters chunk."""

Expand All @@ -944,6 +969,10 @@ def _transfer_parameters(
chunk = Chunk(
self._configured_protocol_version, chunk_type, session_id=self.id
)
self._update_and_set_transfer_parameters(chunk, action)

if update:
self._update_and_set_transfer_parameters(chunk, action)
else:
self._set_transfer_parameters(chunk)

return chunk
Loading

0 comments on commit 2496aee

Please sign in to comment.