diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionInvisibleDurationIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionInvisibleDurationIT.java new file mode 100644 index 000000000000..b74e1a7444b1 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionInvisibleDurationIT.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.local; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; +import org.apache.iotdb.session.subscription.SubscriptionSession; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSubscriptionInvisibleDurationIT extends AbstractSubscriptionLocalIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBSubscriptionInvisibleDurationIT.class); + + private static final Duration INVISIBLE_DURATION = Duration.ofSeconds(5L); + private static final int RECONSUME_COUNT = 3; + + @Test + public void testReconsumeByPoll() throws Exception { + testInvisibleDurationInternal( + (consumer, rowCount, reconsumeCount) -> { + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time + final List messages; + if (reconsumeCount.incrementAndGet() < RECONSUME_COUNT) { + messages = + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS, INVISIBLE_DURATION); + } else { + messages = consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); + } + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + while (dataSet.hasNext()) { + dataSet.next(); + rowCount.addAndGet(1); + } + } + } + if (reconsumeCount.get() >= RECONSUME_COUNT) { + consumer.commitSync(messages); + } + }); + } + + @Test + public void testReconsumeByChangeInvisibleDuration() throws Exception { + testInvisibleDurationInternal( + (consumer, rowCount, reconsumeCount) -> { + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time + final List messages = + consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); + for (final SubscriptionMessage message : messages) { + for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) { + while (dataSet.hasNext()) { + dataSet.next(); + rowCount.addAndGet(1); + } + } + } + if (reconsumeCount.incrementAndGet() < RECONSUME_COUNT) { + consumer.changeInvisibleDuration(messages, INVISIBLE_DURATION); + } else { + consumer.commitSync(messages); + } + }); + } + + private void testInvisibleDurationInternal(final ReconsumeAction action) throws Exception { + // Insert some historical data + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.createDatabase("root.db"); + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Create topic + final String topicName = "topic1"; + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1"); + session.createTopic(topicName, config); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Subscription + final AtomicInteger rowCount = new AtomicInteger(); + final AtomicInteger reconsumeCount = new AtomicInteger(0); + final AtomicBoolean isClosed = new AtomicBoolean(false); + final Thread thread = + new Thread( + () -> { + try (final SubscriptionPullConsumer consumer = + new SubscriptionPullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .autoCommit(false) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + while (!isClosed.get()) { + action.apply(consumer, rowCount, reconsumeCount); + } + consumer.unsubscribe(topicName); + } catch (final Exception e) { + e.printStackTrace(); + // Avoid failure + } finally { + LOGGER.info("consumer exiting..."); + } + }, + String.format("%s - consumer", testName.getDisplayName())); + thread.start(); + + // Check row count + try { + // Keep retrying if there are execution failures + AWAIT.untilAsserted(() -> Assert.assertEquals(100 * RECONSUME_COUNT, rowCount.get())); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + isClosed.set(true); + thread.join(); + } + } + + @FunctionalInterface + private interface ReconsumeAction { + void apply( + SubscriptionPullConsumer consumer, AtomicInteger rowCount, AtomicInteger reconsumeCount); + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequest.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequest.java index b9337e5779fe..bb94dbe3f92b 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequest.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequest.java @@ -41,15 +41,20 @@ public class SubscriptionPollRequest { /** The maximum size, in bytes, for the response payload. */ private final transient long maxBytes; + /** The duration in milliseconds for polled messages should remain invisible. */ + private final transient long invisibleDurationMs; + public SubscriptionPollRequest( final short requestType, final SubscriptionPollPayload payload, final long timeoutMs, - final long maxBytes) { + final long maxBytes, + final long invisibleDurationMs) { this.requestType = requestType; this.payload = payload; this.timeoutMs = timeoutMs; this.maxBytes = maxBytes; + this.invisibleDurationMs = invisibleDurationMs; } public short getRequestType() { @@ -68,6 +73,10 @@ public long getMaxBytes() { return maxBytes; } + public long getInvisibleDurationMs() { + return invisibleDurationMs; + } + //////////////////////////// serialization //////////////////////////// public static ByteBuffer serialize(final SubscriptionPollRequest request) throws IOException { @@ -83,6 +92,7 @@ private void serialize(final DataOutputStream stream) throws IOException { payload.serialize(stream); ReadWriteIOUtils.write(timeoutMs, stream); ReadWriteIOUtils.write(maxBytes, stream); + ReadWriteIOUtils.write(invisibleDurationMs, stream); } public static SubscriptionPollRequest deserialize(final ByteBuffer buffer) { @@ -109,7 +119,9 @@ public static SubscriptionPollRequest deserialize(final ByteBuffer buffer) { final long timeoutMs = ReadWriteIOUtils.readLong(buffer); final long maxBytes = ReadWriteIOUtils.readLong(buffer); - return new SubscriptionPollRequest(requestType, payload, timeoutMs, maxBytes); + final long invisibleDurationMs = ReadWriteIOUtils.readLong(buffer); + return new SubscriptionPollRequest( + requestType, payload, timeoutMs, maxBytes, invisibleDurationMs); } /////////////////////////////// object /////////////////////////////// @@ -124,6 +136,8 @@ public String toString() { + timeoutMs + ", maxBytes=" + maxBytes + + ", invisibleDurationMs=" + + invisibleDurationMs + "}"; } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java index 01d173d27428..2349e88c96a3 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java @@ -38,6 +38,14 @@ public class SubscriptionPollResponse { private final transient SubscriptionCommitContext commitContext; + /** + * The subscription poll request associated with this response. + * + *

This field is set when the client deserializes the RPC response into {@code + * SubscriptionPollResponse}, linking this response to the corresponding request. + */ + private SubscriptionPollRequest request; + public SubscriptionPollResponse( final short responseType, final SubscriptionPollPayload payload, @@ -59,6 +67,14 @@ public SubscriptionCommitContext getCommitContext() { return commitContext; } + public void setRequest(final SubscriptionPollRequest request) { + this.request = request; + } + + public SubscriptionPollRequest getRequest() { + return request; + } + /////////////////////////////// de/ser /////////////////////////////// public static ByteBuffer serialize(final SubscriptionPollResponse response) throws IOException { diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java index f11c47d01a02..1b06e13c565e 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeCommitReq.java @@ -36,8 +36,29 @@ public class PipeSubscribeCommitReq extends TPipeSubscribeReq { private transient List commitContexts = new ArrayList<>(); + /** + * Indicates the acknowledgement status of the messages corresponding to {@link + * PipeSubscribeCommitReq#commitContexts}. + * + *

If {@code nack} is {@code false}, it indicates an acknowledgement (ack) meaning the + * consumption of messages was successful. + * + *

If {@code nack} is {@code true}, it indicates a negative acknowledgement (nack) meaning the + * consumption of messages was not successful. + */ private transient boolean nack; + /** + * The duration in milliseconds for which messages corresponding to {@link + * PipeSubscribeCommitReq#commitContexts} should remain invisible. + * + *

This field is effective only when {@link PipeSubscribeCommitReq#nack} is {@code true}. + * + *

If is zero, the messages will be immediately re-consumed. Otherwise, it sets the invisible + * duration for the messages. + */ + private transient long invisibleDurationMs; + public List getCommitContexts() { return commitContexts; } @@ -46,6 +67,10 @@ public boolean isNack() { return nack; } + public long getInvisibleDurationMs() { + return invisibleDurationMs; + } + /////////////////////////////// Thrift /////////////////////////////// /** @@ -53,7 +78,10 @@ public boolean isNack() { * client. */ public static PipeSubscribeCommitReq toTPipeSubscribeReq( - final List commitContexts, final boolean nack) throws IOException { + final List commitContexts, + final boolean nack, + final long invisibleDurationMs) + throws IOException { final PipeSubscribeCommitReq req = new PipeSubscribeCommitReq(); req.commitContexts = commitContexts; @@ -68,6 +96,7 @@ public static PipeSubscribeCommitReq toTPipeSubscribeReq( commitContext.serialize(outputStream); } ReadWriteIOUtils.write(nack, outputStream); + ReadWriteIOUtils.write(invisibleDurationMs, outputStream); req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } @@ -84,6 +113,7 @@ public static PipeSubscribeCommitReq fromTPipeSubscribeReq(final TPipeSubscribeR req.commitContexts.add(SubscriptionCommitContext.deserialize(commitReq.body)); } req.nack = ReadWriteIOUtils.readBool(commitReq.body); + req.invisibleDurationMs = ReadWriteIOUtils.readLong(commitReq.body); } req.version = commitReq.version; @@ -106,6 +136,7 @@ public boolean equals(final Object obj) { final PipeSubscribeCommitReq that = (PipeSubscribeCommitReq) obj; return Objects.equals(this.commitContexts, that.commitContexts) && Objects.equals(this.nack, that.nack) + && Objects.equals(this.invisibleDurationMs, that.invisibleDurationMs) && this.version == that.version && this.type == that.type && Objects.equals(this.body, that.body); @@ -113,6 +144,6 @@ public boolean equals(final Object obj) { @Override public int hashCode() { - return Objects.hash(commitContexts, nack, version, type, body); + return Objects.hash(commitContexts, nack, invisibleDurationMs, version, type, body); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index cd7f2b20571b..59d8f86d01bf 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -57,6 +57,7 @@ import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -94,6 +95,8 @@ abstract class SubscriptionConsumer implements AutoCloseable { private static final long SLEEP_DELTA_MS = 50L; private static final long TIMER_DELTA_MS = 250L; + protected static final Duration DEFAULT_INVISIBLE_DURATION = Duration.ofSeconds(600L); + private final String username; private final String password; @@ -471,7 +474,9 @@ private Path getFilePath( }); protected List multiplePoll( - /* @NotNull */ final Set topicNames, final long timeoutMs) { + /* @NotNull */ final Set topicNames, + final long timeoutMs, + final long invisibleDurationMs) { if (topicNames.isEmpty()) { return Collections.emptyList(); } @@ -481,7 +486,7 @@ protected List multiplePoll( SubscriptionExecutorServiceManager.getAvailableThreadCountForPollTasks(); if (availableCount == 0) { // non-strict timeout - return singlePoll(topicNames, timeoutMs); + return singlePoll(topicNames, timeoutMs, invisibleDurationMs); } // dividing topics @@ -489,7 +494,7 @@ protected List multiplePoll( final List> partitionedTopicNames = partition(topicNames, Math.min(maxPollParallelism, availableCount)); for (final Set partition : partitionedTopicNames) { - tasks.add(new PollTask(partition, timeoutMs)); + tasks.add(new PollTask(partition, timeoutMs, invisibleDurationMs)); } // submit multiple tasks to poll messages @@ -550,20 +555,25 @@ private class PollTask implements Callable> { private final Set topicNames; private final long timeoutMs; + private final long invisibleDurationMs; - public PollTask(final Set topicNames, final long timeoutMs) { + public PollTask( + final Set topicNames, final long timeoutMs, final long invisibleDurationMs) { this.topicNames = topicNames; this.timeoutMs = timeoutMs; + this.invisibleDurationMs = invisibleDurationMs; } @Override public List call() { - return singlePoll(topicNames, timeoutMs); + return singlePoll(topicNames, timeoutMs, invisibleDurationMs); } } private List singlePoll( - /* @NotNull */ final Set topicNames, final long timeoutMs) + /* @NotNull */ final Set topicNames, + final long timeoutMs, + final long invisibleDurationMs) throws SubscriptionException { if (topicNames.isEmpty()) { return Collections.emptyList(); @@ -578,7 +588,7 @@ private List singlePoll( final List currentMessages = new ArrayList<>(); try { currentResponses.clear(); - currentResponses = pollInternal(topicNames); + currentResponses = pollInternal(topicNames, invisibleDurationMs); for (final SubscriptionPollResponse response : currentResponses) { final short responseType = response.getResponseType(); if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) { @@ -612,7 +622,7 @@ private List singlePoll( e); // nack and clear current responses try { - nack(currentResponses); + nackResponses(currentResponses); currentResponses.clear(); } catch (final Exception ignored) { } @@ -652,7 +662,7 @@ private List singlePoll( if (Thread.currentThread().isInterrupted()) { // nack and clear current responses try { - nack(currentResponses); + nackResponses(currentResponses); currentResponses.clear(); } catch (final Exception ignored) { } @@ -678,7 +688,13 @@ private Optional pollFile(final SubscriptionPollResponse re final Path filePath = getFilePath(topicName, fileName, true, true); final File file = filePath.toFile(); try (final RandomAccessFile fileWriter = new RandomAccessFile(file, "rw")) { - return Optional.of(pollFileInternal(commitContext, fileName, file, fileWriter)); + return Optional.of( + pollFileInternal( + commitContext, + fileName, + file, + fileWriter, + response.getRequest().getInvisibleDurationMs())); } catch (final Exception e) { // construct temporary message to nack nack( @@ -692,7 +708,8 @@ private SubscriptionMessage pollFileInternal( final SubscriptionCommitContext commitContext, final String rawFileName, final File file, - final RandomAccessFile fileWriter) + final RandomAccessFile fileWriter, + final long invisibleDurationMs) throws IOException, SubscriptionException { LOGGER.info( "{} start to poll file {} with commit context {}", @@ -703,7 +720,7 @@ private SubscriptionMessage pollFileInternal( long writingOffset = fileWriter.length(); while (true) { final List responses = - pollFileInternal(commitContext, writingOffset); + pollFileInternal(commitContext, writingOffset, invisibleDurationMs); // It's agreed that the server will always return at least one response, even in case of // failure. @@ -881,7 +898,8 @@ private Optional pollTabletsInternal( } final List responses = - pollTabletsInternal(commitContext, nextOffset); + pollTabletsInternal( + commitContext, nextOffset, initialResponse.getRequest().getInvisibleDurationMs()); // It's agreed that the server will always return at least one response, even in case of // failure. @@ -950,8 +968,8 @@ private Optional pollTabletsInternal( } } - private List pollInternal(final Set topicNames) - throws SubscriptionException { + private List pollInternal( + final Set topicNames, final long invisibleDurationMs) throws SubscriptionException { providers.acquireReadLock(); try { final SubscriptionProvider provider = providers.getNextAvailableProvider(); @@ -966,7 +984,7 @@ private List pollInternal(final Set topicNames } // ignore SubscriptionConnectionException to improve poll auto retry try { - return provider.poll(topicNames); + return provider.poll(topicNames, invisibleDurationMs); } catch (final SubscriptionConnectionException ignored) { return Collections.emptyList(); } @@ -976,7 +994,9 @@ private List pollInternal(final Set topicNames } private List pollFileInternal( - final SubscriptionCommitContext commitContext, final long writingOffset) + final SubscriptionCommitContext commitContext, + final long writingOffset, + final long invisibleDurationMs) throws SubscriptionException { final int dataNodeId = commitContext.getDataNodeId(); providers.acquireReadLock(); @@ -993,7 +1013,7 @@ private List pollFileInternal( } // ignore SubscriptionConnectionException to improve poll auto retry try { - return provider.pollFile(commitContext, writingOffset); + return provider.pollFile(commitContext, writingOffset, invisibleDurationMs); } catch (final SubscriptionConnectionException ignored) { return Collections.emptyList(); } @@ -1003,7 +1023,9 @@ private List pollFileInternal( } private List pollTabletsInternal( - final SubscriptionCommitContext commitContext, final int offset) + final SubscriptionCommitContext commitContext, + final int offset, + final long invisibleDurationMs) throws SubscriptionException { final int dataNodeId = commitContext.getDataNodeId(); providers.acquireReadLock(); @@ -1020,7 +1042,7 @@ private List pollTabletsInternal( } // ignore SubscriptionConnectionException to improve poll auto retry try { - return provider.pollTablets(commitContext, offset); + return provider.pollTablets(commitContext, offset, invisibleDurationMs); } catch (final SubscriptionConnectionException ignored) { return Collections.emptyList(); } @@ -1041,11 +1063,17 @@ protected void ack(final Iterable messages) throws Subscrip } for (final Map.Entry> entry : dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), false); + commitInternal(entry.getKey(), entry.getValue(), false, 0L); } } protected void nack(final Iterable messages) throws SubscriptionException { + nackMessages(messages, 0L); + } + + protected void nackMessages( + final Iterable messages, final long invisibleDurationMs) + throws SubscriptionException { final Map> dataNodeIdToSubscriptionCommitContexts = new HashMap<>(); for (final SubscriptionMessage message : messages) { @@ -1063,11 +1091,12 @@ protected void nack(final Iterable messages) throws Subscri } for (final Map.Entry> entry : dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), true); + commitInternal(entry.getKey(), entry.getValue(), true, invisibleDurationMs); } } - private void nack(final List responses) throws SubscriptionException { + private void nackResponses(final Iterable responses) + throws SubscriptionException { final Map> dataNodeIdToSubscriptionCommitContexts = new HashMap<>(); for (final SubscriptionPollResponse response : responses) { @@ -1078,14 +1107,15 @@ private void nack(final List responses) throws Subscri } for (final Entry> entry : dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), true); + commitInternal(entry.getKey(), entry.getValue(), true, 0L); } } private void commitInternal( final int dataNodeId, final List subscriptionCommitContexts, - final boolean nack) + final boolean nack, + final long invisibleDurationMs) throws SubscriptionException { providers.acquireReadLock(); try { @@ -1099,7 +1129,7 @@ private void commitInternal( "something unexpected happened when %s commit (nack: %s) messages to subscription provider with data node id %s, the subscription provider may be unavailable or not existed", this, nack, dataNodeId)); } - provider.commit(subscriptionCommitContexts, nack); + provider.commit(subscriptionCommitContexts, nack, invisibleDurationMs); } finally { providers.releaseReadLock(); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java index ef03b6f20cbd..abf468d406fb 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java @@ -299,35 +299,43 @@ Map unsubscribe(final Set topicNames) throws Subscr return unsubscribeResp.getTopics(); } - List poll(final Set topicNames) throws SubscriptionException { + List poll(final Set topicNames, final long invisibleDurationMs) + throws SubscriptionException { return poll( new SubscriptionPollRequest( SubscriptionPollRequestType.POLL.getType(), new PollPayload(topicNames), 0L, - thriftMaxFrameSize)); + thriftMaxFrameSize, + invisibleDurationMs)); } List pollFile( - final SubscriptionCommitContext commitContext, final long writingOffset) + final SubscriptionCommitContext commitContext, + final long writingOffset, + final long invisibleDurationMs) throws SubscriptionException { return poll( new SubscriptionPollRequest( SubscriptionPollRequestType.POLL_FILE.getType(), new PollFilePayload(commitContext, writingOffset), 0L, - thriftMaxFrameSize)); + thriftMaxFrameSize, + invisibleDurationMs)); } List pollTablets( - final SubscriptionCommitContext commitContext, final int offset) + final SubscriptionCommitContext commitContext, + final int offset, + final long invisibleDurationMs) throws SubscriptionException { return poll( new SubscriptionPollRequest( SubscriptionPollRequestType.POLL_TABLETS.getType(), new PollTabletsPayload(commitContext, offset), 0L, - thriftMaxFrameSize)); + thriftMaxFrameSize, + invisibleDurationMs)); } List poll(final SubscriptionPollRequest pollMessage) @@ -358,14 +366,22 @@ List poll(final SubscriptionPollRequest pollMessage) } verifyPipeSubscribeSuccess(resp.status); final PipeSubscribePollResp pollResp = PipeSubscribePollResp.fromTPipeSubscribeResp(resp); - return pollResp.getResponses(); + final List responses = pollResp.getResponses(); + // set origin request for each response + responses.forEach(response -> response.setRequest(pollMessage)); + return responses; } - void commit(final List subscriptionCommitContexts, final boolean nack) + void commit( + final List subscriptionCommitContexts, + final boolean nack, + final long invisibleDurationMs) throws SubscriptionException { final PipeSubscribeCommitReq req; try { - req = PipeSubscribeCommitReq.toTPipeSubscribeReq(subscriptionCommitContexts, nack); + req = + PipeSubscribeCommitReq.toTPipeSubscribeReq( + subscriptionCommitContexts, nack, invisibleDurationMs); } catch (final IOException e) { LOGGER.warn( "IOException occurred when SubscriptionProvider {} serialize commit request {}", diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java index ae9f99b969d9..c2d730378341 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java @@ -138,21 +138,119 @@ public synchronized void close() { /////////////////////////////// poll & commit /////////////////////////////// + /** + * Polls subscription messages from the subscribed topics with a specified timeout. + * + * @param timeout the maximum time to wait for messages, specified as a {@link Duration}. + * @return a list of {@link SubscriptionMessage} from the subscribed topics. + * @throws SubscriptionException if there is an error during polling. + */ public List poll(final Duration timeout) throws SubscriptionException { - return poll(Collections.emptySet(), timeout.toMillis()); + return poll(Collections.emptySet(), timeout.toMillis(), DEFAULT_INVISIBLE_DURATION); } + /** + * Polls subscription messages from the subscribed topics with a specified timeout. + * + * @param timeoutMs the maximum time to wait for messages, in milliseconds. + * @return a list of {@link SubscriptionMessage} from the subscribed topics. + * @throws SubscriptionException if there is an error during polling. + */ public List poll(final long timeoutMs) throws SubscriptionException { - return poll(Collections.emptySet(), timeoutMs); + return poll(Collections.emptySet(), timeoutMs, DEFAULT_INVISIBLE_DURATION); } + /** + * Polls subscription messages from the subscribed topics with a specified timeout and invisible + * duration. + * + * @param timeout the maximum time to wait for messages, specified as a {@link Duration}. + * @param invisibleDuration the duration for which the polled messages will remain invisible, + * specified as a {@link Duration}. + * @return a list of {@link SubscriptionMessage} from the subscribed topics. + * @throws SubscriptionException if there is an error during polling. + */ + public List poll(final Duration timeout, final Duration invisibleDuration) + throws SubscriptionException { + return poll(Collections.emptySet(), timeout.toMillis(), invisibleDuration); + } + + /** + * Polls subscription messages from the subscribed topics with a specified timeout and invisible + * duration. + * + * @param timeoutMs the maximum time to wait for messages, in milliseconds. + * @param invisibleDuration the duration for which the polled messages will remain invisible, + * specified as a {@link Duration}. + * @return a list of {@link SubscriptionMessage} from the subscribed topics. + * @throws SubscriptionException if there is an error during polling. + */ + public List poll(final long timeoutMs, final Duration invisibleDuration) + throws SubscriptionException { + return poll(Collections.emptySet(), timeoutMs, invisibleDuration); + } + + /** + * Polls subscription messages from the specified topics with a specified timeout. + * + * @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics + * will be polled. + * @param timeout the maximum time to wait for messages, specified as a {@link Duration}. + * @return a list of {@link SubscriptionMessage} from the specified topics. + * @throws SubscriptionException if there is an error during polling. + */ public List poll(final Set topicNames, final Duration timeout) throws SubscriptionException { - return poll(topicNames, timeout.toMillis()); + return poll(topicNames, timeout.toMillis(), DEFAULT_INVISIBLE_DURATION); } + /** + * Polls subscription messages from the specified topics with a specified timeout. + * + * @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics + * will be polled. + * @param timeoutMs the maximum time to wait for messages, in milliseconds. + * @return a list of {@link SubscriptionMessage} from the specified topics. + * @throws SubscriptionException if there is an error during polling. + */ public List poll(final Set topicNames, final long timeoutMs) throws SubscriptionException { + return poll(topicNames, timeoutMs, DEFAULT_INVISIBLE_DURATION); + } + + /** + * Polls subscription messages from the specified topics with a specified timeout and invisible + * duration. + * + * @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics + * will be polled. + * @param timeout the maximum time to wait for messages, specified as a {@link Duration}. + * @param invisibleDuration the duration for which the polled messages will remain invisible, + * specified as a {@link Duration}. + * @return a list of {@link SubscriptionMessage} from the specified topics. + * @throws SubscriptionException if there is an error during polling. + */ + public List poll( + final Set topicNames, final Duration timeout, final Duration invisibleDuration) + throws SubscriptionException { + return poll(topicNames, timeout.toMillis(), invisibleDuration); + } + + /** + * Polls subscription messages from the specified topics with a specified timeout and invisible + * duration. + * + * @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics + * will be polled. + * @param timeoutMs the maximum time to wait for messages, in milliseconds. + * @param invisibleDuration the duration for which the polled messages will remain invisible, + * specified as a {@link Duration}. + * @return a list of {@link SubscriptionMessage} from the specified topics. + * @throws SubscriptionException if there is an error during polling. + */ + public List poll( + final Set topicNames, final long timeoutMs, final Duration invisibleDuration) + throws SubscriptionException { // parse topic names from external source Set parsedTopicNames = topicNames.stream().map(IdentifierUtils::parseIdentifier).collect(Collectors.toSet()); @@ -175,7 +273,8 @@ public List poll(final Set topicNames, final long t return Collections.emptyList(); } - final List messages = multiplePoll(parsedTopicNames, timeoutMs); + final List messages = + multiplePoll(parsedTopicNames, timeoutMs, invisibleDuration.toMillis()); if (messages.isEmpty()) { LOGGER.info( "SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)", @@ -202,32 +301,93 @@ public List poll(final Set topicNames, final long t /////////////////////////////// commit /////////////////////////////// + /** + * Commits the specified message synchronously. + * + * @param message the subscription message to commit. + * @throws SubscriptionException if there is an error during the commit. + */ public void commitSync(final SubscriptionMessage message) throws SubscriptionException { super.ack(Collections.singletonList(message)); } + /** + * Commits the specified messages synchronously. + * + * @param messages an iterable collection of subscription messages to commit. + * @throws SubscriptionException if there is an error during the commit. + */ public void commitSync(final Iterable messages) throws SubscriptionException { super.ack(messages); } + /** + * Commits the specified message asynchronously. + * + * @param message the subscription message to commit. + * @return a {@link CompletableFuture} that completes when the commit operation is finished. + */ public CompletableFuture commitAsync(final SubscriptionMessage message) { return super.commitAsync(Collections.singletonList(message)); } + /** + * Commits the specified messages asynchronously. + * + * @param messages an iterable collection of subscription messages to commit. + * @return a {@link CompletableFuture} that completes when the commit operation is finished. + */ public CompletableFuture commitAsync(final Iterable messages) { return super.commitAsync(messages); } + /** + * Commits the specified message asynchronously with a callback upon completion. + * + * @param message the subscription message to commit. + * @param callback the callback to invoke upon completion of the commit operation. + */ public void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback) { super.commitAsync(Collections.singletonList(message), callback); } + /** + * Commits the specified messages asynchronously with a callback upon completion. + * + * @param messages an iterable collection of subscription messages to commit. + * @param callback the callback to invoke upon completion of the commit operation. + */ public void commitAsync( final Iterable messages, final AsyncCommitCallback callback) { super.commitAsync(messages, callback); } + /** + * Changes the invisible duration of the specified message. + * + * @param message the subscription message for which to change the invisible duration. + * @param invisibleDuration the new invisible duration, specified as a {@link Duration}. If the + * message is already visible to the consumer, the behavior is undefined. + */ + public void changeInvisibleDuration( + final SubscriptionMessage message, final Duration invisibleDuration) { + super.nackMessages(Collections.singletonList(message), invisibleDuration.toMillis()); + } + + /** + * Changes the invisible duration of the specified messages. + * + * @param messages an iterable collection of subscription messages for which to change the + * invisible duration. + * @param invisibleDuration the new invisible duration, specified as a {@link Duration}. If any of + * the messages are already visible to the consumer, the behavior is undefined. + */ + public void changeInvisibleDuration( + final Iterable messages, final Duration invisibleDuration) { + super.nackMessages(messages, invisibleDuration.toMillis()); + } + /////////////////////////////// auto commit /////////////////////////////// private void submitAutoCommitWorker() { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index 953894eece7f..9a7c59df9264 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -173,7 +173,10 @@ public void run() { try { final List messages = - multiplePoll(subscribedTopics.keySet(), autoPollTimeoutMs); + multiplePoll( + subscribedTopics.keySet(), + autoPollTimeoutMs, + DEFAULT_INVISIBLE_DURATION.toMillis()); if (messages.isEmpty()) { LOGGER.info( "SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 13770fdbf7c2..10ea0149bed2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -101,7 +101,8 @@ public List pollTablets( public List commit( final ConsumerConfig consumerConfig, final List commitContexts, - final boolean nack) { + final boolean nack, + final long invisibleDurationMs) { final String consumerGroupId = consumerConfig.getConsumerGroupId(); final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { @@ -112,7 +113,7 @@ public List commit( throw new SubscriptionException(errorMessage); } final String consumerId = consumerConfig.getConsumerId(); - return broker.commit(consumerId, commitContexts, nack); + return broker.commit(consumerId, commitContexts, nack, invisibleDurationMs); } /////////////////////////////// broker /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index 420b571103a8..9262022fe912 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -142,7 +142,8 @@ public List poll( eventsToPollWithEventsToNack.right.stream() .map(SubscriptionEvent::getCommitContext) .collect(Collectors.toList()), - true); + true, + 0L); return eventsToPollWithEventsToNack.left; } @@ -230,7 +231,8 @@ public List pollTablets( public List commit( final String consumerId, final List commitContexts, - final boolean nack) { + final boolean nack, + final long invisibleDurationMs) { final List successfulCommitContexts = new ArrayList<>(); for (final SubscriptionCommitContext commitContext : commitContexts) { final String topicName = commitContext.getTopicName(); @@ -255,7 +257,7 @@ public List commit( successfulCommitContexts.add(commitContext); } } else { - if (prefetchingQueue.nack(consumerId, commitContext)) { + if (prefetchingQueue.nack(consumerId, commitContext, invisibleDurationMs)) { successfulCommitContexts.add(commitContext); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index e072b7a8d512..b1145754e68e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -470,10 +470,13 @@ private boolean ackInternal( /** * @return {@code true} if nack successfully */ - public boolean nack(final String consumerId, final SubscriptionCommitContext commitContext) { + public boolean nack( + final String consumerId, + final SubscriptionCommitContext commitContext, + final long invisibleDurationMs) { acquireReadLock(); try { - return !isClosed() && nackInternal(consumerId, commitContext); + return !isClosed() && nackInternal(consumerId, commitContext, invisibleDurationMs); } finally { releaseReadLock(); } @@ -483,7 +486,9 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com * @return {@code true} if nack successfully */ public boolean nackInternal( - final String consumerId, final SubscriptionCommitContext commitContext) { + final String consumerId, + final SubscriptionCommitContext commitContext, + final long invisibleDurationMs) { final AtomicBoolean nacked = new AtomicBoolean(false); inFlightEvents.compute( new Pair<>(consumerId, commitContext), @@ -508,7 +513,13 @@ public boolean nackInternal( this); } - ev.nack(); // now pollable + if (invisibleDurationMs == 0L) { + ev.nack(); // now pollable + } else { + ev.changeInvisibleDuration(invisibleDurationMs); + // NOTE: the current response index should not be reset here, but rather perform nack by + // inactive recycle after the invisible duration + } nacked.set(true); // no need to update inFlightEvents and prefetchingQueue diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index ad274023f6a8..e9307348abd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -70,6 +70,9 @@ public class SubscriptionEvent { private volatile String lastPolledConsumerId = null; private final AtomicLong lastPolledTimestamp = new AtomicLong(INVALID_TIMESTAMP); private final AtomicLong committedTimestamp = new AtomicLong(INVALID_TIMESTAMP); + private final AtomicLong invisibleDuration = + new AtomicLong( + SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs()); /** * Constructs a {@link SubscriptionEvent} with an initial response. @@ -186,6 +189,7 @@ public void recordLastPolledTimestamp() { do { currentTimestamp = lastPolledTimestamp.get(); + // monotonic increase newTimestamp = Math.max(currentTimestamp, System.currentTimeMillis()); } while (!lastPolledTimestamp.compareAndSet(currentTimestamp, newTimestamp)); } @@ -218,19 +222,22 @@ public boolean eagerlyPollable() { private boolean canRecycle() { // Recycle events that may not be able to be committed, i.e., those that have been polled but - // not committed within a certain period of time. - return System.currentTimeMillis() - lastPolledTimestamp.get() - > SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs(); + // not committed within the invisible duration. + return System.currentTimeMillis() - lastPolledTimestamp.get() > invisibleDuration.get(); } public void nack() { // reset current response index currentResponseIndex = 0; - // reset lastPolledTimestamp makes this event pollable + // reset lastPolledTimestamp makes this event eagerly pollable lastPolledTimestamp.set(INVALID_TIMESTAMP); } + public void changeInvisibleDuration(final long invisibleDurationMs) { + invisibleDuration.set(invisibleDurationMs); + } + public void recordLastPolledConsumerId(final String consumerId) { lastPolledConsumerId = consumerId; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 97401c73808c..c703fc532dc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -352,6 +352,7 @@ private TPipeSubscribeResp handlePipeSubscribePoll(final PipeSubscribePollReq re final List events; final SubscriptionPollRequest request = req.getRequest(); final long maxBytes = (long) (request.getMaxBytes() * POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD); + final long invisibleDurationMs = request.getInvisibleDurationMs(); try { final short requestType = request.getRequestType(); if (SubscriptionPollRequestType.isValidatedRequestType(requestType)) { @@ -399,7 +400,8 @@ private TPipeSubscribeResp handlePipeSubscribePoll(final PipeSubscribePollReq re req.getRequest()); // nack SubscriptionAgent.broker() - .commit(consumerConfig, Collections.singletonList(commitContext), true); + .commit( + consumerConfig, Collections.singletonList(commitContext), true, 0L); return null; } @@ -427,6 +429,7 @@ private TPipeSubscribeResp handlePipeSubscribePoll(final PipeSubscribePollReq re consumerConfig, response, req.getRequest()); + event.changeInvisibleDuration(invisibleDurationMs); return byteBuffer; } catch (final Exception e) { if (e instanceof SubscriptionPayloadExceedException) { @@ -446,7 +449,8 @@ private TPipeSubscribeResp handlePipeSubscribePoll(final PipeSubscribePollReq re } // nack SubscriptionAgent.broker() - .commit(consumerConfig, Collections.singletonList(commitContext), true); + .commit( + consumerConfig, Collections.singletonList(commitContext), true, 0L); return null; } }) @@ -516,27 +520,41 @@ private TPipeSubscribeResp handlePipeSubscribeCommitInternal(final PipeSubscribe return SUBSCRIPTION_MISSING_CUSTOMER_RESP; } - // commit (ack or nack) + // extract and check parameters final List commitContexts = req.getCommitContexts(); final boolean nack = req.isNack(); + final long invisibleDurationMs = req.getInvisibleDurationMs(); + if (!nack && invisibleDurationMs != 0) { + LOGGER.warn( + "Subscription: invalid commit request from consumer {} for commit contexts: {}, `invisibleDurationMs` is non-zero ({}) while `nack` is false, ack it anyway", + consumerConfig, + commitContexts, + invisibleDurationMs); + } + + // commit (ack or nack) final List successfulCommitContexts = - SubscriptionAgent.broker().commit(consumerConfig, commitContexts, nack); + SubscriptionAgent.broker() + .commit(consumerConfig, commitContexts, nack, invisibleDurationMs); if (Objects.equals(successfulCommitContexts.size(), commitContexts.size())) { LOGGER.info( - "Subscription: consumer {} commit successfully, commit contexts: {}, nack: {}", + "Subscription: consumer {} commit successfully, commit contexts: {}, nack: {}, invisible duration {} in ms", consumerConfig, commitContexts, - nack); + nack, + invisibleDurationMs); } else { LOGGER.warn( - "Subscription: consumer {} commit partially successful, commit contexts: {}, successful commit contexts: {}, nack: {}", + "Subscription: consumer {} commit partially successful, commit contexts: {}, successful commit contexts: {}, nack: {}, invisible duration {} in ms", consumerConfig, commitContexts, successfulCommitContexts, - nack); + nack, + invisibleDurationMs); } + // TODO: resp return PipeSubscribeCommitResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 4d29f22482a7..7288c88f97b5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -286,7 +286,10 @@ public class CommonConfig { private int subscriptionPollMaxBlockingTimeMs = 500; private int subscriptionSerializeMaxBlockingTimeMs = 100; private long subscriptionLaunchRetryIntervalMs = 1000; - private int subscriptionRecycleUncommittedEventIntervalMs = 600000; // 600s + + /** should be consistent with SubscriptionConsumer#DEFAULT_INVISIBLE_DURATION */ + private int subscriptionRecycleUncommittedEventIntervalMs = 600_000; // 600s + private long subscriptionReadFileBufferSize = 8 * MB; private long subscriptionReadTabletBufferSize = 8 * MB; private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s