diff --git a/aeron-client/src/main/java/io/aeron/Aeron.java b/aeron-client/src/main/java/io/aeron/Aeron.java index 8bcf975c78..4f72f73ef0 100644 --- a/aeron-client/src/main/java/io/aeron/Aeron.java +++ b/aeron-client/src/main/java/io/aeron/Aeron.java @@ -29,7 +29,6 @@ import org.agrona.concurrent.broadcast.CopyBroadcastReceiver; import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer; import org.agrona.concurrent.ringbuffer.RingBuffer; -import org.agrona.concurrent.status.AtomicCounter; import org.agrona.concurrent.status.CountersReader; import java.io.File; @@ -73,7 +72,6 @@ public class Aeron implements AutoCloseable public static final int NULL_VALUE = -1; private static final VarHandle IS_CLOSED_VH; - static { try @@ -961,7 +959,6 @@ public static class Context extends CommonContext private long closeLingerDurationNs = Configuration.closeLingerDurationNs(); private ThreadFactory threadFactory = Thread::new; - private AtomicCounter mappedBytesCounter; /** * Perform a shallow copy of the object. @@ -1061,16 +1058,9 @@ else if (clientLock instanceof NoOpLock && !useConductorAgentInvoker) countersValuesBuffer(CncFileDescriptor.createCountersValuesBuffer(cncByteBuffer, cncMetaDataBuffer)); } - if (null == mappedBytesCounter) - { - mappedBytesCounter = new AtomicCounter(countersValuesBuffer(), 35); // Bytes currently mapped - } - - mappedBytesCounter.getAndAdd(cncByteBuffer.capacity()); - if (null == logBuffersFactory) { - logBuffersFactory = new MappedLogBuffersFactory(mappedBytesCounter); + logBuffersFactory = new MappedLogBuffersFactory(); } if (null == errorHandler) @@ -1780,26 +1770,11 @@ public PublicationErrorFrameHandler publicationErrorFrameHandler() */ public void close() { - if (null != mappedBytesCounter) - { - mappedBytesCounter.getAndAdd(-cncByteBuffer.capacity()); - } BufferUtil.free(cncByteBuffer); this.cncByteBuffer = null; super.close(); } - AtomicCounter mappedBytesCounter() - { - return mappedBytesCounter; - } - - Context mappedBytesCounter(final AtomicCounter mappedBytesCounter) - { - this.mappedBytesCounter = mappedBytesCounter; - return this; - } - /** * {@inheritDoc} */ @@ -1842,7 +1817,6 @@ public String toString() "\n resourceLingerDurationNs=" + resourceLingerDurationNs + "\n closeLingerDurationNs=" + closeLingerDurationNs + "\n threadFactory=" + threadFactory + - "\n mappedBytesCounter=" + mappedBytesCounter + "\n}"; } diff --git a/aeron-client/src/main/java/io/aeron/LogBuffers.java b/aeron-client/src/main/java/io/aeron/LogBuffers.java index a71c960f46..fba348d8b3 100644 --- a/aeron-client/src/main/java/io/aeron/LogBuffers.java +++ b/aeron-client/src/main/java/io/aeron/LogBuffers.java @@ -20,7 +20,6 @@ import org.agrona.CloseHelper; import org.agrona.LangUtil; import org.agrona.concurrent.UnsafeBuffer; -import org.agrona.concurrent.status.AtomicCounter; import java.io.IOException; import java.nio.ByteBuffer; @@ -51,8 +50,6 @@ public final class LogBuffers implements AutoCloseable private final ByteBuffer[] termBuffers = new ByteBuffer[PARTITION_COUNT]; private final UnsafeBuffer logMetaDataBuffer; private final MappedByteBuffer[] mappedByteBuffers; - private final AtomicCounter mappedBytesCounter; - private long logLength; /** * Construct the log buffers for a given log file. @@ -60,11 +57,6 @@ public final class LogBuffers implements AutoCloseable * @param logFileName to be mapped. */ public LogBuffers(final String logFileName) - { - this(logFileName, null); - } - - LogBuffers(final String logFileName, final AtomicCounter mappedBytesCounter) { int termLength = 0; FileChannel fileChannel = null; @@ -74,7 +66,7 @@ public LogBuffers(final String logFileName) try { fileChannel = FileChannel.open(Paths.get(logFileName), FILE_OPTIONS); - logLength = fileChannel.size(); + final long logLength = fileChannel.size(); if (logLength < LOG_META_DATA_LENGTH) { throw new IllegalStateException( @@ -161,12 +153,6 @@ public LogBuffers(final String logFileName) this.fileChannel = fileChannel; this.logMetaDataBuffer = logMetaDataBuffer; this.mappedByteBuffers = mappedByteBuffers; - this.mappedBytesCounter = mappedBytesCounter; - - if (null != mappedBytesCounter) - { - mappedBytesCounter.getAndAdd(logLength); - } } /** @@ -232,10 +218,6 @@ public void preTouch() public void close() { close(fileChannel, logMetaDataBuffer, mappedByteBuffers); - if (null != mappedBytesCounter) - { - mappedBytesCounter.getAndAdd(-logLength); - } } /** diff --git a/aeron-client/src/main/java/io/aeron/MappedLogBuffersFactory.java b/aeron-client/src/main/java/io/aeron/MappedLogBuffersFactory.java index 3ca0db8563..d3ab02e472 100644 --- a/aeron-client/src/main/java/io/aeron/MappedLogBuffersFactory.java +++ b/aeron-client/src/main/java/io/aeron/MappedLogBuffersFactory.java @@ -15,22 +15,13 @@ */ package io.aeron; -import org.agrona.concurrent.status.AtomicCounter; - /** * Default factory for mapping log buffers in the client. */ class MappedLogBuffersFactory implements LogBuffersFactory { - private final AtomicCounter mappedBytesCounter; - - MappedLogBuffersFactory(final AtomicCounter mappedBytesCounter) - { - this.mappedBytesCounter = mappedBytesCounter; - } - public LogBuffers map(final String logFileName) { - return new LogBuffers(logFileName, mappedBytesCounter); + return new LogBuffers(logFileName); } } diff --git a/aeron-client/src/test/java/io/aeron/AeronContextTest.java b/aeron-client/src/test/java/io/aeron/AeronContextTest.java index 96a6aed1fb..1063f0d5c8 100644 --- a/aeron-client/src/test/java/io/aeron/AeronContextTest.java +++ b/aeron-client/src/test/java/io/aeron/AeronContextTest.java @@ -15,14 +15,11 @@ */ package io.aeron; -import org.agrona.concurrent.status.AtomicCounter; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullAndEmptySource; import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.mock; class AeronContextTest { @@ -46,15 +43,4 @@ void shouldAssignClientName(final String clientName) context.clientName(clientName); assertSame(clientName, context.clientName()); } - - @Test - void shouldAssignMappedBytesCounter() - { - final Aeron.Context context = new Aeron.Context(); - assertNull(context.mappedBytesCounter()); - - final AtomicCounter counter = mock(AtomicCounter.class); - assertSame(context, context.mappedBytesCounter(counter)); - assertSame(counter, context.mappedBytesCounter()); - } } diff --git a/aeron-client/src/test/java/io/aeron/LogBuffersTest.java b/aeron-client/src/test/java/io/aeron/LogBuffersTest.java index c4867c8b59..46ef908b71 100644 --- a/aeron-client/src/test/java/io/aeron/LogBuffersTest.java +++ b/aeron-client/src/test/java/io/aeron/LogBuffersTest.java @@ -16,26 +16,19 @@ package io.aeron; import org.agrona.concurrent.UnsafeBuffer; -import org.agrona.concurrent.status.AtomicCounter; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.InOrder; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import static io.aeron.logbuffer.LogBufferDescriptor.*; import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; class LogBuffersTest { @@ -95,68 +88,4 @@ void throwsIllegalStateExceptionIfLogFileSizeIsLessThanLogMetaDataLength(@TempDi assertEquals("Log file length less than min length of " + LOG_META_DATA_LENGTH + ": length=" + fileLength, exception.getMessage()); } - - @Test - void mapExistingFile(@TempDir final Path dir) throws IOException - { - final int termLength = TERM_MIN_LENGTH; - final int pageSize = PAGE_MIN_SIZE; - final Path logFile = createLogFile(dir, termLength, pageSize); - - try (LogBuffers logBuffers = new LogBuffers(logFile.toString())) - { - final UnsafeBuffer metaDataBuffer = logBuffers.metaDataBuffer(); - assertNotNull(metaDataBuffer); - assertEquals(termLength, termLength(metaDataBuffer)); - assertEquals(termLength, logBuffers.termLength()); - assertEquals(pageSize, pageSize(metaDataBuffer)); - } - } - - @Test - void mapFileAndCaptureMappedSize() throws IOException - { - final Path logFile = createLogFile(Files.createTempDirectory("test"), TERM_MAX_LENGTH, PAGE_MIN_SIZE * 4); - final long logFileSize = Files.size(logFile); - - final AtomicCounter mappedBytesCounter = mock(AtomicCounter.class); - final LogBuffers logBuffers = new LogBuffers(logFile.toString(), mappedBytesCounter); - - for (int i = 0; i < 5; i++) - { - logBuffers.duplicateTermBuffers(); - } - - logBuffers.close(); - - final InOrder inOrder = inOrder(mappedBytesCounter); - inOrder.verify(mappedBytesCounter).getAndAdd(logFileSize); - inOrder.verify(mappedBytesCounter).getAndAdd(-logFileSize); - inOrder.verifyNoMoreInteractions(); - } - - private static Path createLogFile(final Path dir, final int termLength, final int pageSize) throws IOException - { - final long logFileSize = (long)termLength * PARTITION_COUNT + LOG_META_DATA_LENGTH; - final UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(LOG_META_DATA_LENGTH)); - termLength(buffer, termLength); - pageSize(buffer, pageSize); - final Path logFile = dir.resolve("some.log"); - try (FileChannel fileChannel = - FileChannel.open(logFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) - { - final ByteBuffer byteBuffer = buffer.byteBuffer(); - byteBuffer.position(0).limit(LOG_META_DATA_LENGTH); - - long position = logFileSize - LOG_META_DATA_LENGTH; - do - { - position += fileChannel.write(byteBuffer, position); - } - while (byteBuffer.remaining() > 0); - } - - assertEquals(logFileSize, Files.size(logFile)); - return logFile; - } } diff --git a/aeron-driver/src/main/java/io/aeron/driver/buffer/MappedRawLog.java b/aeron-driver/src/main/java/io/aeron/driver/buffer/MappedRawLog.java index b107fa8074..54c67474e0 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/buffer/MappedRawLog.java +++ b/aeron-driver/src/main/java/io/aeron/driver/buffer/MappedRawLog.java @@ -121,7 +121,7 @@ class MappedRawLog implements RawLog preTouchPages(termBuffers, termLength, filePageSize); } - mappedBytesCounter.getAndAdd(logLength); + mappedBytesCounter.getAndAddOrdered(logLength); } } catch (final IOException ex) @@ -147,7 +147,7 @@ public boolean free() BufferUtil.free(mappedBuffers[i]); } - mappedBytesCounter.getAndAdd(-logLength); + mappedBytesCounter.getAndAddOrdered(-logLength); logMetaDataBuffer.wrap(0, 0); for (int i = 0; i < termBuffers.length; i++) diff --git a/aeron-system-tests/src/test/java/io/aeron/ClientContextTest.java b/aeron-system-tests/src/test/java/io/aeron/ClientContextTest.java index 8425806c58..3f2b170429 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ClientContextTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ClientContextTest.java @@ -27,7 +27,6 @@ import io.aeron.test.Tests; import io.aeron.test.driver.TestMediaDriver; import org.agrona.CloseHelper; -import org.agrona.collections.MutableInteger; import org.agrona.concurrent.NoOpLock; import org.agrona.concurrent.status.CountersReader; import org.junit.jupiter.api.AfterEach; @@ -41,11 +40,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import static io.aeron.driver.status.SystemCounterDescriptor.BYTES_CURRENTLY_MAPPED; -import static io.aeron.logbuffer.LogBufferDescriptor.LOG_META_DATA_LENGTH; -import static io.aeron.logbuffer.LogBufferDescriptor.PARTITION_COUNT; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.*; @ExtendWith(InterruptingTestCallback.class) @@ -127,7 +121,7 @@ void shouldHaveUniqueCorrelationIdsAcrossMultipleClientsToTheSameDriver() } @ParameterizedTest - @ValueSource(strings = { "", "my-test-client" }) + @ValueSource(strings = {"", "my-test-client"}) @InterruptAfter(10) void shouldAddClientInfoToTheHeartbeatTimestampCounter(final String clientName) { @@ -222,69 +216,4 @@ void shouldRejectClientNameThatIsTooLong() AeronException.class, () -> new Aeron.Context().clientName(name).conclude()); assertEquals("ERROR - clientName length must <= 100", aeronException.getMessage()); } - - @Test - void logBuffersMappedOnTheClientSideShouldBeReportedViaSystemCounter() - { - final String channel = "aeron:ipc?term-length=128m"; - final int streamId = 42; - final Aeron.Context context = new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()); - try (Aeron aeron = Aeron.connect(context.clone()); - ExclusivePublication publication = aeron.addExclusivePublication(channel, streamId); - Subscription subscription = aeron.addSubscription(channel, streamId)) - { - Tests.awaitConnected(publication); - Tests.awaitConnected(subscription); - - final long logBufferLength = (long)publication.termBufferLength() * PARTITION_COUNT + LOG_META_DATA_LENGTH; - final CountersReader countersReader = aeron.countersReader(); - final long originalMappedBytes = countersReader.getCounterValue(BYTES_CURRENTLY_MAPPED.id()); - assertThat(originalMappedBytes, is(greaterThan(logBufferLength * 2))); - - // LogBuffers are shared so now new mappings reported - for (int i = 0; i < 5; i++) - { - final Subscription sub = aeron.addSubscription(channel, streamId); - Tests.awaitConnected(sub); - } - - assertThat( - countersReader.getCounterValue(BYTES_CURRENTLY_MAPPED.id()), - is(originalMappedBytes)); - - try (Aeron client1 = Aeron.connect(context.clone()); - Aeron client2 = Aeron.connect(context.clone())) - { - final Subscription sub1 = client1.addSubscription(channel, streamId); - final Subscription sub2 = client2.addSubscription(channel, streamId); - final Subscription sub3 = client2.addSubscription(channel, streamId); - Tests.awaitConnected(sub1); - Tests.awaitConnected(sub2); - Tests.awaitConnected(sub3); - - assertThat(countersReader.getCounterValue(BYTES_CURRENTLY_MAPPED.id()), allOf( - greaterThan(originalMappedBytes + logBufferLength * 2), - lessThan(originalMappedBytes + logBufferLength * 3))); - - assertEquals(3, countConnectedClients(countersReader)); - } - - Tests.await(() -> 1 == countConnectedClients(countersReader)); - - assertThat(countersReader.getCounterValue(BYTES_CURRENTLY_MAPPED.id()), is(originalMappedBytes)); - } - } - - private static int countConnectedClients(final CountersReader countersReader) - { - final MutableInteger clientCount = new MutableInteger(); - countersReader.forEach((counterId, typeId, keyBuffer, label) -> - { - if (HeartbeatTimestamp.HEARTBEAT_TYPE_ID == typeId) - { - clientCount.increment(); - } - }); - return clientCount.get(); - } }