Skip to content

Commit

Permalink
Address comments on exposing a parameter to control the retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lluwm committed Jan 23, 2024
1 parent 987fe18 commit 1135a4b
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
*/
@Override
public void close(boolean gracefulClose) {
close(gracefulClose, true);
}

public void close(boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
synchronized (closeLock) {
if (isClosed) {
return;
Expand All @@ -416,7 +420,7 @@ public void close(boolean gracefulClose) {
producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
} catch (Exception e) {
handleExceptionInClose(e, gracefulClose);
handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
} finally {
threadPoolExecutor.shutdown();
isClosed = true;
Expand All @@ -426,6 +430,12 @@ public void close(boolean gracefulClose) {
}

public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose) {
return closeAsync(gracefulClose, true);
}

public CompletableFuture<VeniceResourceCloseResult> closeAsync(
boolean gracefulClose,
boolean retryOnGracefulCloseFailure) {
return CompletableFuture.supplyAsync(() -> {
synchronized (closeLock) {
if (isClosed) {
Expand All @@ -452,7 +462,7 @@ public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulC
producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
} catch (Exception e) {
handleExceptionInClose(e, gracefulClose);
handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
} finally {
threadPoolExecutor.shutdown();
isClosed = true;
Expand All @@ -464,13 +474,13 @@ public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulC
}, threadPoolExecutor);
}

void handleExceptionInClose(Exception e, boolean gracefulClose) {
void handleExceptionInClose(Exception e, boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();

// For graceful close, swallow the exception and give another try to close it ungracefully.
try {
if (gracefulClose) {
if (gracefulClose && retryOnGracefulCloseFailure) {
producerAdapter.close(topicName, closeTimeOutInMs, false);
OPEN_VENICE_WRITER_COUNT.decrementAndGet();
}
Expand Down

0 comments on commit 1135a4b

Please sign in to comment.