diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterTool.java b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterTool.java
index cf4177548a..300e8001f3 100644
--- a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterTool.java
+++ b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterTool.java
@@ -160,6 +160,20 @@ public class ClusterTool
             action(operator::seedRecordingLogFromSnapshot),
             "creates a new recording log based on the latest valid snapshot."));
 
+        COMMANDS.put("create-empty-service-snapshot", new ClusterToolCommand(
+            action(operator::createEmptyServiceSnapshot),
+            "creates an empty service snapshot based on the latest consensus module snapshot."));
+
+        COMMANDS.put("add-service-snapshot", new ClusterToolCommand((clusterDir, out, args) ->
+        {
+            if (args.length < 3)
+            {
+                printHelp(COMMANDS, HELP_PREFIX);
+                return -1;
+            }
+            return operator.addServiceSnapshot(out, clusterDir, Integer.parseInt(args[2]));
+        }, "Adds a new snapshot with the specified recording id and with the next available service id."));
+
         COMMANDS.put("errors", new ClusterToolCommand(
             action(operator::errors),
             "prints Aeron and cluster component error logs."));
@@ -321,6 +335,32 @@ public static void seedRecordingLogFromSnapshot(final File clusterDir)
         BACKWARD_COMPATIBLE_OPERATIONS.seedRecordingLogFromSnapshot(clusterDir);
     }
 
+    /**
+     * Create a new/empty service snapshot recording based on the most recent snapshot.
+     *
+     * @param clusterDir where the cluster is running.
+     * @param out        to print the output to.
+     * @return {@code true} if snapshot is created or {@code false} if it is not.
+     */
+    public static boolean createEmptyServiceSnapshot(final File clusterDir, final PrintStream out)
+    {
+        return BACKWARD_COMPATIBLE_OPERATIONS.createEmptyServiceSnapshot(clusterDir, out) == SUCCESS;
+    }
+
+    /**
+     * Add a new snapshot entry to the recording log with the specified recording id.
+     * The new snapshot entry will use the next highest service id.
+     *
+     * @param out         to print the output to.
+     * @param clusterDir  where the cluster is running.
+     * @param recordingId the recording id to associate with the new snapshot entry.
+     * @return {@code true} if the entry is added or {@code false} if it is not.
+     */
+    public static boolean addServiceSnapshot(final PrintStream out, final File clusterDir, final int recordingId)
+    {
+        return BACKWARD_COMPATIBLE_OPERATIONS.addServiceSnapshot(out, clusterDir, recordingId) == SUCCESS;
+    }
+
     /**
      * Print out the errors in the error logs for the cluster components.
      *
diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterToolOperator.java b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterToolOperator.java
index f2bbf7db94..6cc0111642 100644
--- a/aeron-cluster/src/main/java/io/aeron/cluster/ClusterToolOperator.java
+++ b/aeron-cluster/src/main/java/io/aeron/cluster/ClusterToolOperator.java
@@ -17,11 +17,13 @@
 package io.aeron.cluster;
 
 import static io.aeron.Aeron.NULL_VALUE;
+import static io.aeron.archive.codecs.SourceLocation.LOCAL;
 import static java.nio.ByteOrder.LITTLE_ENDIAN;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.nio.file.StandardCopyOption.*;
 import static java.nio.file.StandardOpenOption.*;
 import static org.agrona.Strings.isEmpty;
+import static org.agrona.concurrent.status.CountersReader.NULL_COUNTER_ID;
 
 import java.io.File;
 import java.io.IOException;
@@ -36,6 +38,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.*;
 
+import io.aeron.archive.status.RecordingPos;
 import org.agrona.BufferUtil;
 import org.agrona.DirectBuffer;
 import org.agrona.IoUtil;
@@ -127,7 +130,14 @@ protected int pid(final File clusterDir, final PrintStream out)
      */
     protected int recoveryPlan(final PrintStream out, final File clusterDir, final int serviceCount)
     {
-        try (AeronArchive archive = AeronArchive.connect();
+        final ClusterNodeControlProperties properties = loadControlProperties(clusterDir);
+
+        final AeronArchive.Context archiveCtx = new AeronArchive.Context()
+            .controlRequestChannel("aeron:ipc")
+            .controlResponseChannel("aeron:ipc");
+
+        try (Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(properties.aeronDirectoryName));
+            AeronArchive archive = AeronArchive.connect(archiveCtx.aeron(aeron));
             RecordingLog recordingLog = new RecordingLog(clusterDir, false))
         {
             out.println(recordingLog.createRecoveryPlan(archive, serviceCount, Aeron.NULL_VALUE));
@@ -255,6 +265,151 @@ protected int seedRecordingLogFromSnapshot(final File clusterDir)
         return SUCCESS;
     }
 
+    /**
+     * Create a new service snapshot recording from the latest valid snapshot. The snapshot is replayed from the
+     * archive and copied to a new local recording via a {@link ConsensusModuleSnapshotCopier}, and the id of
+     * the new recording is printed to {@code out}.
+     *
+     * @param clusterDir where the cluster is running.
+     * @param out        to print the output to.
+     * @return {@code SUCCESS} if the new recording is created, or {@code FAILURE} if no snapshot is found or
+     * the archive connection is not available.
+     */
+    protected int createEmptyServiceSnapshot(final File clusterDir, final PrintStream out)
+    {
+        final RecordingLog.Entry entry = findLatestValidSnapshot(clusterDir);
+        if (null == entry)
+        {
+            out.println("Snapshot not found");
+            return FAILURE;
+        }
+
+        final ClusterNodeControlProperties properties = loadControlProperties(clusterDir);
+
+        final AeronArchive.Context archiveCtx = new AeronArchive.Context()
+            .controlRequestChannel("aeron:ipc")
+            .controlResponseChannel("aeron:ipc");
+
+        try (Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(properties.aeronDirectoryName));
+            AeronArchive archive = AeronArchive.connect(archiveCtx.aeron(aeron)))
+        {
+            if (null == archive)
+            {
+                out.println("unable to connect to Archive");
+                return FAILURE;
+            }
+
+            final int subscriptionSessionId = (int)archive.startReplay(
+                entry.recordingId, 0, AeronArchive.NULL_LENGTH, toolChannel, toolStreamId);
+
+            final String replayChannel = ChannelUri.addSessionId(toolChannel, subscriptionSessionId);
+            try (Subscription subscription = aeron.addSubscription(replayChannel, toolStreamId);
+                ExclusivePublication publication = aeron.addExclusivePublication(toolChannel, toolStreamId))
+            {
+                final int publicationSessionId = publication.sessionId();
+                final String recordingChannel = ChannelUri.addSessionId(toolChannel, publicationSessionId);
+                archive.startRecording(recordingChannel, toolStreamId, LOCAL, true);
+
+                final CountersReader counters = aeron.countersReader();
+                final long archiveId = archive.archiveId();
+                int counterId = RecordingPos.findCounterIdBySession(counters, publicationSessionId, archiveId);
+                while (NULL_COUNTER_ID == counterId)
+                {
+                    Thread.yield();
+                    archive.checkForErrorResponse();
+                    counterId = RecordingPos.findCounterIdBySession(counters, publicationSessionId, archiveId);
+                }
+                final long recordingId = RecordingPos.getRecordingId(counters, counterId);
+                out.println("New snapshot recording id: " + recordingId);
+
+                Image image;
+                while ((image = subscription.imageBySessionId(subscriptionSessionId)) == null)
+                {
+                    archive.checkForErrorResponse();
+                    Thread.yield();
+                }
+
+                final ConsensusModuleSnapshotAdapter adapter = new ConsensusModuleSnapshotAdapter(
+                    image, new ConsensusModuleSnapshotCopier(publication));
+
+                while (true)
+                {
+                    final int fragments = image.controlledPoll(adapter, 10);
+                    if (adapter.isDone())
+                    {
+                        break;
+                    }
+
+                    if (0 == fragments)
+                    {
+                        if (image.isClosed())
+                        {
+                            throw new ClusterException("snapshot ended unexpectedly: " + image);
+                        }
+
+                        archive.checkForErrorResponse();
+                        Thread.yield();
+                    }
+                }
+
+                while (counters.getCounterValue(counterId) < publication.position())
+                {
+                    Thread.yield();
+                    archive.checkForErrorResponse();
+
+                    if (!RecordingPos.isActive(counters, counterId, recordingId))
+                    {
+                        throw new ClusterException("recording stopped unexpectedly: " + recordingId);
+                    }
+                }
+            }
+        }
+
+        return SUCCESS;
+    }
+
+    /**
+     * Append a snapshot entry to the recording log for the given recording id. The entry takes its leadership
+     * term, positions and timestamp from the latest consensus module snapshot entry and uses a service id one
+     * higher than {@link RecordingLog#getHighServiceIdOfLatestSnapshot()}.
+     *
+     * @param out         to print the output to.
+     * @param clusterDir  where the cluster is running.
+     * @param recordingId the recording id to associate with the new snapshot entry.
+     * @return {@code SUCCESS} if the entry is appended, or {@code FAILURE} if no consensus module snapshot
+     * entry is found.
+     */
+    protected int addServiceSnapshot(
+        final PrintStream out,
+        final File clusterDir,
+        final int recordingId)
+    {
+        try (RecordingLog recordingLog = new RecordingLog(clusterDir, false))
+        {
+            final RecordingLog.Entry entry = recordingLog.getLatestSnapshot(ConsensusModule.Configuration.SERVICE_ID);
+
+            if (null == entry)
+            {
+                out.println("Snapshot not found");
+                return FAILURE;
+            }
+
+            final int newServiceId = recordingLog.getHighServiceIdOfLatestSnapshot() + 1;
+
+            recordingLog.appendSnapshot(
+                recordingId,
+                entry.leadershipTermId,
+                entry.termBaseLogPosition,
+                entry.logPosition,
+                entry.timestamp,
+                newServiceId);
+
+            out.println("Snapshot added for serviceId=" + newServiceId + " and recordingId=" + recordingId);
+        }
+
+        return SUCCESS;
+    }
+
     /**
      * Print out the errors in the error logs for the cluster components.
      *
diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotCopier.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotCopier.java
new file mode 100644
index 0000000000..a26e515120
--- /dev/null
+++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotCopier.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2014-2024 Real Logic Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.aeron.cluster;
+
+import io.aeron.ExclusivePublication;
+import io.aeron.Publication;
+import io.aeron.cluster.client.ClusterException;
+import io.aeron.cluster.codecs.CloseReason;
+import io.aeron.cluster.codecs.MessageHeaderEncoder;
+import io.aeron.cluster.codecs.SnapshotMarkerEncoder;
+import io.aeron.logbuffer.BufferClaim;
+import org.agrona.DirectBuffer;
+
+import java.util.concurrent.TimeUnit;
+
+import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.SNAPSHOT_TYPE_ID;
+
+/**
+ * Copies the relevant bits of a consensus module snapshot into a regular service snapshot.
+ * Copies the snapshot marker begin and end, as well as the sessions.
+ * Modifies the snapshot marker type id.
+ */
+class ConsensusModuleSnapshotCopier implements ConsensusModuleSnapshotListener
+{
+    private final BufferClaim bufferClaim = new BufferClaim();
+    private final SnapshotMarkerEncoder encoder = new SnapshotMarkerEncoder();
+    private final ExclusivePublication publication;
+
+    ConsensusModuleSnapshotCopier(final ExclusivePublication publication)
+    {
+        this.publication = publication;
+    }
+
+    public void onLoadBeginSnapshot(
+        final int appVersion, final TimeUnit timeUnit, final DirectBuffer buffer, final int offset, final int length)
+    {
+        onSnapshotMarker(buffer, offset, length);
+    }
+
+    public void onLoadEndSnapshot(final DirectBuffer buffer, final int offset, final int length)
+    {
+        onSnapshotMarker(buffer, offset, length);
+    }
+
+    public void onLoadConsensusModuleState(
+        final long nextSessionId,
+        final long nextServiceSessionId,
+        final long logServiceSessionId,
+        final int pendingMessageCapacity,
+        final DirectBuffer buffer,
+        final int offset,
+        final int length)
+    {
+    }
+
+    public void onLoadPendingMessage(
+        final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
+    {
+    }
+
+    public void onLoadClusterSession(
+        final long clusterSessionId,
+        final long correlationId,
+        final long openedLogPosition,
+        final long timeOfLastActivity,
+        final CloseReason closeReason,
+        final int responseStreamId,
+        final String responseChannel,
+        final DirectBuffer buffer,
+        final int offset,
+        final int length)
+    {
+        claimAndPut(buffer, offset, length);
+        bufferClaim.commit();
+    }
+
+    public void onLoadTimer(
+        final long correlationId, final long deadline, final DirectBuffer buffer, final int offset, final int length)
+    {
+    }
+
+    public void onLoadPendingMessageTracker(
+        final long nextServiceSessionId,
+        final long logServiceSessionId,
+        final int pendingMessageCapacity,
+        final int serviceId,
+        final DirectBuffer buffer,
+        final int offset,
+        final int length)
+    {
+    }
+
+    private void onSnapshotMarker(final DirectBuffer buffer, final int offset, final int length)
+    {
+        claimAndPut(buffer, offset, length);
+
+        // overwrite the marker's type id before committing
+        encoder.wrap(bufferClaim.buffer(), bufferClaim.offset() + MessageHeaderEncoder.ENCODED_LENGTH);
+        encoder.typeId(SNAPSHOT_TYPE_ID);
+
+        bufferClaim.commit();
+    }
+
+    private void claimAndPut(final DirectBuffer buffer, final int offset, final int length)
+    {
+        while (true)
+        {
+            final long result = publication.tryClaim(length, bufferClaim);
+            if (result > 0)
+            {
+                bufferClaim.putBytes(buffer, offset, length);
+                break;
+            }
+
+            if (Publication.NOT_CONNECTED == result)
+            {
+                throw new ClusterException("publication is not connected");
+            }
+
+            if (Publication.CLOSED == result)
+            {
+                throw new ClusterException("publication is closed");
+            }
+
+            if (Publication.MAX_POSITION_EXCEEDED == result)
+            {
+                throw new ClusterException(
+                    "publication at max position: term-length=" + publication.termBufferLength());
+            }
+
+            Thread.yield();
+        }
+    }
+}
diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java b/aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java
index faa2d8606e..031f48958f 100644
--- a/aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java
+++ b/aeron-cluster/src/main/java/io/aeron/cluster/RecordingLog.java
@@ -982,6 +982,43 @@ public Entry getLatestSnapshot(final int serviceId)
         return null;
     }
 
+    /**
+     * Get the high service id of the latest snapshot.
+     * If no service snapshot entries are found then {@link ConsensusModule.Configuration#SERVICE_ID} is returned.
+     *
+     * @return the high service id.
+     */
+    public int getHighServiceIdOfLatestSnapshot()
+    {
+        for (int idx = entriesCache.size() - 1; idx >= 0; idx--)
+        {
+            final Entry entry = entriesCache.get(idx);
+            if (isValidSnapshot(entry) && ConsensusModule.Configuration.SERVICE_ID == entry.serviceId)
+            {
+                int highServiceId = entry.serviceId;
+
+                for (--idx; idx >= 0; idx--)
+                {
+                    final Entry svcEntry = entriesCache.get(idx);
+                    if (isValidAnySnapshot(svcEntry) && svcEntry.serviceId == highServiceId + 1)
+                    {
+                        highServiceId = svcEntry.serviceId;
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+
+                // Also reached when the scan runs off the start of the log, i.e. the snapshot entries are the
+                // first entries in the log, so the high service id found so far must not be discarded.
+                return highServiceId;
+            }
+        }
+
+        return ConsensusModule.Configuration.SERVICE_ID;
+    }
+
     /**
      * Invalidate the last snapshot taken by the cluster so that on restart it can revert to the previous one.
      *
diff --git a/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterToolTest.java b/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterToolTest.java
index 5b5d46317f..c5ece1fccb 100644
--- a/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterToolTest.java
+++ b/aeron-system-tests/src/test/java/io/aeron/cluster/ClusterToolTest.java
@@ -36,14 +36,14 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static io.aeron.test.cluster.TestCluster.aCluster;
 import static java.nio.file.StandardOpenOption.CREATE_NEW;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.matchesRegex;
+import static org.hamcrest.Matchers.*;
 import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.Mockito.mock;
 
@@ -112,6 +112,108 @@ void shouldDescribeLatestConsensusModuleSnapshot()
             containsString("Snapshot: appVersion=1 timeUnit=MILLISECONDS"));
     }
 
+    @Test
+    @InterruptAfter(5)
+    void shouldAddServiceSnapshot()
+    {
+        final TestCluster cluster = aCluster().withStaticNodes(3).start();
+        systemTestWatcher.cluster(cluster);
+
+        final TestNode leader = cluster.awaitLeader();
+        assertNotNull(cluster.asyncConnectClient());
+        cluster.sendAndAwaitMessages(10);
+
+        final CapturingPrintStream capturingPrintStream = new CapturingPrintStream();
+
+        assertFalse(ClusterTool.addServiceSnapshot(
+            capturingPrintStream.resetAndGetPrintStream(),
+            leader.consensusModule().context().clusterDir(),
+            5));
+
+        assertTrue(ClusterTool.snapshot(
+            leader.consensusModule().context().clusterDir(),
+            capturingPrintStream.resetAndGetPrintStream()));
+        cluster.awaitSnapshotCount(1);
+
+        assertTrue(ClusterTool.addServiceSnapshot(
+            capturingPrintStream.resetAndGetPrintStream(),
+            leader.consensusModule().context().clusterDir(),
+            7));
+
+        assertThat(
+            capturingPrintStream.flushAndGetContent(),
+            containsString("recordingId=7"));
+
+        ClusterTool.recordingLog(
+            capturingPrintStream.resetAndGetPrintStream(),
+            leader.consensusModule().context().clusterDir());
+
+        final String log = capturingPrintStream.flushAndGetContent();
+        assertThat(log, containsString("serviceId=1"));
+        assertThat(log, containsString("recordingId=7"));
+        assertThat(log, not(containsString("recordingId=5")));
+    }
+
+    @Test
+    @InterruptAfter(5)
+    void shouldAddNewServiceAfterSnapshot()
+    {
+        final int[] recordingIds = new int[3];
+
+        final TestCluster cluster = aCluster().withStaticNodes(3).start();
+        systemTestWatcher.cluster(cluster);
+
+        final TestNode initialLeader = cluster.awaitLeader();
+        assertNotNull(cluster.asyncConnectClient());
+        cluster.sendAndAwaitMessages(10);
+
+        final CapturingPrintStream capturingPrintStream = new CapturingPrintStream();
+
+        assertTrue(ClusterTool.snapshot(
+            initialLeader.consensusModule().context().clusterDir(),
+            capturingPrintStream.resetAndGetPrintStream()));
+        cluster.awaitSnapshotCount(1);
+
+        final Pattern pattern = Pattern.compile("New snapshot recording id: (\\d+)");
+
+        for (int i = 0; i <= 2; i++)
+        {
+            assertTrue(ClusterTool.createEmptyServiceSnapshot(
+                cluster.node(i).consensusModule().context().clusterDir(),
+                capturingPrintStream.resetAndGetPrintStream()));
+
+            final Matcher matcher = pattern.matcher(capturingPrintStream.flushAndGetContent());
+            assertTrue(matcher.find());
+            recordingIds[i] = Integer.parseInt(matcher.group(1));
+        }
+
+        cluster.stopAllNodes();
+
+        for (int i = 0; i <= 2; i++)
+        {
+            assertTrue(ClusterTool.addServiceSnapshot(
+                capturingPrintStream.resetAndGetPrintStream(),
+                cluster.node(i).consensusModule().context().clusterDir(),
+                recordingIds[i]));
+
+            cluster.startStaticNode(
+                i,
+                false,
+                (x) -> new TestNode.TestService[]{
+                    new TestNode.TestService().index(x),
+                    new TestNode.TestService().index(x + 3).requireFullSnapshot(false)
+                });
+        }
+
+        cluster.awaitLeader();
+
+        assertNotNull(cluster.asyncConnectClient());
+
+        cluster.sendMessages(10);
+        cluster.awaitResponseMessageCount(20); // two services, so twice the messages pinged back
+        cluster.awaitServicesMessageCount(10); // each service gets 10 messages
+    }
+
     @Test
     @InterruptAfter(30)
     void shouldNotSnapshotWhenSuspendedOnly()
diff --git a/aeron-test-support/src/main/java/io/aeron/test/cluster/TestNode.java b/aeron-test-support/src/main/java/io/aeron/test/cluster/TestNode.java
index 7cc6d1a407..d6db8f33c4 100644
--- a/aeron-test-support/src/main/java/io/aeron/test/cluster/TestNode.java
+++ b/aeron-test-support/src/main/java/io/aeron/test/cluster/TestNode.java
@@ -354,6 +354,7 @@ public static class TestService extends StubClusteredService
         volatile boolean wasSnapshotLoaded = false;
         volatile boolean failNextSnapshot = false;
         private int index;
+        private boolean requireFullSnapshot = true;
         private volatile boolean hasReceivedUnexpectedMessage = false;
         private volatile Cluster.Role roleChangedTo = null;
         private final AtomicInteger activeSessionCount = new AtomicInteger();
@@ -369,6 +370,12 @@ public TestService index(final int index)
             return this;
         }
 
+        public TestService requireFullSnapshot(final boolean requireFullSnapshot)
+        {
+            this.requireFullSnapshot = requireFullSnapshot;
+            return this;
+        }
+
         int index()
         {
             return index;
@@ -449,7 +456,7 @@ public void onStart(final Cluster cluster, final Image snapshotImage)
                     idleStrategy.idle(fragments);
                 }
 
-                if (fragmentCount != SNAPSHOT_FRAGMENT_COUNT)
+                if (requireFullSnapshot && (fragmentCount != SNAPSHOT_FRAGMENT_COUNT))
                 {
                     throw new AgentTerminationException(
                         "unexpected snapshot length: expected=" + SNAPSHOT_FRAGMENT_COUNT + " actual=" + fragmentCount);