Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-2212] merged with dev 5 #283

Merged
merged 60 commits into from
Mar 5, 2024
Merged
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
7cf0aec
[TH2-2212] fixed increasing recovery delay, added delay time deviatio…
Xanclry Jun 14, 2022
d4949c1
[TH2-2212] fixed configurations tests
Xanclry Jun 15, 2022
fa2f55f
[TH2-2212] simplified logic of connection recovery triggering
Xanclry Jun 16, 2022
9ae7680
Merge branch 'master' into TH2-2212-common-cannot-recover-channel-lev…
Xanclry Jun 16, 2022
16d1cf2
[TH2-2212] added retry for channel level errors
Xanclry Jun 21, 2022
6b85ae2
[TH2-2212] minor logger fixes, readme update
Xanclry Jun 21, 2022
dad038c
[TH2-2212] channel level errors logging
Xanclry Jun 23, 2022
0261f2f
[TH2-2212] fix: added throws
Xanclry Jun 23, 2022
92129f3
Merge branch 'master' into TH2-2212-common-cannot-recover-channel-lev…
Xanclry Jun 23, 2022
3ca6ddd
[TH2-2212] version bump, readme edited
Xanclry Jun 23, 2022
eb394c2
[TH2-2212] added interrupted exception to basicConsume
Xanclry Jun 24, 2022
0547a17
[TH2-2212] title version bump
Xanclry Jun 24, 2022
7ed4122
[TH2-2212] added integration tests
Xanclry Nov 7, 2022
6b643fd
[TH2-2212] merged master
Xanclry Nov 7, 2022
e4a9ad6
[TH2-2212] recoverable subscriptions, added integrations tests
Xanclry Nov 10, 2022
c22cef6
[TH2-2212] publish-consume test, refactoring
Xanclry Nov 11, 2022
8be1815
[TH2-2212] rabbit container reusing in the tests
Xanclry Nov 14, 2022
aaf58d1
[TH2-2212] added test for ack timeout among several channels
Xanclry Nov 14, 2022
1e9de98
[TH2-2212] added one more test
Xanclry Nov 14, 2022
23bc510
[TH2-2212] added more complexity to the several channels test
Xanclry Nov 15, 2022
3bd042c
[TH2-2212] refactoring
Xanclry Nov 16, 2022
0efa181
[TH2-2212] merge master
Xanclry Nov 16, 2022
17dc5a5
[TH2-2212] refactor
Xanclry Nov 17, 2022
0cbfb9b
[TH2-2212] removed unnecessary threads in the tests
Xanclry Nov 18, 2022
4b75b33
[TH2-2212] threads handling
Xanclry Nov 18, 2022
2f63aa7
[TH2-2212] assertion fix
Xanclry Nov 18, 2022
7ed2dd6
[TH2-2212] refactored
Xanclry Nov 18, 2022
19182f9
Merge branch 'TH2-2212-common-cannot-recover-channel-level-exceptions…
Nikita-Smirnov-Exactpro Nov 18, 2022
f1900b7
[TH2-2212] refactored 'handles ack timeout' test
Nikita-Smirnov-Exactpro Nov 18, 2022
8b81bf6
[TH2-2212] refactored 'thread interruption test' test
Nikita-Smirnov-Exactpro Nov 18, 2022
b91cc5c
[TH2-2212] fix `thread interruption test`
Xanclry Nov 18, 2022
9cbd388
[TH2-4466] fixed channels amount after recovery. Added related tests
Xanclry Nov 24, 2022
822e274
merge attempt
Oct 19, 2023
34c9ebe
fixes after merge
Oct 23, 2023
b0183b6
Merge branch 'dev-version-5' into TH2-2212-merged-with-dev-5
Oct 23, 2023
3d8f7ef
fixes after review
Oct 23, 2023
0b4ccbf
`isAutomaticRecoveryEnabled` added to `ConnectionManagerConfiguration`
Oct 25, 2023
767ee07
fixes after review
Oct 26, 2023
57b2d02
fix: cancelled consumer channel resubscribed during recovery
Oct 30, 2023
7fc34cc
fixed catching ShutdownSignalException exceptions
Oct 31, 2023
0afeda6
Merge branch 'dev-version-5' into TH2-2212-merged-with-dev-5
Nov 1, 2023
3c2e1d7
channel is not recovered by ConnectionManager if it's connection is c…
Nov 6, 2023
0def228
recovering channel after NOT_FOUND error added
Nov 7, 2023
e79d84b
comment proofreading
lumber1000 Nov 7, 2023
385e484
fixes after review
Nov 7, 2023
3a29537
locking reworked
Nov 9, 2023
3fb9036
test added to TestConnectionManager (multiple subscribers in parallel)
Nov 13, 2023
41c247e
unnecessary `isSubscribed` flag reset removed
Nov 13, 2023
28b9e4b
`recoverSubscriptionsOfChannel` refactored
Nov 13, 2023
49ecdb6
Merge branch 'dev-version-5' into TH2-2212-merged-with-dev-5
Nov 13, 2023
672f098
sync with dev-version-5
Nov 13, 2023
50e8891
changes after review
Nov 16, 2023
04df9ac
locking in `retryingPublishWithLock`
Nov 17, 2023
df85890
README.md
Nov 17, 2023
d9f110b
recoverSubscriptionsOfChannel refactored
Nov 17, 2023
5949809
locking
Nov 20, 2023
04ede78
Merge branch 'dev-version-5' into TH2-2212-merged-with-dev-5
Mar 4, 2024
eb2ef35
deps updated
Mar 4, 2024
945d364
test fixed
Mar 4, 2024
4835df2
test fixed
Mar 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed catching ShutdownSignalException exceptions
review fixes
  • Loading branch information
Oleg Smelov committed Oct 31, 2023
commit 7fc34cc76ca8d5bd3aa02c591333691dba32d292
Original file line number Diff line number Diff line change
@@ -276,15 +276,14 @@ private void recoverSubscriptionsOfChannel(int channelNumber) {
SubscriptionCallbacks subscriptionCallbacks = channelHolder.subscriptionCallbacks;
if (subscriptionCallbacks != null) {
channelsByPin.remove(pinId);
OptimumCode marked this conversation as resolved.
Show resolved Hide resolved
if (channelHolder.isSubscribed) {
if (channelHolder.subscribed()) {
LOGGER.info("Changing channel for holder with pin id: " + pinId.toString());
basicConsume(pinId.queue, subscriptionCallbacks.deliverCallback, subscriptionCallbacks.cancelCallback);
}
}
}
} catch (IOException e) {
LOGGER.warn("Exception during channel's subscriptions recovery", e);
throw new RuntimeException(e);
LOGGER.warn("Failed to recovery channel's subscriptions", e);
OptimumCode marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -449,7 +448,6 @@ public void confirm() throws IOException {
}
}, cancelCallback), configuration);

holder.isSubscribed = true;
return new RabbitMqSubscriberMonitor(holder, queue, tag, this::basicCancel);
}

@@ -606,10 +604,7 @@ public RabbitMqSubscriberMonitor(ChannelHolder holder, String queue, String tag,

@Override
public void unsubscribe() throws IOException {
holder.withLock(false, channel -> {
action.execute(channel, tag);
holder.isSubscribed = false;
});
holder.unsubscribeWithLock(tag, action);
}
}

@@ -684,7 +679,7 @@ private static class ChannelHolder {
@GuardedBy("lock")
private Channel channel;
@GuardedBy("lock")
public boolean isSubscribed = false;
private boolean isSubscribed = false;

public ChannelHolder(
Supplier<Channel> supplier,
@@ -696,6 +691,7 @@ public ChannelHolder(
this.maxCount = maxCount;
this.subscriptionCallbacks = null;
}

public ChannelHolder(
Supplier<Channel> supplier,
BiConsumer<ShutdownNotifier, Boolean> reconnectionChecker,
@@ -753,26 +749,42 @@ public void retryingPublishWithLock(ChannelConsumer consumer, ConnectionManagerC
}
}

public <T> T retryingConsumeWithLock(ChannelMapper<T> mapper, ConnectionManagerConfiguration configuration) throws InterruptedException {
public <T> T retryingConsumeWithLock(ChannelMapper<T> mapper, ConnectionManagerConfiguration configuration) throws InterruptedException, IOException {
lock.lock();
try {
Iterator<RetryingDelay> iterator = null;
Channel tempChannel = getChannel();
while (true) {
try {
return mapper.map(tempChannel);
var tag = mapper.map(tempChannel);
isSubscribed = true;
return tag;
} catch (IOException e) {
iterator = handleAndSleep(configuration, iterator, "Retrying consume", e);
} catch (ShutdownSignalException e) {
iterator = handleAndSleep(configuration, iterator, "Retrying consume", e);
tempChannel = recreateChannel();
var reason = tempChannel.getCloseReason();
if (reason != null) {
if (reason.getMessage().contains("reply-text=RESOURCE_LOCKED")) {
throw e;
}
tempChannel = recreateChannel();
}
}
}
} finally {
lock.unlock();
}
}

public void unsubscribeWithLock(String tag, CancelAction action) throws IOException {
lock.lock();
try {
action.execute(channel, tag);
isSubscribed = false;
} finally {
lock.unlock();
}
}

@NotNull
private static Iterator<RetryingDelay> handleAndSleep(
ConnectionManagerConfiguration configuration,
@@ -869,6 +881,10 @@ private Channel getChannel(boolean waitForRecovery) {
reconnectionChecker.accept(channel, waitForRecovery);
return channel;
}

public boolean subscribed() {
return isSubscribed;
}
}

private interface ChannelMapper<T> {
Original file line number Diff line number Diff line change
@@ -50,6 +50,8 @@ import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.testcontainers.containers.RabbitMQContainer
import org.testcontainers.utility.MountableFile
import java.io.IOException
import kotlin.test.assertFailsWith

@IntegrationTest
class TestConnectionManager {
@@ -406,13 +408,9 @@ class TestConnectionManager {
val queueName = "queue5"
val amqpPort = 5672
val container = object : RabbitMQContainer(RABBITMQ_IMAGE_NAME) {
fun addFixedPort(hostPort: Int, containerPort: Int) {
super.addFixedExposedPort(hostPort, containerPort)
}
init { super.addFixedExposedPort(amqpPort, amqpPort) }
}

container
.addFixedPort(amqpPort, amqpPort)
container
.withQueue(queueName)
.use {
@@ -821,11 +819,10 @@ class TestConnectionManager {
createConnectionManager(rabbitMQContainer).use { firstManager ->
createConnectionManager(rabbitMQContainer).use { secondManager ->
val queue = firstManager.queueDeclare()
val consumerThread = thread { secondManager.basicConsume(queue, { _, _, _ -> }, {}) }
consumerThread.join(5_000)
val isAlive = consumerThread.isAlive
consumerThread.stop()
assertTrue(isAlive) { "Another connection can subscribe to the $queue queue" }

assertFailsWith<IOException>("Another connection can subscribe to the $queue queue") {
secondManager.basicConsume(queue, { _, _, _ -> }, {})
}

extracted(firstManager, secondManager, queue, 3)
extracted(firstManager, secondManager, queue, 6)
7 changes: 6 additions & 1 deletion src/test/resources/rabbitmq_it.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
consumer_timeout = 5000
# According to rabbitMQ docs: Values lower than one minute are not supported
# Whether the timeout should be enforced is evaluated periodically, at one minute intervals
# (https://www.rabbitmq.com/consumers.html#acknowledgement-timeout)
# Actually, timeouts less than a minute are applied as expected contrary to the documentation (RabbitMQ 3.12.7).
# Using small timeouts to reduce testing time
consumer_timeout = 1000
loopback_users.guest = false
Loading