Skip to content

Commit

Permalink
fix: Implement new PCES writer flush (#10945)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <[email protected]>
  • Loading branch information
litt3 authored Jan 12, 2024
1 parent 3df5a2a commit cca3e1c
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,8 @@ public class SwirldsPlatform implements Platform {
eventObserverDispatcher,
shadowGraph,
latestEventTipsetTracker,
intakeEventCounter);
intakeEventCounter,
platformWiring.getKeystoneEventSequenceNumberOutput());

final EventCreationManager eventCreationManager = buildEventCreationManager(
platformContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.swirlds.base.time.Time;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import com.swirlds.platform.Consensus;
import com.swirlds.platform.consensus.NonAncientEventWindow;
import com.swirlds.platform.gossip.IntakeEventCounter;
Expand Down Expand Up @@ -67,6 +68,11 @@ public class LinkedEventIntake {
*/
private final IntakeEventCounter intakeEventCounter;

/**
* The secondary wire that outputs the keystone event sequence number
*/
private final StandardOutputWire<Long> keystoneEventSequenceNumberOutput;

/**
* Whether or not the linked event intake is paused.
* <p>
Expand All @@ -77,14 +83,16 @@ public class LinkedEventIntake {
/**
* Constructor
*
* @param platformContext the platform context
* @param time provides the wall clock time
* @param consensusSupplier provides the current consensus instance
* @param dispatcher invokes event related callbacks
* @param shadowGraph tracks events in the hashgraph
* @param latestEventTipsetTracker tracks the tipset of the latest self event, null if feature is not enabled
* @param intakeEventCounter tracks the number of events from each peer that are currently in the intake
* pipeline
* @param platformContext the platform context
* @param time provides the wall clock time
* @param consensusSupplier provides the current consensus instance
* @param dispatcher invokes event related callbacks
* @param shadowGraph tracks events in the hashgraph
* @param latestEventTipsetTracker tracks the tipset of the latest self event, null if feature is
* not enabled
* @param intakeEventCounter tracks the number of events from each peer that are currently in
* the intake pipeline
* @param keystoneEventSequenceNumberOutput the secondary wire that outputs the keystone event sequence number
*/
public LinkedEventIntake(
@NonNull final PlatformContext platformContext,
Expand All @@ -93,13 +101,15 @@ public LinkedEventIntake(
@NonNull final EventObserverDispatcher dispatcher,
@NonNull final ShadowGraph shadowGraph,
@Nullable final LatestEventTipsetTracker latestEventTipsetTracker,
@NonNull final IntakeEventCounter intakeEventCounter) {
@NonNull final IntakeEventCounter intakeEventCounter,
@NonNull final StandardOutputWire<Long> keystoneEventSequenceNumberOutput) {
this.platformContext = Objects.requireNonNull(platformContext);
this.time = Objects.requireNonNull(time);
this.consensusSupplier = Objects.requireNonNull(consensusSupplier);
this.dispatcher = Objects.requireNonNull(dispatcher);
this.shadowGraph = Objects.requireNonNull(shadowGraph);
this.intakeEventCounter = Objects.requireNonNull(intakeEventCounter);
this.keystoneEventSequenceNumberOutput = Objects.requireNonNull(keystoneEventSequenceNumberOutput);
this.latestEventTipsetTracker = latestEventTipsetTracker;

this.paused = false;
Expand Down Expand Up @@ -138,7 +148,15 @@ public List<ConsensusRound> addEvent(@NonNull final EventImpl event) {
dispatcher.eventAdded(event);

if (consensusRounds != null) {
consensusRounds.forEach(this::handleConsensus);
consensusRounds.forEach(round -> {
// it is important that a flush request for the keystone event is submitted before starting
// to handle the transactions in the round. Otherwise, the system could arrive at a place
// where the transaction handler is waiting for a given event to become durable, but the
// PCES writer hasn't been notified yet that the event should be flushed.
keystoneEventSequenceNumberOutput.forward(
round.getKeystoneEvent().getBaseEvent().getStreamSequenceNumber());
handleConsensus(round);
});
}

final long minimumGenerationNonAncient = consensusSupplier.get().getMinGenerationNonAncient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ private static final class ClassVersion {
*/
public static final long NO_STREAM_SEQUENCE_NUMBER = -1;

/**
* The sequence number of an event that will never be written to disk because it is stale.
*/
public static final long STALE_EVENT_STREAM_SEQUENCE_NUMBER = -2;

/**
* Each event is assigned a sequence number as it is written to the preconsensus event stream. This is used to
* signal when events have been made durable.
Expand Down Expand Up @@ -109,8 +104,7 @@ public GossipEvent(final BaseEventHashedData hashedData, final BaseEventUnhashed
* @param streamSequenceNumber the sequence number
*/
public void setStreamSequenceNumber(final long streamSequenceNumber) {
if (this.streamSequenceNumber != NO_STREAM_SEQUENCE_NUMBER
&& streamSequenceNumber != STALE_EVENT_STREAM_SEQUENCE_NUMBER) {
if (this.streamSequenceNumber != NO_STREAM_SEQUENCE_NUMBER) {
throw new IllegalStateException("sequence number already set");
}
this.streamSequenceNumber = streamSequenceNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.swirlds.common.threading.CountUpLatch;
import com.swirlds.platform.event.GossipEvent;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
* A class used to determine if an event is guaranteed to be durable, i.e. flushed to disk.
Expand All @@ -46,12 +45,6 @@ public void setLatestDurableSequenceNumber(final long lastFlushedEvent) {
* @return true if the event is guaranteed to be durable, false otherwise
*/
public boolean isEventDurable(@NonNull final GossipEvent event) {
Objects.requireNonNull(event);
if (event.getStreamSequenceNumber() == GossipEvent.STALE_EVENT_STREAM_SEQUENCE_NUMBER) {
// Stale events are not written to disk.
return false;
}

return event.getStreamSequenceNumber() <= latestDurableSequenceNumber.getCount();
}

Expand All @@ -62,11 +55,6 @@ public boolean isEventDurable(@NonNull final GossipEvent event) {
* @throws InterruptedException if interrupted while waiting
*/
public void waitUntilDurable(@NonNull final GossipEvent event) throws InterruptedException {
Objects.requireNonNull(event);
if (event.getStreamSequenceNumber() == GossipEvent.STALE_EVENT_STREAM_SEQUENCE_NUMBER) {
throw new IllegalStateException("Event is stale and will never be durable");
}

latestDurableSequenceNumber.await(event.getStreamSequenceNumber());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -118,6 +120,11 @@ public class PcesWriter {
*/
private long lastWrittenEvent = -1;

/**
* The highest event sequence number that has been durably flushed to disk.
*/
private long lastFlushedEvent = -1;

/**
* If true then all added events are new and need to be written to the stream. If false then all added events are
* already durable and do not need to be written to the stream.
Expand All @@ -131,21 +138,26 @@ public class PcesWriter {
*/
private final AncientMode fileType;

/**
* A collection of outstanding flush requests
* <p>
* Each flush request is a sequence number that needs to be flushed to disk as soon as possible.
*/
private final Deque<Long> flushRequests = new ArrayDeque<>();

/**
* Constructor
*
* @param platformContext the platform context
* @param fileManager manages all preconsensus event stream files currently on disk
*/
public PcesWriter(@NonNull final PlatformContext platformContext, @NonNull final PcesFileManager fileManager) {

Objects.requireNonNull(platformContext, "platformContext must not be null");
Objects.requireNonNull(fileManager, "fileManager must not be null");
Objects.requireNonNull(platformContext);
Objects.requireNonNull(fileManager);

final PcesConfig config = platformContext.getConfiguration().getConfigData(PcesConfig.class);

preferredFileSizeMegabytes = config.preferredFileSizeMegabytes();

averageSpanUtilization = new LongRunningAverage(config.spanUtilizationRunningAverageLength());
previousSpan = config.bootstrapSpan();
bootstrapSpanOverlapFactor = config.bootstrapSpanOverlapFactor();
Expand Down Expand Up @@ -176,6 +188,38 @@ public void beginStreamingNewEvents(final @NonNull DoneStreamingPcesTrigger igno
streamingNewEvents = true;
}

/**
 * Consider outstanding flush requests and perform a flush if needed.
 * <p>
 * Requests are consumed from the head of the queue for as long as the head request is at or below
 * {@code lastWrittenEvent}. A flush is only performed if at least one of the consumed requests is above
 * {@code lastFlushedEvent}; requests that are already covered by a previous flush are simply discarded.
 *
 * @return true if a flush was performed, otherwise false
 * @throws IllegalStateException if a flush is required but no file is currently open
 * @throws UncheckedIOException  if flushing the current file fails
 */
private boolean processFlushRequests() {
    boolean flushRequired = false;
    while (!flushRequests.isEmpty() && flushRequests.peekFirst() <= lastWrittenEvent) {
        final long flushRequest = flushRequests.removeFirst();

        // a request at or below lastFlushedEvent has already been satisfied by an earlier flush
        if (flushRequest > lastFlushedEvent) {
            flushRequired = true;
        }
    }

    if (!flushRequired) {
        return false;
    }

    if (currentMutableFile == null) {
        // Fail fast with a descriptive exception rather than logging and then dereferencing null,
        // which would surface as an uninformative NullPointerException.
        logger.error(EXCEPTION.getMarker(), "Flush required, but no file is open. This should never happen");
        throw new IllegalStateException("Flush required, but no file is open");
    }

    try {
        currentMutableFile.flush();
    } catch (final IOException e) {
        throw new UncheckedIOException(e);
    }

    // everything written so far is now flushed
    lastFlushedEvent = lastWrittenEvent;
    return true;
}

/**
* Write an event to the stream.
*
Expand All @@ -185,24 +229,30 @@ public void beginStreamingNewEvents(final @NonNull DoneStreamingPcesTrigger igno
*/
@Nullable
public Long writeEvent(@NonNull final GossipEvent event) {
validateSequenceNumber(event);
if (event.getStreamSequenceNumber() == GossipEvent.NO_STREAM_SEQUENCE_NUMBER) {
throw new IllegalStateException("Event must have a valid stream sequence number");
}

// if we aren't streaming new events yet, assume that the given event is already durable
if (!streamingNewEvents) {
lastWrittenEvent = event.getStreamSequenceNumber();
return lastWrittenEvent;
lastFlushedEvent = event.getStreamSequenceNumber();
return event.getStreamSequenceNumber();
}

// don't do anything with ancient events
if (event.getAncientIndicator(fileType) < nonAncientBoundary) {
event.setStreamSequenceNumber(GossipEvent.STALE_EVENT_STREAM_SEQUENCE_NUMBER);
return null;
}

try {
final Long latestDurableSequenceNumberUpdate = prepareOutputStream(event);
final boolean fileClosed = prepareOutputStream(event);
currentMutableFile.writeEvent(event);
lastWrittenEvent = event.getStreamSequenceNumber();

return latestDurableSequenceNumberUpdate;
final boolean flushPerformed = processFlushRequests();

return fileClosed || flushPerformed ? lastFlushedEvent : null;
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -221,56 +271,46 @@ public Long registerDiscontinuity(final long newOriginRound) {
logger.error(EXCEPTION.getMarker(), "registerDiscontinuity() called while replaying events");
}

final Long latestDurableSequenceNumberUpdate;
if (currentMutableFile != null) {
closeFile();
latestDurableSequenceNumberUpdate = lastWrittenEvent;
} else {
latestDurableSequenceNumberUpdate = null;
try {
if (currentMutableFile != null) {
closeFile();
return lastFlushedEvent;
} else {
return null;
}
} finally {
fileManager.registerDiscontinuity(newOriginRound);
}

fileManager.registerDiscontinuity(newOriginRound);

return latestDurableSequenceNumberUpdate;
}

/**
* Make sure that the event has a valid stream sequence number.
* Submit a request to flush a given sequence number to disk as soon as possible. This flush request will be
* honored as soon as the event with the given sequence number has been written to the stream.
*
* @param sequenceNumber the sequence number to flush
* @return the sequence number of the last event durably written to the stream, or null if this method call didn't
* result in any additional events being durably written to the stream
*/
private static void validateSequenceNumber(@NonNull final GossipEvent event) {
if (event.getStreamSequenceNumber() == GossipEvent.NO_STREAM_SEQUENCE_NUMBER
|| event.getStreamSequenceNumber() == GossipEvent.STALE_EVENT_STREAM_SEQUENCE_NUMBER) {
throw new IllegalStateException("Event must have a valid stream sequence number");
}
@Nullable
public Long submitFlushRequest(final long sequenceNumber) {
    // queue the request at the tail, then check whether it (or any earlier request) can be honored right away
    flushRequests.addLast(sequenceNumber);

    final boolean flushPerformed = processFlushRequests();
    if (flushPerformed) {
        return lastFlushedEvent;
    }

    // nothing new became durable as a result of this call
    return null;
}

/**
* Let the event writer know the current non-ancient event boundary. Ancient events will be ignored if added to the
* event writer.
*
* @param nonAncientBoundary describes the boundary between ancient and non-ancient events
* @return the sequence number of the last event durably written to the stream if this method call resulted in any
* additional events being durably written to the stream, otherwise null
*/
@Nullable
public Long updateNonAncientEventBoundary(@NonNull final NonAncientEventWindow nonAncientBoundary) {
public void updateNonAncientEventBoundary(@NonNull final NonAncientEventWindow nonAncientBoundary) {
if (nonAncientBoundary.getAncientThreshold() < this.nonAncientBoundary) {
throw new IllegalArgumentException("Non-ancient boundary cannot be decreased. Current = "
+ this.nonAncientBoundary + ", requested = " + nonAncientBoundary);
}

this.nonAncientBoundary = nonAncientBoundary.getAncientThreshold();

if (!streamingNewEvents || currentMutableFile == null) {
return null;
}

try {
currentMutableFile.flush();
return lastWrittenEvent;
} catch (final IOException e) {
throw new UncheckedIOException("unable to flush", e);
}
}

/**
Expand Down Expand Up @@ -313,6 +353,7 @@ private void closeFile() {
averageSpanUtilization.add(previousSpan);
}
currentMutableFile.close();
lastFlushedEvent = lastWrittenEvent;

fileManager.finishedWritingFile(currentMutableFile);
currentMutableFile = null;
Expand Down Expand Up @@ -350,11 +391,10 @@ private long computeNewFileSpan(final long minimumLowerBound, final long nextAnc
* Prepare the output stream for a particular event. May create a new file/stream if needed.
*
* @param eventToWrite the event that is about to be written
* @return the latest sequence number durably written to disk, if preparing the output stream caused a file to be
* closed. Otherwise, null.
* @return true if this method call resulted in the current file being closed
*/
private Long prepareOutputStream(@NonNull final GossipEvent eventToWrite) throws IOException {
Long latestDurableSequenceNumberUpdate = null;
private boolean prepareOutputStream(@NonNull final GossipEvent eventToWrite) throws IOException {
boolean fileClosed = false;
if (currentMutableFile != null) {
final boolean fileCanContainEvent =
currentMutableFile.canContain(eventToWrite.getAncientIndicator(fileType));
Expand All @@ -363,14 +403,15 @@ private Long prepareOutputStream(@NonNull final GossipEvent eventToWrite) throws

if (!fileCanContainEvent || fileIsFull) {
closeFile();
latestDurableSequenceNumberUpdate = lastWrittenEvent;
fileClosed = true;
}

if (fileIsFull) {
bootstrapMode = false;
}
}

// if the block above closed the file, then we need to create a new one
if (currentMutableFile == null) {
final long upperBound = nonAncientBoundary
+ computeNewFileSpan(nonAncientBoundary, eventToWrite.getAncientIndicator(fileType));
Expand All @@ -380,7 +421,7 @@ private Long prepareOutputStream(@NonNull final GossipEvent eventToWrite) throws
.getMutableFile();
}

return latestDurableSequenceNumberUpdate;
return fileClosed;
}

/**
Expand Down
Loading

0 comments on commit cca3e1c

Please sign in to comment.