From 59709fc189a5c044f9331a1d84a6abea7e19a179 Mon Sep 17 00:00:00 2001 From: Spottedleaf Date: Thu, 17 Oct 2024 10:39:00 -0700 Subject: [PATCH] Add async chunk writing The RegionFile IO scheduler is now capable of taking in a Completable for the chunk data instead of CompoundTag. This allows writes to be scheduled without the write value immediately. --- .../chunk_system/MinecraftServerMixin.java | 1 + .../chunk_system/io/MoonriseRegionFileIO.java | 406 ++++++++++++------ .../scheduling/ChunkTaskScheduler.java | 4 + .../scheduling/NewChunkHolder.java | 36 +- 4 files changed, 313 insertions(+), 134 deletions(-) diff --git a/src/main/java/ca/spottedleaf/moonrise/mixin/chunk_system/MinecraftServerMixin.java b/src/main/java/ca/spottedleaf/moonrise/mixin/chunk_system/MinecraftServerMixin.java index 5042b9bf..df362204 100644 --- a/src/main/java/ca/spottedleaf/moonrise/mixin/chunk_system/MinecraftServerMixin.java +++ b/src/main/java/ca/spottedleaf/moonrise/mixin/chunk_system/MinecraftServerMixin.java @@ -240,6 +240,7 @@ private void noOpClose(final ServerLevel instance) {} private void closeIOThreads(final CallbackInfo ci) { LOGGER.info("Waiting for I/O tasks to complete..."); MoonriseRegionFileIO.flush((MinecraftServer)(Object)this); + LOGGER.info("All I/O tasks to complete"); if ((Object)this instanceof DedicatedServer) { MoonriseCommon.haltExecutors(); } diff --git a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/io/MoonriseRegionFileIO.java b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/io/MoonriseRegionFileIO.java index 9966e1ed..4abeabf4 100644 --- a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/io/MoonriseRegionFileIO.java +++ b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/io/MoonriseRegionFileIO.java @@ -1,6 +1,8 @@ package ca.spottedleaf.moonrise.patches.chunk_system.io; import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue; +import ca.spottedleaf.concurrentutil.completable.CallbackCompletable; +import ca.spottedleaf.concurrentutil.completable.Completable; import ca.spottedleaf.concurrentutil.executor.Cancellable; import ca.spottedleaf.concurrentutil.executor.PrioritisedExecutor; import ca.spottedleaf.concurrentutil.executor.queue.PrioritisedTaskQueue; @@ -12,6 +14,7 @@ import ca.spottedleaf.moonrise.common.util.TickThread; import ca.spottedleaf.moonrise.common.util.WorldUtil; import ca.spottedleaf.moonrise.patches.chunk_system.level.ChunkSystemServerLevel; +import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet; import net.minecraft.nbt.CompoundTag; import net.minecraft.server.MinecraftServer; import net.minecraft.server.level.ServerLevel; @@ -23,14 +26,12 @@ import java.io.DataOutputStream; import java.io.IOException; import java.lang.invoke.VarHandle; -import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; public final class MoonriseRegionFileIO { @@ -215,30 +216,6 @@ public static Priority getIOBlockingPriorityForCurrentThread() { return Priority.HIGHEST; } - /** - * Returns the current {@code CompoundTag} pending for write for the specified chunk & regionfile type. - * Note that this does not copy the result, so do not modify the result returned. - * - * @param world Specified world. - * @param chunkX Specified chunk x. - * @param chunkZ Specified chunk z. - * @param type Specified regionfile type. - * - * @return The compound tag associated for the specified chunk. {@code null} if no write was pending, or if {@code null} is the write pending. - */ - public static CompoundTag getPendingWrite(final ServerLevel world, final int chunkX, final int chunkZ, final RegionFileType type) { - final RegionDataController taskController = getControllerFor(world, type); - final ChunkIOTask task = taskController.chunkTasks.get(CoordinateUtils.getChunkKey(chunkX, chunkZ)); - - if (task == null) { - return null; - } - - final CompoundTag ret = task.inProgressWrite; - - return ret == ChunkIOTask.NOTHING_TO_WRITE ? null : ret; - } - /** * Returns the priority for the specified regionfile type for the specified chunk. * @param world Specified world. @@ -444,27 +421,106 @@ public static void scheduleSave(final ServerLevel world, final int chunkX, final */ public static void scheduleSave(final ServerLevel world, final int chunkX, final int chunkZ, final CompoundTag data, final RegionFileType type, final Priority priority) { + scheduleSave( + world, chunkX, chunkZ, + (final BiConsumer consumer) -> { + consumer.accept(data, null); + }, null, type, priority + ); + } + + /** + * Schedules the chunk data to be written asynchronously. + *

+ * Impl notes: + *

+ *
  • + * This function presumes a chunk load for the coordinates is not called during this function (anytime after is OK). This means + * saves must be scheduled before a chunk is unloaded. + *
  • + *
  • + * Writes may be called concurrently, although only the "later" write will go through. + *
  • + *
  • + * The specified write task, if not null, will have its priority controlled by the scheduler. + *
  • + * + * @param world Chunk's world + * @param chunkX Chunk's x coordinate + * @param chunkZ Chunk's z coordinate + * @param completable Chunk's pending data + * @param writeTask The task responsible for completing the pending chunk data + * @param type The regionfile type to write to. + * @param priority The minimum priority to schedule at. + * + * @throws IllegalStateException If the file io thread has shutdown. + */ + public static void scheduleSave(final ServerLevel world, final int chunkX, final int chunkZ, final CallbackCompletable completable, + final PrioritisedExecutor.PrioritisedTask writeTask, final RegionFileType type, final Priority priority) { + scheduleSave(world, chunkX, chunkZ, completable::addWaiter, writeTask, type, priority); + } + + /** + * Schedules the chunk data to be written asynchronously. + *

    + * Impl notes: + *

    + *
  • + * This function presumes a chunk load for the coordinates is not called during this function (anytime after is OK). This means + * saves must be scheduled before a chunk is unloaded. + *
  • + *
  • + * Writes may be called concurrently, although only the "later" write will go through. + *
  • + *
  • + * The specified write task, if not null, will have its priority controlled by the scheduler. + *
  • + * + * @param world Chunk's world + * @param chunkX Chunk's x coordinate + * @param chunkZ Chunk's z coordinate + * @param completable Chunk's pending data + * @param writeTask The task responsible for completing the pending chunk data + * @param type The regionfile type to write to. + * @param priority The minimum priority to schedule at. + * + * @throws IllegalStateException If the file io thread has shutdown. + */ + public static void scheduleSave(final ServerLevel world, final int chunkX, final int chunkZ, final Completable completable, + final PrioritisedExecutor.PrioritisedTask writeTask, final RegionFileType type, final Priority priority) { + scheduleSave(world, chunkX, chunkZ, completable::whenComplete, writeTask, type, priority); + } + + private static void scheduleSave(final ServerLevel world, final int chunkX, final int chunkZ, final Consumer> scheduler, + final PrioritisedExecutor.PrioritisedTask writeTask, final RegionFileType type, final Priority priority) { final RegionDataController taskController = getControllerFor(world, type); final boolean[] created = new boolean[1]; - final long key = CoordinateUtils.getChunkKey(chunkX, chunkZ); - final ChunkIOTask task = taskController.chunkTasks.compute(key, (final long keyInMap, final ChunkIOTask taskRunning) -> { - if (taskRunning == null || taskRunning.failedWrite) { - // no task is scheduled or the previous write failed - meaning we need to overwrite it + final ChunkIOTask.InProgressWrite write = new ChunkIOTask.InProgressWrite(writeTask); + final ChunkIOTask task = taskController.chunkTasks.compute(CoordinateUtils.getChunkKey(chunkX, chunkZ), + (final long keyInMap, final ChunkIOTask taskRunning) -> { + if (taskRunning == null || taskRunning.failedWrite) { + // no task is scheduled or the previous write failed - meaning we need to overwrite it - // create task - final ChunkIOTask newTask = new ChunkIOTask( - world, taskController, chunkX, chunkZ, priority, new ChunkIOTask.InProgressRead(), data - ); - created[0] = true; + // create task + final ChunkIOTask newTask = new ChunkIOTask( + world, taskController, chunkX, chunkZ, priority, new ChunkIOTask.InProgressRead() + ); - return newTask; - } + newTask.pushPendingWrite(write); - taskRunning.inProgressWrite = data; + created[0] = true; - return taskRunning; - }); + return newTask; + } + + taskRunning.pushPendingWrite(write); + + return taskRunning; + } + ); + + write.schedule(task, scheduler); if (created[0]) { taskController.startTask(task); @@ -711,7 +767,7 @@ public static Cancellable loadDataAsync(final ServerLevel world, final int chunk // set up task final ChunkIOTask newTask = new ChunkIOTask( - world, taskController, chunkX, chunkZ, priority, new ChunkIOTask.InProgressRead(), ChunkIOTask.NOTHING_TO_WRITE + world, taskController, chunkX, chunkZ, priority, new ChunkIOTask.InProgressRead() ); newTask.inProgressRead.addToAsyncWaiters(onComplete); @@ -719,22 +775,33 @@ public static Cancellable loadDataAsync(final ServerLevel world, final int chunk return newTask; } - final CompoundTag pendingWrite = running.inProgressWrite; + final ChunkIOTask.InProgressWrite pendingWrite = running.inProgressWrite; - if (pendingWrite == ChunkIOTask.NOTHING_TO_WRITE) { + if (pendingWrite == null) { // need to add to waiters here, because the regionfile thread will use compute() to lock and check for cancellations if (!running.inProgressRead.addToAsyncWaiters(onComplete)) { callbackInfo.data = running.inProgressRead.value; callbackInfo.throwable = running.inProgressRead.throwable; callbackInfo.completeNow = true; } + + callbackInfo.read = running.inProgressRead; + return running; } // at this stage we have to use the in progress write's data to avoid an order issue - callbackInfo.data = pendingWrite; - callbackInfo.throwable = null; - callbackInfo.completeNow = true; + + if (!pendingWrite.addToAsyncWaiters(onComplete)) { + // data is ready now + callbackInfo.data = pendingWrite.value; + callbackInfo.throwable = pendingWrite.throwable; + callbackInfo.completeNow = true; + return running; + } + + callbackInfo.write = pendingWrite; + return running; }; @@ -755,7 +822,7 @@ public static Cancellable loadDataAsync(final ServerLevel world, final int chunk ret.raisePriority(priority); } - return new CancellableRead(onComplete, ret); + return new CancellableRead(onComplete, callbackInfo.read, callbackInfo.write); } private static final class ImmediateCallbackCompletion { @@ -764,6 +831,8 @@ private static final class ImmediateCallbackCompletion { private Throwable throwable; private boolean completeNow; private boolean tasksNeedReadScheduling; + private ChunkIOTask.InProgressRead read; + private ChunkIOTask.InProgressWrite write; } @@ -802,26 +871,40 @@ public static CompoundTag loadData(final ServerLevel world, final int chunkX, fi private static final class CancellableRead implements Cancellable { private BiConsumer callback; - private ChunkIOTask task; + private ChunkIOTask.InProgressRead read; + private ChunkIOTask.InProgressWrite write; - private CancellableRead(final BiConsumer callback, final ChunkIOTask task) { + private CancellableRead(final BiConsumer callback, + final ChunkIOTask.InProgressRead read, + final ChunkIOTask.InProgressWrite write) { this.callback = callback; - this.task = task; + this.read = read; + this.write = write; } @Override public boolean cancel() { final BiConsumer callback = this.callback; - final ChunkIOTask task = this.task; + final ChunkIOTask.InProgressRead read = this.read; + final ChunkIOTask.InProgressWrite write = this.write; - if (callback == null || task == null) { + if (callback == null || (read == null && write == null)) { return false; } this.callback = null; - this.task = null; - - return task.inProgressRead.cancel(callback); + this.read = null; + this.write = null; + + if (read != null) { + return read.cancel(callback); + } + if (write != null) { + return write.cancel(callback); + } + + // unreachable + throw new InternalError(); } } @@ -854,8 +937,6 @@ public boolean cancel() { private static final class ChunkIOTask { - private static final CompoundTag NOTHING_TO_WRITE = new CompoundTag(); - private final ServerLevel world; private final RegionDataController regionDataController; private final int chunkX; @@ -864,22 +945,21 @@ private static final class ChunkIOTask { private PrioritisedExecutor.PrioritisedTask currentTask; private final InProgressRead inProgressRead; - private volatile CompoundTag inProgressWrite; + private volatile InProgressWrite inProgressWrite; + private final ReferenceOpenHashSet allPendingWrites = new ReferenceOpenHashSet<>(); private RegionDataController.ReadData readData; private RegionDataController.WriteData writeData; private boolean failedWrite; public ChunkIOTask(final ServerLevel world, final RegionDataController regionDataController, - final int chunkX, final int chunkZ, final Priority priority, final InProgressRead inProgressRead, - final CompoundTag inProgressWrite) { + final int chunkX, final int chunkZ, final Priority priority, final InProgressRead inProgressRead) { this.world = world; this.regionDataController = regionDataController; this.chunkX = chunkX; this.chunkZ = chunkZ; this.priority = priority; this.inProgressRead = inProgressRead; - this.inProgressWrite = inProgressWrite; } public Priority getPriority() { @@ -888,16 +968,26 @@ public Priority getPriority() { } } + // must hold lock on this object + private void updatePriority(final Priority priority) { + this.priority = priority; + if (this.currentTask != null) { + this.currentTask.setPriority(priority); + } + for (final InProgressWrite write : this.allPendingWrites) { + if (write.writeTask != null) { + write.writeTask.setPriority(priority); + } + } + } + public boolean setPriority(final Priority priority) { synchronized (this) { if (this.priority == priority) { return false; } - this.priority = priority; - if (this.currentTask != null) { - this.currentTask.setPriority(priority); - } + this.updatePriority(priority); return true; } @@ -909,10 +999,7 @@ public boolean raisePriority(final Priority priority) { return false; } - this.priority = priority; - if (this.currentTask != null) { - this.currentTask.setPriority(priority); - } + this.updatePriority(priority); return true; } @@ -924,15 +1011,28 @@ public boolean lowerPriority(final Priority priority) { return false; } - this.priority = priority; - if (this.currentTask != null) { - this.currentTask.setPriority(priority); - } + this.updatePriority(priority); return true; } } + private void pushPendingWrite(final InProgressWrite write) { + this.inProgressWrite = write; + synchronized (this) { + this.allPendingWrites.add(write); + if (write.writeTask != null) { + write.writeTask.setPriority(this.priority); + } + } + } + + private void pendingWriteComplete(final InProgressWrite write) { + synchronized (this) { + this.allPendingWrites.remove(write); + } + } + public void scheduleReadIO() { final PrioritisedExecutor.PrioritisedTask task; synchronized (this) { @@ -964,7 +1064,7 @@ private void performReadIO() { canRead[0] = false; } - if (valueInMap.inProgressWrite != NOTHING_TO_WRITE) { + if (valueInMap.inProgressWrite != null) { return valueInMap; } @@ -1050,8 +1150,7 @@ private void performReadDecompress() { this.finishRead(compoundTag, throwable); if (!this.tryAbortWrite()) { - // we are already on the compression executor, don't bother scheduling - this.performWriteCompress(); + this.scheduleWriteCompress(); } } @@ -1060,17 +1159,24 @@ private void finishRead(final CompoundTag compoundTag, final Throwable throwable } public void scheduleWriteCompress() { + final InProgressWrite inProgressWrite = this.inProgressWrite; + final PrioritisedExecutor.PrioritisedTask task; synchronized (this) { - task = this.regionDataController.compressionExecutor.createTask(this::performWriteCompress, this.priority); + task = this.regionDataController.compressionExecutor.createTask(() -> { + ChunkIOTask.this.performWriteCompress(inProgressWrite); + }, this.priority); this.currentTask = task; } - task.queue(); + + inProgressWrite.addToWaiters(this, (final CompoundTag data, final Throwable throwable) -> { + task.queue(); + }); } private boolean tryAbortWrite() { final long chunkKey = CoordinateUtils.getChunkKey(this.chunkX, this.chunkZ); - if (this.inProgressWrite == NOTHING_TO_WRITE) { + if (this.inProgressWrite == null) { final ChunkIOTask inMap = this.regionDataController.chunkTasks.compute(chunkKey, (final long keyInMap, final ChunkIOTask valueInMap) -> { if (valueInMap == null) { throw new IllegalStateException("Write completed concurrently, expected this task: " + ChunkIOTask.this.toString() + ", report this!"); @@ -1079,7 +1185,7 @@ private boolean tryAbortWrite() { throw new IllegalStateException("Chunk task mismatch, expected this task: " + ChunkIOTask.this.toString() + ", got: " + valueInMap.toString() + ", report this!"); } - if (valueInMap.inProgressWrite != NOTHING_TO_WRITE) { + if (valueInMap.inProgressWrite != null) { return valueInMap; } @@ -1095,61 +1201,62 @@ private boolean tryAbortWrite() { return false; } - private void performWriteCompress() { - for (;;) { - final CompoundTag write = this.inProgressWrite; - if (write == NOTHING_TO_WRITE) { - throw new IllegalStateException("Should be writable"); - } + private void performWriteCompress(final InProgressWrite inProgressWrite) { + final CompoundTag write = inProgressWrite.value; + if (!inProgressWrite.isComplete()) { + throw new IllegalStateException("Should be writable"); + } - RegionDataController.WriteData writeData = null; - boolean failedWrite = false; + RegionDataController.WriteData writeData = null; + boolean failedWrite = false; - try { - writeData = this.regionDataController.startWrite(this.chunkX, this.chunkZ, write); - } catch (final Throwable thr) { - // TODO implement this? + try { + writeData = this.regionDataController.startWrite(this.chunkX, this.chunkZ, write); + } catch (final Throwable thr) { + // TODO implement this? /*if (thr instanceof RegionFileStorage.RegionFileSizeException) { final int maxSize = RegionFile.MAX_CHUNK_SIZE / (1024 * 1024); LOGGER.error("Chunk at (" + this.chunkX + "," + this.chunkZ + ") in '" + WorldUtil.getWorldName(this.world) + "' exceeds max size of " + maxSize + "MiB, it has been deleted from disk."); } else */ - { - failedWrite = thr instanceof IOException; - LOGGER.error("Failed to write chunk data for task: " + this.toString(), thr); - } + { + failedWrite = thr instanceof IOException; + LOGGER.error("Failed to write chunk data for task: " + this.toString(), thr); } + } - if (writeData == null) { - // null if a throwable was encountered + if (writeData == null) { + // null if a throwable was encountered - // we cannot continue to the I/O stage here, so try to complete + // we cannot continue to the I/O stage here, so try to complete - if (this.tryCompleteWrite(write, failedWrite)) { - return; - } else { - // fetch new data and try again - continue; - } + if (this.tryCompleteWrite(inProgressWrite, failedWrite)) { + return; } else { - // writeData != null && !failedWrite - // we can continue to I/O stage - this.writeData = writeData; - this.scheduleWriteIO(); + // fetch new data and try again + this.scheduleWriteCompress(); return; } + } else { + // writeData != null && !failedWrite + // we can continue to I/O stage + this.writeData = writeData; + this.scheduleWriteIO(inProgressWrite); + return; } } - private void scheduleWriteIO() { + private void scheduleWriteIO(final InProgressWrite inProgressWrite) { final PrioritisedExecutor.PrioritisedTask task; synchronized (this) { - task = this.regionDataController.ioScheduler.createTask(this.chunkX, this.chunkZ, this::runWriteIO, this.priority); + task = this.regionDataController.ioScheduler.createTask(this.chunkX, this.chunkZ, () -> { + ChunkIOTask.this.runWriteIO(inProgressWrite); + }, this.priority); this.currentTask = task; } task.queue(); } - private void runWriteIO() { + private void runWriteIO(final InProgressWrite inProgressWrite) { RegionDataController.WriteData writeData = this.writeData; this.writeData = null; @@ -1162,14 +1269,14 @@ private void runWriteIO() { LOGGER.error("Failed to write chunk data for task: " + this.toString(), thr); } - if (!this.tryCompleteWrite(writeData.input(), failedWrite)) { + if (!this.tryCompleteWrite(inProgressWrite, failedWrite)) { // fetch new data and try again this.scheduleWriteCompress(); } return; } - private boolean tryCompleteWrite(final CompoundTag written, final boolean failedWrite) { + private boolean tryCompleteWrite(final InProgressWrite written, final boolean failedWrite) { final long chunkKey = CoordinateUtils.getChunkKey(this.chunkX, this.chunkZ); final boolean[] done = new boolean[] { false }; @@ -1216,14 +1323,6 @@ public boolean hasNoWaiters() { return this.callbacks.isEmpty(); } - public CompoundTag getValue() { - return this.value; - } - - public Throwable getThrowable() { - return this.throwable; - } - public boolean addToAsyncWaiters(final BiConsumer callback) { return this.callbacks.add(callback); } @@ -1241,11 +1340,72 @@ public void complete(final ChunkIOTask task, final CompoundTag value, final Thro try { consumer.accept(value == null ? null : value.copy(), throwable); } catch (final Throwable thr) { - LOGGER.error("Callback " + ConcurrentUtil.genericToString(consumer) + " failed to handle chunk data for task " + task.toString(), thr); + LOGGER.error("Callback " + ConcurrentUtil.genericToString(consumer) + " failed to handle chunk data (read) for task " + task.toString(), thr); } } } } + + private static final class InProgressWrite { + + private static final Logger LOGGER = LoggerFactory.getLogger(InProgressWrite.class); + + private CompoundTag value; + private Throwable throwable; + private volatile boolean complete; + private final MultiThreadedQueue> callbacks = new MultiThreadedQueue<>(); + + private final PrioritisedExecutor.PrioritisedTask writeTask; + + public InProgressWrite(final PrioritisedExecutor.PrioritisedTask writeTask) { + this.writeTask = writeTask; + } + + public boolean isComplete() { + return this.complete; + } + + public void schedule(final ChunkIOTask task, final Consumer> scheduler) { + scheduler.accept((final CompoundTag data, final Throwable throwable) -> { + InProgressWrite.this.complete(task, data, throwable); + }); + } + + public boolean addToAsyncWaiters(final BiConsumer callback) { + return this.callbacks.add(callback); + } + + public void addToWaiters(final ChunkIOTask task, final BiConsumer consumer) { + if (!this.callbacks.add(consumer)) { + this.syncAccept(task, consumer, this.value, this.throwable); + } + } + + private void syncAccept(final ChunkIOTask task, final BiConsumer consumer, final CompoundTag value, final Throwable throwable) { + try { + consumer.accept(value == null ? null : value.copy(), throwable); + } catch (final Throwable thr) { + LOGGER.error("Callback " + ConcurrentUtil.genericToString(consumer) + " failed to handle chunk data (write) for task " + task.toString(), thr); + } + } + + public void complete(final ChunkIOTask task, final CompoundTag value, final Throwable throwable) { + this.value = value; + this.throwable = throwable; + this.complete = true; + + task.pendingWriteComplete(this); + + BiConsumer consumer; + while ((consumer = this.callbacks.pollOrBlockAdds()) != null) { + this.syncAccept(task, consumer, value, throwable); + } + } + + public boolean cancel(final BiConsumer callback) { + return this.callbacks.remove(callback); + } + } } public static abstract class RegionDataController { diff --git a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkTaskScheduler.java b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkTaskScheduler.java index c3c7536d..120ce317 100644 --- a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkTaskScheduler.java +++ b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkTaskScheduler.java @@ -121,6 +121,7 @@ public static int getTicketLevel(final ChunkStatus status) { public final PrioritisedThreadPool.ExecutorGroup.ThreadPoolExecutor loadExecutor; public final PrioritisedThreadPool.ExecutorGroup.ThreadPoolExecutor ioExecutor; public final PrioritisedThreadPool.ExecutorGroup.ThreadPoolExecutor compressionExecutor; + public final PrioritisedThreadPool.ExecutorGroup.ThreadPoolExecutor saveExecutor; private final PrioritisedTaskQueue mainThreadExecutor = new PrioritisedTaskQueue(); @@ -288,6 +289,7 @@ public ChunkTaskScheduler(final ServerLevel world) { this.ioExecutor = MoonriseCommon.SERVER_REGION_IO_GROUP.createExecutor(-1, MoonriseCommon.IO_QUEUE_HOLD_TIME, 0); // we need a separate executor here so that on shutdown we can continue to process I/O tasks this.compressionExecutor = MoonriseCommon.LOAD_GROUP.createExecutor(-1, MoonriseCommon.WORKER_QUEUE_HOLD_TIME, 0); + this.saveExecutor = MoonriseCommon.LOAD_GROUP.createExecutor(-1, MoonriseCommon.WORKER_QUEUE_HOLD_TIME, 0); this.chunkHolderManager = new ChunkHolderManager(world, this); } @@ -855,12 +857,14 @@ public boolean halt(final boolean sync, final long maxWaitNS) { public boolean haltIO(final boolean sync, final long maxWaitNS) { this.ioExecutor.halt(); + this.saveExecutor.halt(); this.compressionExecutor.halt(); if (sync) { final long time = System.nanoTime(); for (long failures = 9L;; failures = ConcurrentUtil.linearLongBackoff(failures, 500_000L, 50_000_000L)) { if ( !this.ioExecutor.isActive() && + !this.saveExecutor.isActive() && !this.compressionExecutor.isActive() ) { return true; diff --git a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/NewChunkHolder.java b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/NewChunkHolder.java index 233a74e3..66e99477 100644 --- a/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/NewChunkHolder.java +++ b/src/main/java/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/NewChunkHolder.java @@ -860,7 +860,7 @@ UnloadState unloadStage1() { if (chunk != null) { final LazyRunnable toRun = new LazyRunnable(); - this.chunkDataUnload = new UnloadTask(new CallbackCompletable<>(), this.scheduler.loadExecutor.createTask(toRun), toRun); + this.chunkDataUnload = new UnloadTask(new CallbackCompletable<>(), this.scheduler.saveExecutor.createTask(toRun), toRun); } if (poiChunk != null) { this.poiDataUnload = new UnloadTask(new CallbackCompletable<>(), null, null); @@ -1747,25 +1747,39 @@ private boolean saveChunk(final ChunkAccess chunk, final boolean unloading) { } return false; } - boolean completing = false; try { final SerializableChunkData chunkData = SerializableChunkData.copyOf(this.world, chunk); PlatformHooks.get().chunkSyncSave(this.world, chunk, chunkData); - // TODO implement proper async save - final CompoundTag save = chunkData.write(); + chunk.setUnsaved(false); + + final CallbackCompletable completable = new CallbackCompletable<>(); + + final Runnable run = () -> { + final CompoundTag data = chunkData.write(); + + completable.complete(data); + + if (unloading) { + NewChunkHolder.this.completeAsyncUnloadDataSave(MoonriseRegionFileIO.RegionFileType.CHUNK_DATA, data); + } + }; + + final PrioritisedExecutor.PrioritisedTask task; if (unloading) { - completing = true; - this.completeAsyncUnloadDataSave(MoonriseRegionFileIO.RegionFileType.CHUNK_DATA, save); + this.chunkDataUnload.toRun().setRunnable(run); + task = this.chunkDataUnload.task(); } else { - MoonriseRegionFileIO.scheduleSave(this.world, this.chunkX, this.chunkZ, save, MoonriseRegionFileIO.RegionFileType.CHUNK_DATA); + task = this.scheduler.saveExecutor.createTask(run); } - chunk.setUnsaved(false); + + task.queue(); + + MoonriseRegionFileIO.scheduleSave( + this.world, this.chunkX, this.chunkZ, completable, task, MoonriseRegionFileIO.RegionFileType.CHUNK_DATA, Priority.NORMAL + ); } catch (final Throwable thr) { LOGGER.error("Failed to save chunk data (" + this.chunkX + "," + this.chunkZ + ") in world '" + WorldUtil.getWorldName(this.world) + "'", thr); - if (unloading && !completing) { - this.completeAsyncUnloadDataSave(MoonriseRegionFileIO.RegionFileType.CHUNK_DATA, null); - } } return true;