Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New cluster tool command to create empty service snapshot #1699

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions aeron-cluster/src/main/java/io/aeron/cluster/ClusterTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ public class ClusterTool
action(operator::seedRecordingLogFromSnapshot),
"creates a new recording log based on the latest valid snapshot."));

// Registers "create-empty-service-snapshot"; the command delegates to
// operator.createEmptyServiceSnapshot through the action(...) adapter.
COMMANDS.put("create-empty-service-snapshot", new ClusterToolCommand(
action(operator::createEmptyServiceSnapshot),
"creates an empty service snapshot based on the latest consensus module snapshot."));

// Registers "add-service-snapshot". The recording id is read from args[2], so at least three
// arguments are required; with fewer the help text is printed and -1 is returned.
COMMANDS.put("add-service-snapshot", new ClusterToolCommand((clusterDir, out, args) ->
{
if (args.length < 3)
{
printHelp(COMMANDS, HELP_PREFIX);
return -1;
}
// NOTE(review): Integer.parseInt throws NumberFormatException for a non-numeric or out-of-int-range
// args[2]; it is not caught here, so it propagates to whoever invokes the command.
return operator.addServiceSnapshot(out, clusterDir, Integer.parseInt(args[2]));
}, "Adds a new snapshot with the specified recording id and with the next available service id."));

// Registers "errors"; the command delegates to operator.errors through the action(...) adapter.
COMMANDS.put("errors", new ClusterToolCommand(
action(operator::errors),
"prints Aeron and cluster component error logs."));
Expand Down Expand Up @@ -321,6 +335,32 @@ public static void seedRecordingLogFromSnapshot(final File clusterDir)
BACKWARD_COMPATIBLE_OPERATIONS.seedRecordingLogFromSnapshot(clusterDir);
}

/**
 * Create a new, empty service snapshot recording derived from the most recent snapshot.
 * <p>
 * The work is delegated to the backward compatible operator; its status code is mapped to a boolean.
 *
 * @param clusterDir where the cluster is running.
 * @param out        to print the output to.
 * @return {@code true} if the operator reports {@code SUCCESS}, otherwise {@code false}.
 */
public static boolean createEmptyServiceSnapshot(final File clusterDir, final PrintStream out)
{
    final int result = BACKWARD_COMPATIBLE_OPERATIONS.createEmptyServiceSnapshot(clusterDir, out);

    return SUCCESS == result;
}

/**
 * Append a snapshot entry for the given recording id to the recording log.
 * <p>
 * The work is delegated to the backward compatible operator, which assigns the next highest service id to the
 * new entry; its status code is mapped to a boolean.
 *
 * @param out         to print the output to.
 * @param clusterDir  where the cluster is running.
 * @param recordingId the recording id to associate with the new snapshot entry.
 * @return {@code true} if the operator reports {@code SUCCESS}, otherwise {@code false}.
 */
public static boolean addServiceSnapshot(final PrintStream out, final File clusterDir, final int recordingId)
{
    final int result = BACKWARD_COMPATIBLE_OPERATIONS.addServiceSnapshot(out, clusterDir, recordingId);

    return SUCCESS == result;
}

/**
* Print out the errors in the error logs for the cluster components.
*
Expand Down
136 changes: 135 additions & 1 deletion aeron-cluster/src/main/java/io/aeron/cluster/ClusterToolOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package io.aeron.cluster;

import static io.aeron.Aeron.NULL_VALUE;
import static io.aeron.archive.codecs.SourceLocation.LOCAL;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.file.StandardCopyOption.*;
import static java.nio.file.StandardOpenOption.*;
import static org.agrona.Strings.isEmpty;
import static org.agrona.concurrent.status.CountersReader.NULL_COUNTER_ID;

import java.io.File;
import java.io.IOException;
Expand All @@ -36,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.*;

import io.aeron.archive.status.RecordingPos;
import org.agrona.BufferUtil;
import org.agrona.DirectBuffer;
import org.agrona.IoUtil;
Expand Down Expand Up @@ -127,7 +130,14 @@ protected int pid(final File clusterDir, final PrintStream out)
*/
protected int recoveryPlan(final PrintStream out, final File clusterDir, final int serviceCount)
{
try (AeronArchive archive = AeronArchive.connect();
final ClusterNodeControlProperties properties = loadControlProperties(clusterDir);

final AeronArchive.Context archiveCtx = new AeronArchive.Context()
.controlRequestChannel("aeron:ipc")
.controlResponseChannel("aeron:ipc");

try (Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(properties.aeronDirectoryName));
AeronArchive archive = AeronArchive.connect(archiveCtx.aeron(aeron));
RecordingLog recordingLog = new RecordingLog(clusterDir, false))
{
out.println(recordingLog.createRecoveryPlan(archive, serviceCount, Aeron.NULL_VALUE));
Expand Down Expand Up @@ -255,6 +265,130 @@ protected int seedRecordingLogFromSnapshot(final File clusterDir)
return SUCCESS;
}

/**
 * Create a new archive recording that holds a reduced copy of the latest valid snapshot.
 * <p>
 * The recording referenced by the entry from {@code findLatestValidSnapshot(clusterDir)} is replayed from
 * position 0. Each replayed message is dispatched by a {@link ConsensusModuleSnapshotAdapter} to a
 * {@link ConsensusModuleSnapshotCopier}, which writes to a new exclusive publication that the archive has been
 * asked to record. The id of the new recording is printed to {@code out}. The method returns once the
 * recording position counter has reached the publication position.
 * <p>
 * All waits are busy loops using {@link Thread#yield()} with no timeout; each iteration calls
 * {@code archive.checkForErrorResponse()}.
 *
 * @param clusterDir where the cluster is running.
 * @param out        to print the output to.
 * @return {@code SUCCESS} once the copy has been recorded, or {@code FAILURE} if no snapshot entry was found.
 * @throws ClusterException if the replay image closes before the adapter reports it is done, or if the new
 *                          recording is no longer active before it has caught up with the publication.
 */
protected int createEmptyServiceSnapshot(final File clusterDir, final PrintStream out)
{
    final RecordingLog.Entry entry = findLatestValidSnapshot(clusterDir);
    if (null == entry)
    {
        out.println("Snapshot not found");
        return FAILURE;
    }

    final ClusterNodeControlProperties properties = loadControlProperties(clusterDir);

    final AeronArchive.Context archiveCtx = new AeronArchive.Context()
        .controlRequestChannel("aeron:ipc")
        .controlResponseChannel("aeron:ipc");

    // AeronArchive.connect either returns a connected client or throws, so no null check is needed on it.
    try (Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(properties.aeronDirectoryName));
        AeronArchive archive = AeronArchive.connect(archiveCtx.aeron(aeron)))
    {
        // The int cast of the value returned by startReplay is used as the session id of the replay stream.
        final int subscriptionSessionId = (int)archive.startReplay(
            entry.recordingId, 0, AeronArchive.NULL_LENGTH, toolChannel, toolStreamId);

        final String replayChannel = ChannelUri.addSessionId(toolChannel, subscriptionSessionId);
        try (Subscription subscription = aeron.addSubscription(replayChannel, toolStreamId);
            ExclusivePublication publication = aeron.addExclusivePublication(toolChannel, toolStreamId))
        {
            // Record only the stream of the new publication by pinning the recording channel to its session id.
            // NOTE(review): the final 'true' argument is presumably autoStop - confirm against AeronArchive.
            final int publicationSessionId = publication.sessionId();
            final String recordingChannel = ChannelUri.addSessionId(toolChannel, publicationSessionId);
            archive.startRecording(recordingChannel, toolStreamId, LOCAL, true);

            // Wait for the recording position counter of the new recording to appear.
            final CountersReader counters = aeron.countersReader();
            final long archiveId = archive.archiveId();
            int counterId = RecordingPos.findCounterIdBySession(counters, publicationSessionId, archiveId);
            while (NULL_COUNTER_ID == counterId)
            {
                Thread.yield();
                archive.checkForErrorResponse();
                counterId = RecordingPos.findCounterIdBySession(counters, publicationSessionId, archiveId);
            }
            final long recordingId = RecordingPos.getRecordingId(counters, counterId);
            out.println("New snapshot recording id: " + recordingId);

            // Wait for the replay image to become available on the subscription.
            Image image;
            while ((image = subscription.imageBySessionId(subscriptionSessionId)) == null)
            {
                archive.checkForErrorResponse();
                Thread.yield();
            }

            final ConsensusModuleSnapshotAdapter adapter = new ConsensusModuleSnapshotAdapter(
                image, new ConsensusModuleSnapshotCopier(publication));

            // Pump the replay through the adapter until it reports the snapshot has been fully read.
            while (true)
            {
                final int fragments = image.controlledPoll(adapter, 10);
                if (adapter.isDone())
                {
                    break;
                }

                if (0 == fragments)
                {
                    if (image.isClosed())
                    {
                        throw new ClusterException("snapshot ended unexpectedly: " + image);
                    }

                    archive.checkForErrorResponse();
                    Thread.yield();
                }
            }

            // Wait until the archive has recorded everything that was published.
            while (counters.getCounterValue(counterId) < publication.position())
            {
                Thread.yield();
                archive.checkForErrorResponse();

                if (!RecordingPos.isActive(counters, counterId, recordingId))
                {
                    throw new ClusterException("recording stopped unexpectedly: " + recordingId);
                }
            }
        }
    }

    return SUCCESS;
}

/**
 * Append a snapshot entry for {@code recordingId} to the recording log in {@code clusterDir}.
 * <p>
 * The leadership term id, term base log position, log position and timestamp are copied from the latest
 * snapshot entry found for {@code ConsensusModule.Configuration.SERVICE_ID}. The service id of the new entry
 * is one more than {@code RecordingLog.getHighServiceIdOfLatestSnapshot()}.
 *
 * @param out         to print the output to.
 * @param clusterDir  where the cluster is running.
 * @param recordingId the recording id to associate with the new snapshot entry.
 * @return {@code SUCCESS} if the entry was appended, or {@code FAILURE} if no latest snapshot entry was found.
 */
protected int addServiceSnapshot(
    final PrintStream out,
    final File clusterDir,
    final int recordingId)
{
    try (RecordingLog recordingLog = new RecordingLog(clusterDir, false))
    {
        final RecordingLog.Entry latestSnapshot =
            recordingLog.getLatestSnapshot(ConsensusModule.Configuration.SERVICE_ID);

        if (null != latestSnapshot)
        {
            final int nextServiceId = recordingLog.getHighServiceIdOfLatestSnapshot() + 1;

            recordingLog.appendSnapshot(
                recordingId,
                latestSnapshot.leadershipTermId,
                latestSnapshot.termBaseLogPosition,
                latestSnapshot.logPosition,
                latestSnapshot.timestamp,
                nextServiceId);

            out.println("Snapshot added for serviceId=" + nextServiceId + " and recordingId=" + recordingId);

            return SUCCESS;
        }

        out.println("Snapshot not found");

        return FAILURE;
    }
}

/**
* Print out the errors in the error logs for the cluster components.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2014-2024 Real Logic Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.cluster;

import io.aeron.ExclusivePublication;
import io.aeron.Publication;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.codecs.SnapshotMarkerEncoder;
import io.aeron.logbuffer.BufferClaim;
import org.agrona.DirectBuffer;

import java.util.concurrent.TimeUnit;

import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.SNAPSHOT_TYPE_ID;

/**
 * {@link ConsensusModuleSnapshotListener} that republishes selected parts of a consensus module snapshot onto an
 * {@link ExclusivePublication}.
 * <p>
 * The begin and end snapshot markers are copied with their type id overwritten by {@code SNAPSHOT_TYPE_ID}.
 * Cluster sessions are copied unchanged. Consensus module state, pending messages, timers and pending message
 * trackers are dropped.
 */
class ConsensusModuleSnapshotCopier implements ConsensusModuleSnapshotListener
{
    private final ExclusivePublication publication;
    private final BufferClaim claim = new BufferClaim();
    private final SnapshotMarkerEncoder markerEncoder = new SnapshotMarkerEncoder();

    ConsensusModuleSnapshotCopier(final ExclusivePublication publication)
    {
        this.publication = publication;
    }

    public void onLoadBeginSnapshot(
        final int appVersion, final TimeUnit timeUnit, final DirectBuffer buffer, final int offset, final int length)
    {
        copyMarker(buffer, offset, length);
    }

    public void onLoadEndSnapshot(final DirectBuffer buffer, final int offset, final int length)
    {
        copyMarker(buffer, offset, length);
    }

    public void onLoadConsensusModuleState(
        final long nextSessionId,
        final long nextServiceSessionId,
        final long logServiceSessionId,
        final int pendingMessageCapacity,
        final DirectBuffer buffer,
        final int offset,
        final int length)
    {
        // intentionally not copied
    }

    public void onLoadPendingMessage(
        final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
    {
        // intentionally not copied
    }

    public void onLoadClusterSession(
        final long clusterSessionId,
        final long correlationId,
        final long openedLogPosition,
        final long timeOfLastActivity,
        final CloseReason closeReason,
        final int responseStreamId,
        final String responseChannel,
        final DirectBuffer buffer,
        final int offset,
        final int length)
    {
        // sessions are forwarded as-is
        claimAndCopy(buffer, offset, length);
        claim.commit();
    }

    public void onLoadTimer(
        final long correlationId, final long deadline, final DirectBuffer buffer, final int offset, final int length)
    {
        // intentionally not copied
    }

    public void onLoadPendingMessageTracker(
        final long nextServiceSessionId,
        final long logServiceSessionId,
        final int pendingMessageCapacity,
        final int serviceId,
        final DirectBuffer buffer,
        final int offset,
        final int length)
    {
        // intentionally not copied
    }

    /**
     * Copy a snapshot marker message and rewrite its type id in the claimed space before it is committed.
     */
    private void copyMarker(final DirectBuffer buffer, final int offset, final int length)
    {
        claimAndCopy(buffer, offset, length);

        // the marker body follows the message header inside the claimed region
        final int bodyOffset = claim.offset() + MessageHeaderEncoder.ENCODED_LENGTH;
        markerEncoder.wrap(claim.buffer(), bodyOffset);
        markerEncoder.typeId(SNAPSHOT_TYPE_ID);

        claim.commit();
    }

    /**
     * Claim {@code length} bytes on the publication, retrying until the claim succeeds, then copy the message
     * into the claimed space. The claim is left uncommitted for the caller to commit.
     */
    private void claimAndCopy(final DirectBuffer buffer, final int offset, final int length)
    {
        long result = publication.tryClaim(length, claim);
        while (result <= 0)
        {
            failIfUnrecoverable(result);
            Thread.yield();
            result = publication.tryClaim(length, claim);
        }

        claim.putBytes(buffer, offset, length);
    }

    /**
     * Throw for claim results that cannot be resolved by retrying; return normally for all other results.
     */
    private void failIfUnrecoverable(final long result)
    {
        if (Publication.NOT_CONNECTED == result)
        {
            throw new ClusterException("publication is not connected");
        }
        else if (Publication.CLOSED == result)
        {
            throw new ClusterException("publication is closed");
        }
        else if (Publication.MAX_POSITION_EXCEEDED == result)
        {
            throw new ClusterException(
                "publication at max position: term-length=" + publication.termBufferLength());
        }
    }
}
Loading
Loading