From fb56189f0a367961e81ac43e87c3dac830da43cf Mon Sep 17 00:00:00 2001 From: Luck Date: Sat, 19 Mar 2022 23:30:31 +0000 Subject: [PATCH] Improve AbstractJavaScheduler (#3326) --- .../bukkit/BukkitSchedulerAdapter.java | 1 + .../scheduler/AbstractJavaScheduler.java | 98 ++++++++++--------- .../fabric/FabricSchedulerAdapter.java | 1 + .../nukkit/NukkitSchedulerAdapter.java | 1 + 4 files changed, 53 insertions(+), 48 deletions(-) diff --git a/bukkit/src/main/java/me/lucko/luckperms/bukkit/BukkitSchedulerAdapter.java b/bukkit/src/main/java/me/lucko/luckperms/bukkit/BukkitSchedulerAdapter.java index 93095e3bf..90fbe7f12 100644 --- a/bukkit/src/main/java/me/lucko/luckperms/bukkit/BukkitSchedulerAdapter.java +++ b/bukkit/src/main/java/me/lucko/luckperms/bukkit/BukkitSchedulerAdapter.java @@ -34,6 +34,7 @@ public class BukkitSchedulerAdapter extends AbstractJavaScheduler implements Sch private final Executor sync; public BukkitSchedulerAdapter(LPBukkitBootstrap bootstrap) { + super(bootstrap); this.sync = r -> bootstrap.getServer().getScheduler().scheduleSyncDelayedTask(bootstrap.getLoader(), r); } diff --git a/common/src/main/java/me/lucko/luckperms/common/plugin/scheduler/AbstractJavaScheduler.java b/common/src/main/java/me/lucko/luckperms/common/plugin/scheduler/AbstractJavaScheduler.java index 1c170c836..e1583e877 100644 --- a/common/src/main/java/me/lucko/luckperms/common/plugin/scheduler/AbstractJavaScheduler.java +++ b/common/src/main/java/me/lucko/luckperms/common/plugin/scheduler/AbstractJavaScheduler.java @@ -25,40 +25,44 @@ package me.lucko.luckperms.common.plugin.scheduler; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.checkerframework.checker.nullness.qual.NonNull; +import me.lucko.luckperms.common.plugin.bootstrap.LuckPermsBootstrap; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Arrays; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Abstract implementation of {@link SchedulerAdapter} using a {@link ScheduledExecutorService}. */ public abstract class AbstractJavaScheduler implements SchedulerAdapter { + private static final int PARALLELISM = 16; + + private final LuckPermsBootstrap bootstrap; + private final ScheduledThreadPoolExecutor scheduler; - private final ErrorReportingExecutor schedulerWorkerPool; private final ForkJoinPool worker; - public AbstractJavaScheduler() { - this.scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("luckperms-scheduler") - .build() - ); + public AbstractJavaScheduler(LuckPermsBootstrap bootstrap) { + this.bootstrap = bootstrap; + + this.scheduler = new ScheduledThreadPoolExecutor(1, r -> { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName("luckperms-scheduler"); + return thread; + }); this.scheduler.setRemoveOnCancelPolicy(true); - this.schedulerWorkerPool = new ErrorReportingExecutor(Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("luckperms-scheduler-worker-%d") - .build() - )); - this.worker = new ForkJoinPool(32, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> e.printStackTrace(), false); + this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.worker = new ForkJoinPool(PARALLELISM, new WorkerThreadFactory(), new ExceptionHandler(), false); } @Override @@ -68,13 +72,13 @@ public abstract class AbstractJavaScheduler implements SchedulerAdapter { @Override public SchedulerTask asyncLater(Runnable task, long delay, TimeUnit unit) { - ScheduledFuture future = this.scheduler.schedule(() -> this.schedulerWorkerPool.execute(task), delay, unit); + ScheduledFuture future = this.scheduler.schedule(() -> this.worker.execute(task), delay, unit); return () -> future.cancel(false); } @Override public SchedulerTask asyncRepeating(Runnable task, long interval, TimeUnit unit) { - ScheduledFuture future = this.scheduler.scheduleAtFixedRate(() -> this.schedulerWorkerPool.execute(task), interval, interval, unit); + ScheduledFuture future = this.scheduler.scheduleAtFixedRate(() -> this.worker.execute(task), interval, interval, unit); return () -> future.cancel(false); } @@ -82,7 +86,10 @@ public abstract class AbstractJavaScheduler implements SchedulerAdapter { public void shutdownScheduler() { this.scheduler.shutdown(); try { - this.scheduler.awaitTermination(1, TimeUnit.MINUTES); + if (!this.scheduler.awaitTermination(1, TimeUnit.MINUTES)) { + this.bootstrap.getPluginLogger().severe("Timed out waiting for the LuckPerms scheduler to terminate"); + reportRunningTasks(thread -> thread.getName().equals("luckperms-scheduler")); + } } catch (InterruptedException e) { e.printStackTrace(); } @@ -90,48 +97,43 @@ public abstract class AbstractJavaScheduler implements SchedulerAdapter { @Override public void shutdownExecutor() { - this.schedulerWorkerPool.delegate.shutdown(); - try { - this.schedulerWorkerPool.delegate.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { - e.printStackTrace(); - } - this.worker.shutdown(); try { - this.worker.awaitTermination(1, TimeUnit.MINUTES); + if (!this.worker.awaitTermination(1, TimeUnit.MINUTES)) { + this.bootstrap.getPluginLogger().severe("Timed out waiting for the LuckPerms worker thread pool to terminate"); + reportRunningTasks(thread -> thread.getName().startsWith("luckperms-worker-")); + } } catch (InterruptedException e) { e.printStackTrace(); } } - private static final class ErrorReportingExecutor implements Executor { - private final ExecutorService delegate; + private void reportRunningTasks(Predicate predicate) { + Thread.getAllStackTraces().forEach((thread, stack) -> { + if (predicate.test(thread)) { + this.bootstrap.getPluginLogger().warn("Thread " + thread.getName() + " is blocked, and may be the reason for the slow shutdown!\n" + + Arrays.stream(stack).map(el -> " " + el).collect(Collectors.joining("\n")) + ); + } + }); + } - private ErrorReportingExecutor(ExecutorService delegate) { - this.delegate = delegate; - } + private static final class WorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + private static final AtomicInteger COUNT = new AtomicInteger(0); @Override - public void execute(@NonNull Runnable command) { - this.delegate.execute(new ErrorReportingRunnable(command)); + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + thread.setDaemon(true); + thread.setName("luckperms-worker-" + COUNT.getAndIncrement()); + return thread; } } - private static final class ErrorReportingRunnable implements Runnable { - private final Runnable delegate; - - private ErrorReportingRunnable(Runnable delegate) { - this.delegate = delegate; - } - + private final class ExceptionHandler implements UncaughtExceptionHandler { @Override - public void run() { - try { - this.delegate.run(); - } catch (Exception e) { - e.printStackTrace(); - } + public void uncaughtException(Thread t, Throwable e) { + AbstractJavaScheduler.this.bootstrap.getPluginLogger().warn("Thread " + t.getName() + " threw an uncaught exception", e); } } } diff --git a/fabric/src/main/java/me/lucko/luckperms/fabric/FabricSchedulerAdapter.java b/fabric/src/main/java/me/lucko/luckperms/fabric/FabricSchedulerAdapter.java index 30fc6315d..d7abc4798 100644 --- a/fabric/src/main/java/me/lucko/luckperms/fabric/FabricSchedulerAdapter.java +++ b/fabric/src/main/java/me/lucko/luckperms/fabric/FabricSchedulerAdapter.java @@ -33,6 +33,7 @@ public class FabricSchedulerAdapter extends AbstractJavaScheduler { private final Executor sync; public FabricSchedulerAdapter(LPFabricBootstrap bootstrap) { + super(bootstrap); this.sync = r -> bootstrap.getServer().orElseThrow(() -> new IllegalStateException("Server not ready")).submitAndJoin(r); } diff --git a/nukkit/src/main/java/me/lucko/luckperms/nukkit/NukkitSchedulerAdapter.java b/nukkit/src/main/java/me/lucko/luckperms/nukkit/NukkitSchedulerAdapter.java index 8e33cb687..003ee229d 100644 --- a/nukkit/src/main/java/me/lucko/luckperms/nukkit/NukkitSchedulerAdapter.java +++ b/nukkit/src/main/java/me/lucko/luckperms/nukkit/NukkitSchedulerAdapter.java @@ -34,6 +34,7 @@ public class NukkitSchedulerAdapter extends AbstractJavaScheduler implements Sch private final Executor sync; public NukkitSchedulerAdapter(LPNukkitBootstrap bootstrap) { + super(bootstrap); this.sync = r -> bootstrap.getServer().getScheduler().scheduleTask(bootstrap.getLoader(), r, false); }