Skip to content

Commit

Permalink
MINOR: Remove PartitionHeader abstraction from FetchResponse sche…
Browse files Browse the repository at this point in the history
…ma (apache#9164)

This patch removes the PartitionHeader grouping from the Fetch response. With old versions of the protocol, there was no cost for this grouping, but once we add flexible version support, then it adds an extra byte to the schema for tagged fields with little apparent benefit.

Reviewers: Ismael Juma <[email protected]>, David Arthur <[email protected]>
  • Loading branch information
Jason Gustafson authored Aug 11, 2020
1 parent 7159c6d commit 0f3622c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,18 @@ private PartitionData(FetchResponseData.FetchablePartitionResponse partitionResp
// When we convert the PartitionData (and other fields) into FetchResponseData down in toMessage, we
// set the partition IDs.
this.partitionResponse = partitionResponse;
this.preferredReplica = Optional.of(partitionResponse.partitionHeader().preferredReadReplica())
this.preferredReplica = Optional.of(partitionResponse.preferredReadReplica())
.filter(replicaId -> replicaId != INVALID_PREFERRED_REPLICA_ID);

if (partitionResponse.partitionHeader().abortedTransactions() == null) {
if (partitionResponse.abortedTransactions() == null) {
this.abortedTransactions = null;
} else {
this.abortedTransactions = partitionResponse.partitionHeader().abortedTransactions().stream()
this.abortedTransactions = partitionResponse.abortedTransactions().stream()
.map(AbortedTransaction::fromMessage)
.collect(Collectors.toList());
}

this.error = Errors.forCode(partitionResponse.partitionHeader().errorCode());
this.error = Errors.forCode(partitionResponse.errorCode());
}

public PartitionData(Errors error,
Expand All @@ -154,24 +154,25 @@ public PartitionData(Errors error,
this.preferredReplica = preferredReadReplica;
this.abortedTransactions = abortedTransactions;
this.error = error;
FetchResponseData.PartitionHeader partitionHeader = new FetchResponseData.PartitionHeader();
partitionHeader.setErrorCode(error.code())
FetchResponseData.FetchablePartitionResponse partitionResponse =
new FetchResponseData.FetchablePartitionResponse();
partitionResponse.setErrorCode(error.code())
.setHighWatermark(highWatermark)
.setLastStableOffset(lastStableOffset)
.setLogStartOffset(logStartOffset);
if (abortedTransactions != null) {
partitionHeader.setAbortedTransactions(abortedTransactions.stream().map(
partitionResponse.setAbortedTransactions(abortedTransactions.stream().map(
aborted -> new FetchResponseData.AbortedTransaction()
.setProducerId(aborted.producerId)
.setFirstOffset(aborted.firstOffset))
.collect(Collectors.toList()));
} else {
partitionHeader.setAbortedTransactions(null);
partitionResponse.setAbortedTransactions(null);
}
partitionHeader.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
this.partitionResponse = new FetchResponseData.FetchablePartitionResponse()
.setPartitionHeader(partitionHeader)
.setRecordSet(records);
partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
partitionResponse.setRecordSet(records);

this.partitionResponse = partitionResponse;
}

public PartitionData(Errors error,
Expand Down Expand Up @@ -216,15 +217,15 @@ public Errors error() {
}

public long highWatermark() {
return partitionResponse.partitionHeader().highWatermark();
return partitionResponse.highWatermark();
}

public long lastStableOffset() {
return partitionResponse.partitionHeader().lastStableOffset();
return partitionResponse.lastStableOffset();
}

public long logStartOffset() {
return partitionResponse.partitionHeader().logStartOffset();
return partitionResponse.logStartOffset();
}

public Optional<Integer> preferredReadReplica() {
Expand Down Expand Up @@ -342,8 +343,7 @@ private static <T extends BaseRecords> LinkedHashMap<TopicPartition, PartitionDa
LinkedHashMap<TopicPartition, PartitionData<T>> responseMap = new LinkedHashMap<>();
message.responses().forEach(topicResponse -> {
topicResponse.partitionResponses().forEach(partitionResponse -> {
FetchResponseData.PartitionHeader partitionHeader = partitionResponse.partitionHeader();
TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionHeader.partition());
TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionResponse.partition());
PartitionData<T> partitionData = new PartitionData<>(partitionResponse);
responseMap.put(tp, partitionData);
});
Expand All @@ -366,7 +366,7 @@ private static <T extends BaseRecords> FetchResponseData toMessage(int throttleT
List<FetchResponseData.FetchablePartitionResponse> partitionResponses = new ArrayList<>();
partitionDataTopicAndPartitionData.partitions.forEach((partitionId, partitionData) -> {
// Since PartitionData alone doesn't know the partition ID, we set it here
partitionData.partitionResponse.partitionHeader().setPartition(partitionId);
partitionData.partitionResponse.setPartition(partitionId);
partitionResponses.add(partitionData.partitionResponse);
});
topicResponseList.add(new FetchResponseData.FetchableTopicResponse()
Expand Down
39 changes: 18 additions & 21 deletions clients/src/main/resources/common/message/FetchResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,25 @@
"about": "The topic name." },
{ "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+",
"about": "The topic partitions.", "fields": [
{ "name": "PartitionHeader", "type": "PartitionHeader", "versions": "0+",
"fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no fetch error." },
{ "name": "HighWatermark", "type": "int64", "versions": "0+",
"about": "The current high water mark." },
{ "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
"about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The current log start offset." },
{ "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
"about": "The aborted transactions.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
"about": "The producer id associated with the aborted transaction." },
{ "name": "FirstOffset", "type": "int64", "versions": "4+",
"about": "The first offset in the aborted transaction." }
]},
{ "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false,
"about": "The preferred read replica for the consumer to use on its next fetch request"}
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no fetch error." },
{ "name": "HighWatermark", "type": "int64", "versions": "0+",
"about": "The current high water mark." },
{ "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
"about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
{ "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
"about": "The current log start offset." },
{ "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
"about": "The aborted transactions.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
"about": "The producer id associated with the aborted transaction." },
{ "name": "FirstOffset", "type": "int64", "versions": "4+",
"about": "The first offset in the aborted transaction." }
]},
{ "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false,
"about": "The preferred read replica for the consumer to use on its next fetch request"},
{ "name": "RecordSet", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
]}
]}
Expand Down

0 comments on commit 0f3622c

Please sign in to comment.