Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KNET-12362] Produce V3 API - Improve visibility around produce-record errors #1300

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ea3baeb
[KNET-12362] Produce V3 API - Improve visibility around produce-recor…
wcheng92 Sep 10, 2024
61b7af7
Include counting error code with 200 ok
wcheng92 Sep 16, 2024
be83cce
[Updated] Including counting with response 200 ok (previous version c…
wcheng92 Sep 16, 2024
6d68565
[Update] shorten expectings in streamingRequestsWithExceptions
wcheng92 Sep 16, 2024
0eaba6c
[Update] Add an integration test for streaming produce with multiple …
wcheng92 Sep 17, 2024
0856038
[Update] Add comments to test_WhenProduceRequestWithMultipleRecords_T…
wcheng92 Sep 20, 2024
94613c6
Delete my.properties
wcheng92 Sep 20, 2024
975aec2
[DO NOT MERGE] Pushing for review only
wcheng92 Sep 23, 2024
e2bf768
[Update] Add override equals method for ProduceCounter and modified P…
wcheng92 Sep 24, 2024
09d1ab0
Delete .gitignore
wcheng92 Sep 24, 2024
275c8ec
Add back .gitignore
wcheng92 Sep 24, 2024
96b498c
[Update] Change class name from ProduceCounter to ProduceRecordErrorC…
wcheng92 Sep 24, 2024
5bb6b13
Delete my.properties
wcheng92 Sep 24, 2024
e04f8c0
[Update] Fix format
wcheng92 Sep 24, 2024
c184fa4
[Update] Change to more detailed method and attribute names for Produ…
wcheng92 Sep 24, 2024
b1c7990
[Update] Code structure changes
wcheng92 Sep 25, 2024
5634a9a
Delete 1
wcheng92 Sep 25, 2024
12f6ee5
[UPDATE] Remove unused variables
wcheng92 Sep 25, 2024
1026cb5
Merge pull request #7 from confluentinc/KNET-12362
wcheng92 Sep 25, 2024
8bd93fd
[Update] Remove `equals` and `hashCode` methods of ProduceRecordError…
wcheng92 Sep 26, 2024
e65b733
Merge pull request #12 from confluentinc/KNET-12362-2
wcheng92 Sep 26, 2024
acf489b
[Update] Remove counting error codes for 200
wcheng92 Oct 1, 2024
71e157d
Merge pull request #15 from confluentinc/KNET-12362-2
wcheng92 Oct 2, 2024
47c309f
[Update] Fixed assertion caused by counting 200 error code
wcheng92 Oct 2, 2024
cde8ea8
Merge branch 'airlock/KNET-12362' into KNET-12362
wcheng92 Oct 2, 2024
a864fc9
Merge pull request #17 from confluentinc/KNET-12362
wcheng92 Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ private static RequestLog createRequestLog(
return new CustomLog(
requestLogWriter,
requestLogFormat,
new String[] {CustomLogRequestAttributes.REST_ERROR_CODE});
new String[] {
CustomLogRequestAttributes.REST_ERROR_CODE,
CustomLogRequestAttributes.REST_PRODUCE_RECORD_ERROR_CODE_COUNTS
});
}
// Return null, as Application's ctor would set-up a default request-logger.
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package io.confluent.kafkarest.requestlog;

import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
Expand All @@ -39,6 +42,8 @@ public class CustomLog extends AbstractLifeCycle implements RequestLog {

private final String[] requestAttributesToLog;

public static final String PRODUCE_ERROR_CODE_LOG_PREFIX = "Codes=";

public CustomLog(RequestLog.Writer writer, String formatString, String[] requestAttributesToLog) {
for (String attr : requestAttributesToLog) {
// Add format-specifier to log request-attributes as response-headers in Jetty's
Expand All @@ -63,6 +68,27 @@ protected void doStop() throws Exception {
}
}

/**
 * Tallies, per error code, how many produce-records failed within one (http)produce-request.
 *
 * <p>Both methods are {@code synchronized} on the counter instance. Counts are kept in a {@link
 * TreeMap}, so {@link #toString()} lists the codes in ascending numeric order. CustomLog relies on
 * {@link #toString()} to obtain the text it logs for the aggregated counts.
 */
public static class ProduceRecordErrorCounter {
  private final Map<Integer, Integer> produceErrorCodeCountMap = new TreeMap<>();

  /**
   * Adds one occurrence of the given error code to the tally.
   *
   * @param httpErrorCode the error code to count
   */
  public synchronized void incrementErrorCount(int httpErrorCode) {
    Integer previousCount = produceErrorCodeCountMap.get(httpErrorCode);
    int updatedCount = (previousCount == null) ? 1 : previousCount + 1;
    produceErrorCodeCountMap.put(httpErrorCode, updatedCount);
  }

  /**
   * Renders the tally as the log prefix followed by comma-separated {@code code:count} pairs.
   *
   * @return the prefix alone when nothing has been counted, otherwise the prefix followed by one
   *     {@code code:count} pair per distinct error code
   */
  @Override
  public synchronized String toString() {
    StringBuilder rendered = new StringBuilder(PRODUCE_ERROR_CODE_LOG_PREFIX);
    String separator = "";
    for (Map.Entry<Integer, Integer> codeAndCount : produceErrorCodeCountMap.entrySet()) {
      rendered
          .append(separator)
          .append(codeAndCount.getKey())
          .append(":")
          .append(codeAndCount.getValue());
      // Only entries after the first are preceded by a comma.
      separator = ",";
    }
    return rendered.toString();
  }
}

@Override
public void log(Request request, Response response) {
// The configured request-attributes are converted to response-headers so Jetty can log them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public final class CustomLogRequestAttributes {
private CustomLogRequestAttributes() {}

public static final String REST_ERROR_CODE = "REST_ERROR_CODE";
public static final String REST_PRODUCE_RECORD_ERROR_CODE_COUNTS =
"REST_PRODUCE_RECORD_ERROR_CODE_COUNTS";
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import io.confluent.kafkarest.extension.ResourceAccesslistFeature.ResourceName;
import io.confluent.kafkarest.ratelimit.DoNotRateLimit;
import io.confluent.kafkarest.ratelimit.RateLimitExceededException;
import io.confluent.kafkarest.requestlog.CustomLog.ProduceRecordErrorCounter;
import io.confluent.kafkarest.requestlog.CustomLogRequestAttributes;
import io.confluent.kafkarest.resources.v3.V3ResourcesModule.ProduceResponseThreadPool;
import io.confluent.kafkarest.response.JsonStream;
import io.confluent.kafkarest.response.StreamingResponseFactory;
Expand Down Expand Up @@ -147,13 +149,19 @@ public void produce(
throw Errors.invalidPayloadException("Request body is empty. Data is required.");
}

ProduceRecordErrorCounter produceRecordErrorCounter = new ProduceRecordErrorCounter();

ProduceController controller = produceControllerProvider.get();
streamingResponseFactory
.from(requests)
.compose(
request ->
produce(clusterId, topicName, request, controller, producerMetricsProvider.get()))
.resume(asyncResponse);
.resume(asyncResponse, produceRecordErrorCounter);

httpServletRequest.setAttribute(
CustomLogRequestAttributes.REST_PRODUCE_RECORD_ERROR_CODE_COUNTS,
produceRecordErrorCounter);
}

private CompletableFuture<ProduceResponse> produce(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.kafkarest.exceptions.StatusCodeException;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.exceptions.v3.V3ExceptionMapper;
import io.confluent.kafkarest.requestlog.CustomLog.ProduceRecordErrorCounter;
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.rest.exceptions.KafkaExceptionMapper;
import io.confluent.rest.exceptions.RestConstraintViolationException;
Expand Down Expand Up @@ -159,7 +160,8 @@ public final <O> StreamingResponse<O> compose(
* <p>This method will block until all requests are read in. The responses are computed and
* written to {@code asyncResponse} asynchronously.
*/
public final void resume(AsyncResponse asyncResponse) {
public final void resume(
AsyncResponse asyncResponse, ProduceRecordErrorCounter produceRecordErrorCounter) {
log.debug("Resuming StreamingResponse");
AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
responseQueue.asyncResume(asyncResponse);
Expand All @@ -186,7 +188,11 @@ public final void resume(AsyncResponse asyncResponse) {
"Streaming connection open for longer than allowed",
"Connection will be closed.")))));
} else if (!closingStarted) {
responseQueue.push(next().handle(this::handleNext));
responseQueue.push(
next()
.handle(
(result, exception) ->
handleNext(result, exception, produceRecordErrorCounter)));
} else {
break;
}
Expand Down Expand Up @@ -218,11 +224,17 @@ private void closeAll(AsyncResponseQueue responseQueue) {
responseQueue.close();
}

private ResultOrError handleNext(T result, @Nullable Throwable error) {
private ResultOrError handleNext(
T result, @Nullable Throwable error, ProduceRecordErrorCounter produceRecordErrorCounter) {
if (error == null) {
return ResultOrError.result(result);
} else {
log.debug("Error processing streaming operation.", error);
if (error.getCause() == null) {
throw new IllegalArgumentException("Error cause is null", error);
}
int errorCode = EXCEPTION_MAPPER.toErrorResponse(error.getCause()).getErrorCode();
wcheng92 marked this conversation as resolved.
Show resolved Hide resolved
produceRecordErrorCounter.incrementErrorCount(errorCode);
return ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(error.getCause()));
}
}
Expand Down
Loading