Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace custom Worker implementation with ForkJoinPool and CompletableFutures #57

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/main/java/fr/zcraft/quartzlib/components/commands/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@
package fr.zcraft.quartzlib.components.commands;

import fr.zcraft.quartzlib.components.commands.CommandException.Reason;
import fr.zcraft.quartzlib.components.i18n.I;
import fr.zcraft.quartzlib.components.rawtext.RawText;
import fr.zcraft.quartzlib.core.QuartzLib;
import fr.zcraft.quartzlib.tools.PluginLogger;
import fr.zcraft.quartzlib.tools.mojang.UUIDFetcher;
import fr.zcraft.quartzlib.tools.text.RawMessage;
import org.apache.commons.lang.StringUtils;
import org.bukkit.Bukkit;
import org.bukkit.command.CommandSender;
import org.bukkit.entity.Player;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -792,8 +797,18 @@ protected Player getPlayerParameter(int index) throws CommandException
* @param callback A consumer that will use the offline player's UUID
*/
public void offlinePlayerParameter(final String parameter, final Consumer<UUID> callback){
CommandWorkers cw=new CommandWorkers();
cw.OfflineNameFetch(parameter,callback);
CompletableFuture.supplyAsync(() -> {
try {
return UUIDFetcher.fetch(parameter);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
})
.exceptionally((e) -> {
PluginLogger.warning(I.t("Error while getting player UUID"));
return null;
})
.thenAccept(callback);
}
/**
* Retrieves a player from its name at the given index, or aborts the
Expand Down

This file was deleted.

277 changes: 91 additions & 186 deletions src/main/java/fr/zcraft/quartzlib/components/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,204 +29,109 @@
*/
package fr.zcraft.quartzlib.components.worker;

import fr.zcraft.quartzlib.core.QuartzLib;
import fr.zcraft.quartzlib.core.QuartzComponent;
import fr.zcraft.quartzlib.tools.PluginLogger;
import fr.zcraft.quartzlib.tools.reflection.Reflection;
import fr.zcraft.quartzlib.core.QuartzLib;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

/**
* The base class for workers.
* A worker is a thread that can handle multiple tasks, which are executed in a queue.
*
*/
public abstract class Worker extends QuartzComponent
{
/*===== Static API =====*/
static private final HashMap<Class<? extends Worker>, Worker> runningWorkers = new HashMap();
static private final HashMap<Class<? extends WorkerRunnable>, Worker> runnables = new HashMap();


static protected <T> Future<T> submitToMainThread(Callable<T> callable)
{
return getCallerWorkerFromRunnable()._submitToMainThread(callable);
}

static protected void submitQuery(WorkerRunnable runnable)
{
getCallerWorker()._submitQuery(runnable);
}

static protected void submitQuery(WorkerRunnable runnable, WorkerCallback callback)
{
getCallerWorker()._submitQuery(runnable, callback);
}

static private Worker getCallerWorker()
{
Class<? extends Worker> caller = Reflection.getCallerClass(Worker.class);
if(caller == null)
throw new IllegalAccessError("Queries must be submitted from a Worker class");

return getWorker(caller);
}

static private Worker getWorker(Class<? extends Worker> workerClass)
{
Worker worker = runningWorkers.get(workerClass);
if(worker == null)
throw new IllegalStateException("Worker '" + workerClass.getName() + "' has not been correctly initialized");

return worker;
}

static private Worker getCallerWorkerFromRunnable()
{
Class<? extends WorkerRunnable> caller = Reflection.getCallerClass(WorkerRunnable.class);
if(caller == null)
throw new IllegalAccessError("Main thread queries must be submitted from a WorkerRunnable");

Worker worker = runnables.get(caller);
if(worker == null)
throw new IllegalStateException("Caller runnable does not belong to any worker");

return worker;
}

private final String name;
private final ArrayDeque<WorkerRunnable> runQueue = new ArrayDeque<>();

private final WorkerCallbackManager callbackManager;
private final WorkerMainThreadExecutor mainThreadExecutor;
private Thread thread;

public Worker()
{
String tempName = null;
WorkerAttributes attributes = getClass().getAnnotation(WorkerAttributes.class);

if(attributes != null)
{
tempName = attributes.name();
this.mainThreadExecutor = attributes.queriesMainThread() ? new WorkerMainThreadExecutor(tempName) : null;
}
else
{
this.mainThreadExecutor = null;
}

if(tempName == null || tempName.isEmpty())
tempName = getClass().getSimpleName();

this.name = tempName;
this.callbackManager = new WorkerCallbackManager(tempName);
}

public class Worker extends QuartzComponent implements ExecutorService {
private ForkJoinPool forkJoinPool = null;
private final int threadCount;

public Worker () {
this(0);
}

public Worker (int threadCount) {
this.threadCount = threadCount;
QuartzLib.loadComponent(this);
}

@Override
public void onEnable()
{
if(thread != null && thread.isAlive())
{
PluginLogger.warning("Restarting thread '{0}'.", name);
onDisable();
protected void onEnable() {
if (this.threadCount <= 0) {
this.forkJoinPool = new ForkJoinPool();
} else {
this.forkJoinPool = new ForkJoinPool(threadCount);
}
callbackManager.init();
if(mainThreadExecutor != null) mainThreadExecutor.init();
runningWorkers.put(getClass(), this);
thread = createThread();
thread.start();
}

@Override
public void onDisable()
{
thread.interrupt();
callbackManager.exit();
if(mainThreadExecutor != null) mainThreadExecutor.exit();
thread = null;
runningWorkers.remove(getClass());
}

private void run()
{
WorkerRunnable currentRunnable;

while(!Thread.interrupted())
{
synchronized(runQueue)
{
try
{
while(runQueue.isEmpty()) runQueue.wait();
}
catch(InterruptedException ex)
{
break;
}
currentRunnable = runQueue.pop();
}

try
{
callbackManager.callback(currentRunnable, currentRunnable.run());
}
catch(Throwable ex)
{
callbackManager.callback(currentRunnable, null, ex);
}
runnables.remove(currentRunnable.getClass());
}
protected void onDisable() {
this.shutdownNow();
}

private void _submitQuery(WorkerRunnable runnable)
{
attachRunnable(runnable);
synchronized(runQueue)
{
runQueue.add(runnable);
runQueue.notify();
}

/* All of the overrides of the world */

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return forkJoinPool.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return forkJoinPool.invokeAny(tasks, timeout, unit);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return forkJoinPool.invokeAll(tasks, timeout, unit);
}

@Override
public void execute(Runnable task) {
forkJoinPool.execute(task);
}

@Override
public <T> ForkJoinTask<T> submit(Callable<T> task) {
return forkJoinPool.submit(task);
}

@Override
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
return forkJoinPool.submit(task, result);
}

@Override
public ForkJoinTask<?> submit(Runnable task) {
return forkJoinPool.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
return forkJoinPool.invokeAll(tasks);
}

@Override
public void shutdown() {
forkJoinPool.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return forkJoinPool.shutdownNow();
}

@Override
public boolean isTerminated() {
return forkJoinPool.isTerminated();
}

@Override
public boolean isShutdown() {
return forkJoinPool.isShutdown();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return forkJoinPool.awaitTermination(timeout, unit);
}

private void _submitQuery(WorkerRunnable runnable, WorkerCallback callback)
{
callbackManager.setupCallback(runnable, callback);
_submitQuery(runnable);
}

private <T> Future<T> _submitToMainThread(Callable<T> callable)
{
if(mainThreadExecutor != null) return mainThreadExecutor.submit(callable);
return null;
}

private Thread createThread()
{
return new Thread(getName())
{
@Override
public void run()
{
Worker.this.run();
}
};
}

private void attachRunnable(WorkerRunnable runnable)
{
if(runnable.getWorker() != null && runnable.getWorker() != this)
throw new IllegalArgumentException("This runnable is already attached to another worker");
runnable.setWorker(this);
runnables.put(runnable.getClass(), this);
}

public String getName()
{
return QuartzLib.getPlugin().getName() + "-" + name;
}

}
Loading