Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription: support dynamic adjustment of message's invisibility time through poll and changeInvisibleDuration interfaces #13511

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.subscription.it.local;

import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;

import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
import static org.junit.Assert.fail;

@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class})
public class IoTDBSubscriptionInvisibleDurationIT extends AbstractSubscriptionLocalIT {

private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionInvisibleDurationIT.class);

private static final Duration INVISIBLE_DURATION = Duration.ofSeconds(5L);
private static final int RECONSUME_COUNT = 3;

@Test
public void testReconsumeByPoll() throws Exception {
testInvisibleDurationInternal(
(consumer, rowCount, reconsumeCount) -> {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
final List<SubscriptionMessage> messages;
if (reconsumeCount.incrementAndGet() < RECONSUME_COUNT) {
messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS, INVISIBLE_DURATION);
} else {
messages = consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
}
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
dataSet.next();
rowCount.addAndGet(1);
}
}
}
if (reconsumeCount.get() >= RECONSUME_COUNT) {
consumer.commitSync(messages);
}
});
}

@Test
public void testReconsumeByChangeInvisibleDuration() throws Exception {
testInvisibleDurationInternal(
(consumer, rowCount, reconsumeCount) -> {
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); // wait some time
final List<SubscriptionMessage> messages =
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
for (final SubscriptionMessage message : messages) {
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
while (dataSet.hasNext()) {
dataSet.next();
rowCount.addAndGet(1);
}
}
}
if (reconsumeCount.incrementAndGet() < RECONSUME_COUNT) {
consumer.changeInvisibleDuration(messages, INVISIBLE_DURATION);
} else {
consumer.commitSync(messages);
}
});
}

private void testInvisibleDurationInternal(final ReconsumeAction action) throws Exception {
// Insert some historical data
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
session.createDatabase("root.db");
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
}
session.executeNonQueryStatement("flush");
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Create topic
final String topicName = "topic1";
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
final Properties config = new Properties();
config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1");
session.createTopic(topicName, config);
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}

// Subscription
final AtomicInteger rowCount = new AtomicInteger();
final AtomicInteger reconsumeCount = new AtomicInteger(0);
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
.consumerId("c1")
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
consumer.subscribe(topicName);
while (!isClosed.get()) {
action.apply(consumer, rowCount, reconsumeCount);
}
consumer.unsubscribe(topicName);
} catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
},
String.format("%s - consumer", testName.getDisplayName()));
thread.start();

// Check row count
try {
// Keep retrying if there are execution failures
AWAIT.untilAsserted(() -> Assert.assertEquals(100 * RECONSUME_COUNT, rowCount.get()));
} catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
thread.join();
}
}

@FunctionalInterface
private interface ReconsumeAction {
void apply(
SubscriptionPullConsumer consumer, AtomicInteger rowCount, AtomicInteger reconsumeCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@ public class SubscriptionPollRequest {
/** The maximum size, in bytes, for the response payload. */
private final transient long maxBytes;

/** The duration in milliseconds for polled messages should remain invisible. */
private final transient long invisibleDurationMs;

public SubscriptionPollRequest(
final short requestType,
final SubscriptionPollPayload payload,
final long timeoutMs,
final long maxBytes) {
final long maxBytes,
final long invisibleDurationMs) {
this.requestType = requestType;
this.payload = payload;
this.timeoutMs = timeoutMs;
this.maxBytes = maxBytes;
this.invisibleDurationMs = invisibleDurationMs;
}

public short getRequestType() {
Expand All @@ -68,6 +73,10 @@ public long getMaxBytes() {
return maxBytes;
}

public long getInvisibleDurationMs() {
return invisibleDurationMs;
}

//////////////////////////// serialization ////////////////////////////

public static ByteBuffer serialize(final SubscriptionPollRequest request) throws IOException {
Expand All @@ -83,6 +92,7 @@ private void serialize(final DataOutputStream stream) throws IOException {
payload.serialize(stream);
ReadWriteIOUtils.write(timeoutMs, stream);
ReadWriteIOUtils.write(maxBytes, stream);
ReadWriteIOUtils.write(invisibleDurationMs, stream);
}

public static SubscriptionPollRequest deserialize(final ByteBuffer buffer) {
Expand All @@ -109,7 +119,9 @@ public static SubscriptionPollRequest deserialize(final ByteBuffer buffer) {

final long timeoutMs = ReadWriteIOUtils.readLong(buffer);
final long maxBytes = ReadWriteIOUtils.readLong(buffer);
return new SubscriptionPollRequest(requestType, payload, timeoutMs, maxBytes);
final long invisibleDurationMs = ReadWriteIOUtils.readLong(buffer);
return new SubscriptionPollRequest(
requestType, payload, timeoutMs, maxBytes, invisibleDurationMs);
}

/////////////////////////////// object ///////////////////////////////
Expand All @@ -124,6 +136,8 @@ public String toString() {
+ timeoutMs
+ ", maxBytes="
+ maxBytes
+ ", invisibleDurationMs="
+ invisibleDurationMs
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public class SubscriptionPollResponse {

private final transient SubscriptionCommitContext commitContext;

/**
* The subscription poll request associated with this response.
*
* <p>This field is set when the client deserializes the RPC response into {@code
* SubscriptionPollResponse}, linking this response to the corresponding request.
*/
private SubscriptionPollRequest request;

public SubscriptionPollResponse(
final short responseType,
final SubscriptionPollPayload payload,
Expand All @@ -59,6 +67,14 @@ public SubscriptionCommitContext getCommitContext() {
return commitContext;
}

public void setRequest(final SubscriptionPollRequest request) {
this.request = request;
}

public SubscriptionPollRequest getRequest() {
return request;
}

/////////////////////////////// de/ser ///////////////////////////////

public static ByteBuffer serialize(final SubscriptionPollResponse response) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,29 @@ public class PipeSubscribeCommitReq extends TPipeSubscribeReq {

private transient List<SubscriptionCommitContext> commitContexts = new ArrayList<>();

/**
* Indicates the acknowledgement status of the messages corresponding to {@link
* PipeSubscribeCommitReq#commitContexts}.
*
* <p>If {@code nack} is {@code false}, it indicates an acknowledgement (ack) meaning the
* consumption of messages was successful.
*
* <p>If {@code nack} is {@code true}, it indicates a negative acknowledgement (nack) meaning the
* consumption of messages was not successful.
*/
private transient boolean nack;

/**
* The duration in milliseconds for which messages corresponding to {@link
* PipeSubscribeCommitReq#commitContexts} should remain invisible.
*
* <p>This field is effective only when {@link PipeSubscribeCommitReq#nack} is {@code true}.
*
* <p>If is zero, the messages will be immediately re-consumed. Otherwise, it sets the invisible
* duration for the messages.
*/
private transient long invisibleDurationMs;

public List<SubscriptionCommitContext> getCommitContexts() {
return commitContexts;
}
Expand All @@ -46,14 +67,21 @@ public boolean isNack() {
return nack;
}

public long getInvisibleDurationMs() {
return invisibleDurationMs;
}

/////////////////////////////// Thrift ///////////////////////////////

/**
* Serialize the incoming parameters into `PipeSubscribeCommitReq`, called by the subscription
* client.
*/
public static PipeSubscribeCommitReq toTPipeSubscribeReq(
final List<SubscriptionCommitContext> commitContexts, final boolean nack) throws IOException {
final List<SubscriptionCommitContext> commitContexts,
final boolean nack,
final long invisibleDurationMs)
throws IOException {
final PipeSubscribeCommitReq req = new PipeSubscribeCommitReq();

req.commitContexts = commitContexts;
Expand All @@ -68,6 +96,7 @@ public static PipeSubscribeCommitReq toTPipeSubscribeReq(
commitContext.serialize(outputStream);
}
ReadWriteIOUtils.write(nack, outputStream);
ReadWriteIOUtils.write(invisibleDurationMs, outputStream);
req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}

Expand All @@ -84,6 +113,7 @@ public static PipeSubscribeCommitReq fromTPipeSubscribeReq(final TPipeSubscribeR
req.commitContexts.add(SubscriptionCommitContext.deserialize(commitReq.body));
}
req.nack = ReadWriteIOUtils.readBool(commitReq.body);
req.invisibleDurationMs = ReadWriteIOUtils.readLong(commitReq.body);
}

req.version = commitReq.version;
Expand All @@ -106,13 +136,14 @@ public boolean equals(final Object obj) {
final PipeSubscribeCommitReq that = (PipeSubscribeCommitReq) obj;
return Objects.equals(this.commitContexts, that.commitContexts)
&& Objects.equals(this.nack, that.nack)
&& Objects.equals(this.invisibleDurationMs, that.invisibleDurationMs)
&& this.version == that.version
&& this.type == that.type
&& Objects.equals(this.body, that.body);
}

@Override
public int hashCode() {
return Objects.hash(commitContexts, nack, version, type, body);
return Objects.hash(commitContexts, nack, invisibleDurationMs, version, type, body);
}
}
Loading
Loading