Skip to content

Commit

Permalink
improve for javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Sep 13, 2024
1 parent 77485dd commit 91a5be0
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,40 +138,116 @@ public synchronized void close() {

/////////////////////////////// poll & commit ///////////////////////////////

/**
* Polls subscription messages from the subscribed topics with a specified timeout.
*
* @param timeout the maximum time to wait for messages, specified as a {@link Duration}.
* @return a list of {@link SubscriptionMessage} from the subscribed topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(final Duration timeout) throws SubscriptionException {
return poll(Collections.emptySet(), timeout.toMillis(), DEFAULT_INVISIBLE_DURATION);
}

/**
* Polls subscription messages from the subscribed topics with a specified timeout.
*
* @param timeoutMs the maximum time to wait for messages, in milliseconds.
* @return a list of {@link SubscriptionMessage} from the subscribed topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(final long timeoutMs) throws SubscriptionException {
return poll(Collections.emptySet(), timeoutMs, DEFAULT_INVISIBLE_DURATION);
}

/**
* Polls subscription messages from the subscribed topics with a specified timeout and invisible
* duration.
*
* @param timeout the maximum time to wait for messages, specified as a {@link Duration}.
* @param invisibleDuration the duration for which the polled messages will remain invisible,
* specified as a {@link Duration}.
* @return a list of {@link SubscriptionMessage} from the subscribed topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(final Duration timeout, final Duration invisibleDuration)
throws SubscriptionException {
return poll(Collections.emptySet(), timeout.toMillis(), invisibleDuration);
}

/**
* Polls subscription messages from the subscribed topics with a specified timeout and invisible
* duration.
*
* @param timeoutMs the maximum time to wait for messages, in milliseconds.
* @param invisibleDuration the duration for which the polled messages will remain invisible,
* specified as a {@link Duration}.
* @return a list of {@link SubscriptionMessage} from the subscribed topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(final long timeoutMs, final Duration invisibleDuration)
throws SubscriptionException {
return poll(Collections.emptySet(), timeoutMs, invisibleDuration);
}

/**
* Polls subscription messages from the specified topics with a specified timeout.
*
* @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics
* will be polled.
* @param timeout the maximum time to wait for messages, specified as a {@link Duration}.
* @return a list of {@link SubscriptionMessage} from the specified topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(final Set<String> topicNames, final Duration timeout)
throws SubscriptionException {
return poll(topicNames, timeout.toMillis(), DEFAULT_INVISIBLE_DURATION);
}

/**
* Polls subscription messages from the specified topics with a specified timeout.
*
* @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics
* will be polled.
* @param timeoutMs the maximum time to wait for messages, in milliseconds.
* @return a list of {@link SubscriptionMessage} from the specified topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(final Set<String> topicNames, final long timeoutMs)
throws SubscriptionException {
return poll(topicNames, timeoutMs, DEFAULT_INVISIBLE_DURATION);
}

/**
* Polls subscription messages from the specified topics with a specified timeout and invisible
* duration.
*
* @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics
* will be polled.
* @param timeout the maximum time to wait for messages, specified as a {@link Duration}.
* @param invisibleDuration the duration for which the polled messages will remain invisible,
* specified as a {@link Duration}.
* @return a list of {@link SubscriptionMessage} from the specified topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(
final Set<String> topicNames, final Duration timeout, final Duration invisibleDuration)
throws SubscriptionException {
return poll(topicNames, timeout.toMillis(), invisibleDuration);
}

/**
* Polls subscription messages from the specified topics with a specified timeout and invisible
* duration.
*
* @param topicNames the set of topic names to poll messages from. If empty, all subscribed topics
* will be polled.
* @param timeoutMs the maximum time to wait for messages, in milliseconds.
* @param invisibleDuration the duration for which the polled messages will remain invisible,
* specified as a {@link Duration}.
* @return a list of {@link SubscriptionMessage} from the specified topics.
* @throws SubscriptionException if there is an error during polling.
*/
public List<SubscriptionMessage> poll(
final Set<String> topicNames, final long timeoutMs, final Duration invisibleDuration)
throws SubscriptionException {
Expand Down Expand Up @@ -225,37 +301,88 @@ public List<SubscriptionMessage> poll(

/////////////////////////////// commit ///////////////////////////////

/**
* Commits the specified message synchronously.
*
* @param message the subscription message to commit.
* @throws SubscriptionException if there is an error during the commit.
*/
public void commitSync(final SubscriptionMessage message) throws SubscriptionException {
super.ack(Collections.singletonList(message));
}

/**
* Commits the specified messages synchronously.
*
* @param messages an iterable collection of subscription messages to commit.
* @throws SubscriptionException if there is an error during the commit.
*/
public void commitSync(final Iterable<SubscriptionMessage> messages)
throws SubscriptionException {
super.ack(messages);
}

/**
* Commits the specified message asynchronously.
*
* @param message the subscription message to commit.
* @return a {@link CompletableFuture} that completes when the commit operation is finished.
*/
public CompletableFuture<Void> commitAsync(final SubscriptionMessage message) {
return super.commitAsync(Collections.singletonList(message));
}

/**
* Commits the specified messages asynchronously.
*
* @param messages an iterable collection of subscription messages to commit.
* @return a {@link CompletableFuture} that completes when the commit operation is finished.
*/
public CompletableFuture<Void> commitAsync(final Iterable<SubscriptionMessage> messages) {
return super.commitAsync(messages);
}

/**
* Commits the specified message asynchronously with a callback upon completion.
*
* @param message the subscription message to commit.
* @param callback the callback to invoke upon completion of the commit operation.
*/
public void commitAsync(final SubscriptionMessage message, final AsyncCommitCallback callback) {
super.commitAsync(Collections.singletonList(message), callback);
}

/**
* Commits the specified messages asynchronously with a callback upon completion.
*
* @param messages an iterable collection of subscription messages to commit.
* @param callback the callback to invoke upon completion of the commit operation.
*/
public void commitAsync(
final Iterable<SubscriptionMessage> messages, final AsyncCommitCallback callback) {
super.commitAsync(messages, callback);
}

/**
* Changes the invisible duration of the specified message.
*
* @param message the subscription message for which to change the invisible duration.
* @param invisibleDuration the new invisible duration, specified as a {@link Duration}. If the
* message is already visible to the consumer, the behavior is undefined.
*/
public void changeInvisibleDuration(
final SubscriptionMessage message, final Duration invisibleDuration) {
super.nackMessages(Collections.singletonList(message), invisibleDuration.toMillis());
}

/**
* Changes the invisible duration of the specified messages.
*
* @param messages an iterable collection of subscription messages for which to change the
* invisible duration.
* @param invisibleDuration the new invisible duration, specified as a {@link Duration}. If any of
* the messages are already visible to the consumer, the behavior is undefined.
*/
public void changeInvisibleDuration(
final Iterable<SubscriptionMessage> messages, final Duration invisibleDuration) {
super.nackMessages(messages, invisibleDuration.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ public class CommonConfig {
private int subscriptionPollMaxBlockingTimeMs = 500;
private int subscriptionSerializeMaxBlockingTimeMs = 100;
private long subscriptionLaunchRetryIntervalMs = 1000;

/** should be consistent with SubscriptionConsumer#DEFAULT_INVISIBLE_DURATION */
private int subscriptionRecycleUncommittedEventIntervalMs = 600_000; // 600s

private long subscriptionReadFileBufferSize = 8 * MB;
private long subscriptionReadTabletBufferSize = 8 * MB;
private long subscriptionTsFileDeduplicationWindowSeconds = 120; // 120s
Expand Down

0 comments on commit 91a5be0

Please sign in to comment.