Skip to content

Commit

Permalink
[Java] Await client error before asserting static counter state.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Sep 18, 2024
1 parent 17bca14 commit 6f3c21f
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions aeron-system-tests/src/test/java/io/aeron/CounterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static io.aeron.Aeron.NULL_VALUE;
import static org.hamcrest.CoreMatchers.allOf;
Expand Down Expand Up @@ -391,17 +392,17 @@ void shouldReturnErrorIfANonStaticCounterExistsForTypeIdRegistrationId()
final RegistrationException registrationException = assertThrowsExactly(
RegistrationException.class,
() -> clientA.addStaticCounter(
COUNTER_TYPE_ID,
keyBuffer,
0,
keyBuffer.capacity(),
labelBuffer,
0,
COUNTER_LABEL.length(),
counter.registrationId()));
COUNTER_TYPE_ID,
keyBuffer,
0,
keyBuffer.capacity(),
labelBuffer,
0,
COUNTER_LABEL.length(),
counter.registrationId()));
assertThat(registrationException.getMessage(), allOf(
containsString("cannot add static counter, because a non-static counter exists (counterId=" +
counter.id() + ") for typeId=" + COUNTER_TYPE_ID + " and registrationId=" + counter.registrationId()),
counter.id() + ") for typeId=" + COUNTER_TYPE_ID + " and registrationId=" + counter.registrationId()),
containsString("errorCodeValue=11")));
}

Expand Down Expand Up @@ -444,13 +445,13 @@ void shouldNotCloseStaticCounterIfClientTimesOut()
{
final AvailableCounterHandler availableCounterHandler = mock(AvailableCounterHandler.class);
final UnavailableCounterHandler unavailableCounterHandler = mock(UnavailableCounterHandler.class);
final ErrorHandler errorHandler = mock(ErrorHandler.class);
final AtomicReference<Throwable> error = new AtomicReference<>();
try (Aeron aeron = Aeron.connect(new Aeron.Context()
.aeronDirectoryName(driver.aeronDirectoryName())
.useConductorAgentInvoker(true)
.availableCounterHandler(availableCounterHandler)
.unavailableCounterHandler(unavailableCounterHandler)
.errorHandler(errorHandler)))
.errorHandler(error::set)))
{
final AgentInvoker conductorAgentInvoker = aeron.conductorAgentInvoker();
assertNotNull(conductorAgentInvoker);
Expand All @@ -468,10 +469,12 @@ void shouldNotCloseStaticCounterIfClientTimesOut()

Tests.await(() -> 1 == countersReader.getCounterValue(SystemCounterDescriptor.CLIENT_TIMEOUTS.id()));

conductorAgentInvoker.invoke();
final ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(errorHandler, timeout(5000L)).onError(captor.capture());
final Throwable timeoutException = captor.getValue();
while (null == error.get())
{
conductorAgentInvoker.invoke();
Thread.yield();
}
final Throwable timeoutException = error.get();
if (timeoutException instanceof ClientTimeoutException)
{
assertEquals("FATAL - client timeout from driver", timeoutException.getMessage());
Expand Down

0 comments on commit 6f3c21f

Please sign in to comment.