Skip to content

Commit

Permalink
Update to ConcurrentUtil 0.0.2, and refactor I/O + worker pool
Browse files Browse the repository at this point in the history
With ConcurrentUtil 0.0.2, we can make all thread pools
(worker+I/O) have their thread counts configurable on the fly.

The new I/O system splits the compression/decompression work
from the I/O. The compression/decompression is ran on the worker
pool, while the I/O is ran on the I/O pool. This allows for
better cpu utilisation control on systems with low core counts,
and allows higher read/write speeds on systems with higher
core counts.

Additionally, the I/O scheduling for thread counts > 1 is also
improved as it longer selects a thread to schedule based on the
chunk location.
  • Loading branch information
Spottedleaf committed Aug 7, 2024
1 parent 6a2c6d2 commit bfaddd3
Show file tree
Hide file tree
Showing 42 changed files with 1,728 additions and 915 deletions.
2 changes: 1 addition & 1 deletion ConcurrentUtil
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dependencies {
mappings loom.officialMojangMappings()
modImplementation "net.fabricmc:fabric-loader:${project.loader_version}"

shadow('ca.spottedleaf:concurrentutil:0.0.1-SNAPSHOT')
shadow('ca.spottedleaf:concurrentutil:0.0.2-SNAPSHOT')
shadow('org.yaml:snakeyaml:2.2')
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ca.spottedleaf.moonrise.common.config;

public interface PostDeserializeHook {

public void deserialize();

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ca.spottedleaf.moonrise.common.config.adapter;

import ca.spottedleaf.moonrise.common.config.PostDeserializeHook;
import ca.spottedleaf.moonrise.common.config.adapter.collection.CollectionTypeAdapter;
import ca.spottedleaf.moonrise.common.config.adapter.collection.ListTypeAdapter;
import ca.spottedleaf.moonrise.common.config.adapter.collection.SortedMapTypeAdapter;
Expand Down Expand Up @@ -228,6 +229,10 @@ public T deserialize(final TypeAdapterRegistry registry, final Object input, fin
field.field.set(ret, field.adapter.deserialize(registry, fieldValue, field.field.getGenericType()));
}

if (ret instanceof PostDeserializeHook hook) {
hook.deserialize();
}

return ret;
} catch (final Exception ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package ca.spottedleaf.moonrise.common.config.moonrise;

import ca.spottedleaf.moonrise.common.config.PostDeserializeHook;
import ca.spottedleaf.moonrise.common.config.annotation.Adaptable;
import ca.spottedleaf.moonrise.common.config.annotation.Serializable;
import ca.spottedleaf.moonrise.common.config.moonrise.type.DefaultedValue;
import ca.spottedleaf.moonrise.common.config.type.Duration;
import ca.spottedleaf.moonrise.common.util.MoonriseCommon;
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkTaskScheduler;

@Adaptable
public final class MoonriseConfig {
Expand Down Expand Up @@ -125,18 +128,33 @@ public static final class ChunkSaving {

@Serializable(
comment = """
Configuration options which control the behavior of the common threadpool workers.
"""
)
public WorkerPool workerPool = new WorkerPool();

@Adaptable
public static final class WorkerPool implements PostDeserializeHook {
@Serializable(
comment = """
Set the number of shared worker threads to be used by chunk rendering,
chunk loading, chunk generation. If the value is <= 0, then the number
of threads will automatically be determined.
"""
)
public int workerThreads = -1;
)
public int workerThreads = -1;

@Override
public void deserialize() {
MoonriseCommon.adjustWorkerThreads(this);
}
}

@Serializable
public ChunkSystem chunkSystem = new ChunkSystem();

@Adaptable
public static final class ChunkSystem {
public static final class ChunkSystem implements PostDeserializeHook {

@Serializable(
comment = """
Expand All @@ -157,6 +175,11 @@ gen and are saturating the population generation (~10 threads of the worker pool
"""
)
public boolean populationGenParallelism = false;

@Override
public void deserialize() {
ChunkTaskScheduler.init(this);
}
}

@Serializable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ca.spottedleaf.moonrise.common.misc;

import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
import java.lang.invoke.VarHandle;

public final class LazyRunnable implements Runnable {

private volatile Runnable toRun;
private static final VarHandle TO_RUN_HANDLE = ConcurrentUtil.getVarHandle(LazyRunnable.class, "toRun", Runnable.class);

public void setRunnable(final Runnable run) {
final Runnable prev = (Runnable)TO_RUN_HANDLE.compareAndExchange(this, (Runnable)null, run);
if (prev != null) {
throw new IllegalStateException("Runnable already set");
}
}

@Override
public void run() {
((Runnable)TO_RUN_HANDLE.getVolatile(this)).run();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ca.spottedleaf.moonrise.common.util;

import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedExecutor;
import ca.spottedleaf.concurrentutil.util.Priority;
import ca.spottedleaf.moonrise.patches.chunk_system.level.ChunkSystemServerLevel;
import ca.spottedleaf.moonrise.patches.chunk_system.level.chunk.ChunkSystemLevelChunk;
import ca.spottedleaf.moonrise.patches.chunk_system.player.RegionizedPlayerChunkLoader;
Expand All @@ -23,27 +23,27 @@ public final class ChunkSystem {
private static final Logger LOGGER = LogUtils.getLogger();

public static void scheduleChunkTask(final ServerLevel level, final int chunkX, final int chunkZ, final Runnable run) {
scheduleChunkTask(level, chunkX, chunkZ, run, PrioritisedExecutor.Priority.NORMAL);
scheduleChunkTask(level, chunkX, chunkZ, run, Priority.NORMAL);
}

public static void scheduleChunkTask(final ServerLevel level, final int chunkX, final int chunkZ, final Runnable run, final PrioritisedExecutor.Priority priority) {
public static void scheduleChunkTask(final ServerLevel level, final int chunkX, final int chunkZ, final Runnable run, final Priority priority) {
((ChunkSystemServerLevel)level).moonrise$getChunkTaskScheduler().scheduleChunkTask(chunkX, chunkZ, run, priority);
}

public static void scheduleChunkLoad(final ServerLevel level, final int chunkX, final int chunkZ, final boolean gen,
final ChunkStatus toStatus, final boolean addTicket, final PrioritisedExecutor.Priority priority,
final ChunkStatus toStatus, final boolean addTicket, final Priority priority,
final Consumer<ChunkAccess> onComplete) {
((ChunkSystemServerLevel)level).moonrise$getChunkTaskScheduler().scheduleChunkLoad(chunkX, chunkZ, gen, toStatus, addTicket, priority, onComplete);
}

public static void scheduleChunkLoad(final ServerLevel level, final int chunkX, final int chunkZ, final ChunkStatus toStatus,
final boolean addTicket, final PrioritisedExecutor.Priority priority, final Consumer<ChunkAccess> onComplete) {
final boolean addTicket, final Priority priority, final Consumer<ChunkAccess> onComplete) {
((ChunkSystemServerLevel)level).moonrise$getChunkTaskScheduler().scheduleChunkLoad(chunkX, chunkZ, toStatus, addTicket, priority, onComplete);
}

public static void scheduleTickingState(final ServerLevel level, final int chunkX, final int chunkZ,
final FullChunkStatus toStatus, final boolean addTicket,
final PrioritisedExecutor.Priority priority, final Consumer<LevelChunk> onComplete) {
final Priority priority, final Consumer<LevelChunk> onComplete) {
((ChunkSystemServerLevel)level).moonrise$getChunkTaskScheduler().scheduleTickingState(chunkX, chunkZ, toStatus, addTicket, priority, onComplete);
}

Expand Down
105 changes: 78 additions & 27 deletions src/main/java/ca/spottedleaf/moonrise/common/util/MoonriseCommon.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ca.spottedleaf.moonrise.common.util;

import ca.spottedleaf.concurrentutil.executor.standard.PrioritisedThreadPool;
import ca.spottedleaf.concurrentutil.executor.thread.PrioritisedThreadPool;
import ca.spottedleaf.moonrise.common.config.adapter.TypeAdapterRegistry;
import ca.spottedleaf.moonrise.common.config.moonrise.MoonriseConfig;
import ca.spottedleaf.moonrise.common.config.config.YamlConfig;
Expand All @@ -9,11 +9,77 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public final class MoonriseCommon {

private static final Logger LOGGER = LoggerFactory.getLogger(MoonriseCommon.class);

public static final PrioritisedThreadPool WORKER_POOL = new PrioritisedThreadPool(
new Consumer<Thread>() {
private final AtomicInteger idGenerator = new AtomicInteger();

@Override
public void accept(Thread thread) {
thread.setDaemon(true);
thread.setName("Moonrise Common Worker #" + this.idGenerator.getAndIncrement());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread thread, final Throwable throwable) {
LOGGER.error("Uncaught exception in thread " + thread.getName(), throwable);
}
});
}
}
);
public static final long WORKER_QUEUE_HOLD_TIME = (long)(20.0e6); // 20ms
public static final int CLIENT_DIVISION = 0;
public static final PrioritisedThreadPool.ExecutorGroup RENDER_EXECUTOR_GROUP = MoonriseCommon.WORKER_POOL.createExecutorGroup(CLIENT_DIVISION, 0);
public static final int SERVER_DIVISION = 1;
public static final PrioritisedThreadPool.ExecutorGroup PARALLEL_GEN_GROUP = MoonriseCommon.WORKER_POOL.createExecutorGroup(SERVER_DIVISION, 0);
public static final PrioritisedThreadPool.ExecutorGroup RADIUS_AWARE_GROUP = MoonriseCommon.WORKER_POOL.createExecutorGroup(SERVER_DIVISION, 0);
public static final PrioritisedThreadPool.ExecutorGroup LOAD_GROUP = MoonriseCommon.WORKER_POOL.createExecutorGroup(SERVER_DIVISION, 0);

public static void adjustWorkerThreads(final MoonriseConfig.WorkerPool config) {
int defaultWorkerThreads = Runtime.getRuntime().availableProcessors() / 2;
if (defaultWorkerThreads <= 4) {
defaultWorkerThreads = defaultWorkerThreads <= 3 ? 1 : 2;
} else {
defaultWorkerThreads = defaultWorkerThreads / 2;
}
defaultWorkerThreads = Integer.getInteger("Moonrise.WorkerThreadCount", Integer.valueOf(defaultWorkerThreads));

int workerThreads = config.workerThreads;

if (workerThreads <= 0) {
workerThreads = defaultWorkerThreads;
}

WORKER_POOL.adjustThreadCount(workerThreads);
}

public static final PrioritisedThreadPool IO_POOL = new PrioritisedThreadPool(
new Consumer<Thread>() {
private final AtomicInteger idGenerator = new AtomicInteger();

@Override
public void accept(Thread thread) {
thread.setDaemon(true);
thread.setName("Moonrise I/O Worker #" + this.idGenerator.getAndIncrement());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread thread, final Throwable throwable) {
LOGGER.error("Uncaught exception in thread " + thread.getName(), throwable);
}
});
}
}
);
public static final long IO_QUEUE_HOLD_TIME = (long)(100.0e6); // 100ms
public static final PrioritisedThreadPool.ExecutorGroup SERVER_REGION_IO_GROUP = IO_POOL.createExecutorGroup(SERVER_DIVISION, 0);

private static final File CONFIG_FILE = new File(System.getProperty("Moonrise.ConfigFile", "moonrise.yml"));
private static final TypeAdapterRegistry CONFIG_ADAPTERS = new TypeAdapterRegistry();
private static final YamlConfig<MoonriseConfig> CONFIG;
Expand Down Expand Up @@ -78,35 +144,20 @@ public static boolean saveConfig() {
}
}

public static final PrioritisedThreadPool WORKER_POOL;
public static final int WORKER_THREADS;
static {
int defaultWorkerThreads = Runtime.getRuntime().availableProcessors() / 2;
if (defaultWorkerThreads <= 4) {
defaultWorkerThreads = defaultWorkerThreads <= 3 ? 1 : 2;
} else {
defaultWorkerThreads = defaultWorkerThreads / 2;
public static void haltExecutors() {
MoonriseCommon.WORKER_POOL.shutdown(false);
LOGGER.info("Awaiting termination of worker pool for up to 60s...");
if (!MoonriseCommon.WORKER_POOL.join(TimeUnit.SECONDS.toMillis(60L))) {
LOGGER.error("Worker pool did not shut down in time!");
MoonriseCommon.WORKER_POOL.halt(false);
}
defaultWorkerThreads = Integer.getInteger("Moonrise.WorkerThreadCount", Integer.valueOf(defaultWorkerThreads));

int workerThreads = MoonriseCommon.getConfig().workerThreads;

if (workerThreads <= 0) {
workerThreads = defaultWorkerThreads;
MoonriseCommon.IO_POOL.shutdown(false);
LOGGER.info("Awaiting termination of I/O pool for up to 60s...");
if (!MoonriseCommon.IO_POOL.join(TimeUnit.SECONDS.toMillis(60L))) {
LOGGER.error("I/O pool did not shut down in time!");
MoonriseCommon.IO_POOL.halt(false);
}

WORKER_POOL = new PrioritisedThreadPool(
"Moonrise Worker Pool", workerThreads,
(final Thread thread, final Integer id) -> {
thread.setName("Moonrise Common Worker #" + id.intValue());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread thread, final Throwable throwable) {
LOGGER.error("Uncaught exception in thread " + thread.getName(), throwable);
}
});
}, (long)(20.0e6)); // 20ms
WORKER_THREADS = workerThreads;
}

private MoonriseCommon() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ca.spottedleaf.moonrise.mixin.chunk_system;

import ca.spottedleaf.moonrise.patches.chunk_system.storage.ChunkSystemChunkBuffer;
import net.minecraft.world.level.ChunkPos;
import net.minecraft.world.level.chunk.storage.RegionFile;
import org.spongepowered.asm.mixin.Final;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Shadow;
import org.spongepowered.asm.mixin.Unique;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.Redirect;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

@Mixin(RegionFile.ChunkBuffer.class)
public abstract class ChunkBufferMixin extends ByteArrayOutputStream implements ChunkSystemChunkBuffer {

@Shadow
@Final
private ChunkPos pos;

@Unique
private boolean writeOnClose = true;

@Override
public final boolean moonrise$getWriteOnClose() {
return this.writeOnClose;
}

@Override
public final void moonrise$setWriteOnClose(final boolean value) {
this.writeOnClose = value;
}

@Override
public final void moonrise$write(final RegionFile regionFile) throws IOException {
regionFile.write(this.pos, ByteBuffer.wrap(this.buf, 0, this.count));
}

/**
* @reason Allow delaying write I/O until later
* @author Spottedleaf
*/
@Redirect(
method = "close",
at = @At(
value = "INVOKE",
target = "Lnet/minecraft/world/level/chunk/storage/RegionFile;write(Lnet/minecraft/world/level/ChunkPos;Ljava/nio/ByteBuffer;)V"
)
)
private void redirectClose(final RegionFile instance, final ChunkPos chunkPos, final ByteBuffer byteBuffer) throws IOException {
if (this.writeOnClose) {
instance.write(chunkPos, byteBuffer);
}
}
}
Loading

0 comments on commit bfaddd3

Please sign in to comment.