Skip to content

Commit

Permalink
pw_transfer: Handle continue chunks with earlier window end offsets
Browse files Browse the repository at this point in the history
http://pwrev.dev/249532 allows transfer receivers to shrink their window
size when sending a CONTINUE chunk in response to retried data. This can
cause transmitters to receive a window end offset that is earlier than
their current offset.

This updates transfer clients to handle this case by simply ignoring the
CONTINUE chunk. This will result in the receiver timing out and retrying
the chunk as a RETRANSMIT instead.

The Java client already handled this case, so it is untouched.

Fixes: 383043232
Change-Id: I0811036ffff7e92c23f3e9390dfb2764fd063e66
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/256874
Reviewed-by: Jordan Brauer <[email protected]>
Presubmit-Verified: CQ Bot Account <[email protected]>
Lint: Lint 🤖 <[email protected]>
Docs-Not-Needed: Jordan Brauer <[email protected]>
Docs-Not-Needed: Alexei Frolov <[email protected]>
Commit-Queue: Alexei Frolov <[email protected]>
  • Loading branch information
frolv authored and CQ Bot Account committed Jan 2, 2025
1 parent 912c1b5 commit 6582c5e
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 4 deletions.
69 changes: 69 additions & 0 deletions pw_transfer/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,75 @@ TEST_F(WriteTransfer, AbortIfZeroBytesAreRequested) {
EXPECT_EQ(transfer_status, Status::ResourceExhausted());
}

TEST_F(WriteTransfer, IgnoresEarlierWindowEndOffsetInContinueParameters) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();

Result<Client::Handle> handle =
legacy_client_.Write(9, reader, [&transfer_status](Status status) {
transfer_status = status;
});
ASSERT_EQ(handle.status(), OkStatus());
transfer_thread_.WaitUntilEventIsProcessed();

// The client begins by sending the ID of the resource to transfer.
rpc::PayloadsView payloads =
context_.output().payloads<Transfer::Write>(context_.channel().id());
ASSERT_EQ(payloads.size(), 1u);
EXPECT_EQ(transfer_status, Status::Unknown());

Chunk c0 = DecodeChunk(payloads[0]);
EXPECT_EQ(c0.session_id(), 9u);
EXPECT_EQ(c0.resource_id(), 9u);
EXPECT_EQ(c0.type(), Chunk::Type::kStart);

context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(9)
.set_offset(0)
.set_window_end_offset(16)
.set_max_chunk_size_bytes(16)));
transfer_thread_.WaitUntilEventIsProcessed();

ASSERT_EQ(payloads.size(), 2u);

Chunk chunk = DecodeChunk(payloads[1]);
EXPECT_EQ(chunk.session_id(), 9u);
EXPECT_EQ(chunk.offset(), 0u);
EXPECT_EQ(chunk.payload().size(), 16u);

// Rewind the window end offset to earlier than the client's offset using a
// CONTINUE chunk.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersContinue)
.set_session_id(9)
.set_offset(10)
.set_window_end_offset(14)));
transfer_thread_.WaitUntilEventIsProcessed();

// The client should ignore it.
ASSERT_EQ(payloads.size(), 2u);

// Retry the same chunk as a RETRANSMIT.
context_.server().SendServerStream<Transfer::Write>(EncodeChunk(
Chunk(ProtocolVersion::kLegacy, Chunk::Type::kParametersRetransmit)
.set_session_id(9)
.set_offset(10)
.set_window_end_offset(14)));
transfer_thread_.WaitUntilEventIsProcessed();

// The client should respond correctly.
ASSERT_EQ(payloads.size(), 3u);
chunk = DecodeChunk(payloads[2]);
EXPECT_EQ(chunk.session_id(), 9u);
EXPECT_EQ(chunk.offset(), 10u);
EXPECT_EQ(chunk.payload().size(), 4u);

// Ensure we don't leave a dangling reference to transfer_status.
handle->Cancel();
transfer_thread_.WaitUntilEventIsProcessed();
}

TEST_F(WriteTransfer, Timeout_RetriesWithInitialChunk) {
stream::MemoryReader reader(kData32);
Status transfer_status = Status::Unknown();
Expand Down
4 changes: 4 additions & 0 deletions pw_transfer/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ void Context::HandleTransferParametersUpdate(const Chunk& chunk) {
}

offset_ = chunk.offset();
} else if (chunk.window_end_offset() <= offset_) {
PW_LOG_DEBUG("Transfer %u ignoring old rolling window chunk", id_for_log());
SetTimeout(chunk_timeout_);
return;
}

window_end_offset_ = chunk.window_end_offset();
Expand Down
9 changes: 9 additions & 0 deletions pw_transfer/py/pw_transfer/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,15 @@ def _handle_parameters_update(self, chunk: Chunk) -> bool:
)

self._offset = chunk.offset
elif (
chunk.type is Chunk.Type.PARAMETERS_CONTINUE
and chunk.window_end_offset <= self._offset
):
_LOG.debug(
'Write transfer %d ignoring old rolling window chunk',
self.id,
)
return False

if chunk.max_chunk_size_bytes is not None:
self._max_chunk_size = chunk.max_chunk_size_bytes
Expand Down
132 changes: 132 additions & 0 deletions pw_transfer/py/tests/transfer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,138 @@ def test_v2_write_transfer_legacy_fallback(self) -> None:

self.assertEqual(self._received_data(), b'write v... NOPE')

def test_v2_write_transfer_continue_shrinks_window(self) -> None:
"""Tests version 2 write where the receiver shrinks with CONTINUE."""
manager = pw_transfer.Manager(
self._service,
default_response_timeout_s=DEFAULT_TIMEOUT_S,
default_protocol_version=ProtocolVersion.VERSION_TWO,
)

self._enqueue_server_responses(
_Method.WRITE,
(
(
transfer_pb2.Chunk(
resource_id=72,
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.START_ACK,
protocol_version=ProtocolVersion.VERSION_TWO.value,
),
),
(
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
offset=0,
window_end_offset=8,
max_chunk_size_bytes=8,
),
),
(
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
offset=8,
window_end_offset=16,
max_chunk_size_bytes=8,
),
),
# Shrink the window end offset with a CONTINUE chunk.
(
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
offset=10,
window_end_offset=14,
max_chunk_size_bytes=14,
),
),
# The last chunk should be ignored; the receiver times out
# and retries with a RETRANSMIT.
(
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.PARAMETERS_RETRANSMIT,
offset=10,
window_end_offset=14,
max_chunk_size_bytes=14,
),
),
(
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
offset=14,
window_end_offset=30,
max_chunk_size_bytes=16,
),
),
(
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.COMPLETION,
status=Status.OK.value,
),
),
),
)

manager.write(72, b'pigweed data transfer')

self.assertEqual(
self._sent_chunks,
[
transfer_pb2.Chunk(
transfer_id=72,
resource_id=72,
desired_session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.START,
protocol_version=ProtocolVersion.VERSION_TWO.value,
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
protocol_version=ProtocolVersion.VERSION_TWO.value,
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.DATA,
offset=0,
data=b'pigweed ',
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.DATA,
offset=8,
data=b'data tra',
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.DATA,
offset=8,
data=b'data tra',
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.DATA,
offset=10,
data=b'ta t',
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.DATA,
offset=14,
data=b'ransfer',
remaining_bytes=0,
),
transfer_pb2.Chunk(
session_id=_FIRST_SESSION_ID,
type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
),
],
)

def test_v2_server_error(self) -> None:
"""Tests a server error occurring during the opening handshake."""

Expand Down
12 changes: 8 additions & 4 deletions pw_transfer/ts/transfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,16 +436,16 @@ export class WriteTransfer extends Transfer {
return;
}

const bytesAknowledged = chunk.getOffset();
const bytesAcknowledged = chunk.getOffset();

let writeChunk: Chunk;
// eslint-disable-next-line no-constant-condition
while (true) {
writeChunk = this.nextChunk();
this.offset += writeChunk.getData().length;
const sentRequestedBytes = this.offset === this.windowEndOffset;
const sentRequestedBytes = this.offset >= this.windowEndOffset;

this.updateProgress(this.offset, bytesAknowledged, this.data.length);
this.updateProgress(this.offset, bytesAcknowledged, this.data.length);
this.sendChunk(writeChunk);

if (sentRequestedBytes) {
Expand Down Expand Up @@ -507,6 +507,10 @@ export class WriteTransfer extends Transfer {
);
this.windowEndOffset = this.offset + maxBytesToSend;
} else {
if (chunk.getWindowEndOffset() <= this.offset) {
return false;
}

// Extend the window to the new end offset specified by the server.
this.windowEndOffset = Math.min(
chunk.getWindowEndOffset(),
Expand All @@ -533,7 +537,7 @@ export class WriteTransfer extends Transfer {

const maxBytesInChunk = Math.min(
this.maxChunkSize,
this.windowEndOffset - this.offset,
Math.max(this.windowEndOffset - this.offset, 0),
);

chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));
Expand Down
66 changes: 66 additions & 0 deletions pw_transfer/ts/transfer_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -682,4 +682,70 @@ describe('Transfer client', () => {
expect(Status[error.status]).toEqual(Status[Status.INTERNAL]);
});
});

it('write transfer continue parameters with shrunk window', async () => {
const manager = new Manager(service, DEFAULT_TIMEOUT_S);

const chunk1 = new Chunk();
chunk1.setTransferId(4);
chunk1.setOffset(0);
chunk1.setPendingBytes(8);
chunk1.setMaxChunkSizeBytes(8);
chunk1.setType(Chunk.Type.PARAMETERS_RETRANSMIT);

const chunk2 = new Chunk();
chunk2.setTransferId(4);
chunk2.setOffset(8);
chunk2.setPendingBytes(8);
chunk2.setWindowEndOffset(16);
chunk2.setMaxChunkSizeBytes(8);
chunk2.setType(Chunk.Type.PARAMETERS_CONTINUE);

// The third chunk ends the window at offset 14 despite the transfer already
// being at offset 16. It should be ignored.
const chunk3 = new Chunk();
chunk3.setTransferId(4);
chunk3.setOffset(10);
chunk3.setPendingBytes(4);
chunk3.setWindowEndOffset(14);
chunk3.setMaxChunkSizeBytes(8);
chunk3.setType(Chunk.Type.PARAMETERS_CONTINUE);

// Following a timeout, the receiver retries, this time as a RETRANSMIT.
const chunk4 = new Chunk();
chunk3.setTransferId(4);
chunk3.setOffset(10);
chunk3.setPendingBytes(4);
chunk3.setWindowEndOffset(14);
chunk3.setMaxChunkSizeBytes(8);
chunk3.setType(Chunk.Type.PARAMETERS_RETRANSMIT);

const chunk5 = new Chunk();
chunk5.setTransferId(4);
chunk5.setOffset(14);
chunk5.setPendingBytes(16);
chunk5.setWindowEndOffset(30);
chunk5.setMaxChunkSizeBytes(16);
chunk5.setType(Chunk.Type.PARAMETERS_CONTINUE);

const completeChunk = new Chunk();
completeChunk.setTransferId(4);
completeChunk.setStatus(Status.OK);

enqueueServerResponses(service.method('Write')!, [
[chunk1],
[chunk2],
[chunk3, chunk4],
// [chunk4],
[chunk5],
[completeChunk],
]);

await manager.write(4, textEncoder.encode('pigweed data transfer'));
expect(sentChunks).toHaveLength(5);
expect(sentChunks[1].getData()).toEqual(textEncoder.encode('pigweed '));
expect(sentChunks[2].getData()).toEqual(textEncoder.encode('data tra'));
expect(sentChunks[3].getData()).toEqual(textEncoder.encode('ta t'));
expect(sentChunks[4].getData()).toEqual(textEncoder.encode('ransfer'));
});
});

0 comments on commit 6582c5e

Please sign in to comment.