package org.bukkit.craftbukkit.scheduler; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.logging.Level; import org.apache.commons.lang.Validate; import org.bukkit.plugin.IllegalPluginAccessException; import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitRunnable; import org.bukkit.scheduler.BukkitScheduler; import org.bukkit.scheduler.BukkitTask; import org.bukkit.scheduler.BukkitWorker; /** * The fundamental concepts for this implementation: *
  • Main thread owns {@link #head} and {@link #currentTick}, but it may be read from any thread
  • *
  • Main thread exclusively controls {@link #temp} and {@link #pending}. * They are never to be accessed outside of the main thread; alternatives exist to prevent locking.
  • *
  • {@link #head} to {@link #tail} act as a linked list/queue, with 1 consumer and infinite producers. * Adding to the tail is atomic and very efficient; utility method is {@link #handle(CraftTask, long)} or {@link #addTask(CraftTask)}.
  • *
  • Changing the period on a task is delicate. * Any future task needs to notify waiting threads. * Async tasks must be synchronized to make sure that any thread that's finishing will remove itself from {@link #runners}. * Another utility method is provided for this, {@link #cancelTask(int)}
  • *
  • {@link #runners} provides a moderately up-to-date view of active tasks. * If the linked head to tail set is read, all remaining tasks that were active at the time execution started will be located in runners.
  • *
  • Async tasks are responsible for removing themselves from runners
  • *
  • Sync tasks are only to be removed from runners on the main thread when coupled with a removal from pending and temp.
  • *
  • Most of the design in this scheduler relies on queuing special tasks to perform any data changes on the main thread. * When executed from inside a synchronous method, the scheduler will be updated before next execution by virtue of the frequent {@link #parsePending()} calls.
  • */ public class CraftScheduler implements BukkitScheduler { /** * Counter for IDs. Order doesn't matter, only uniqueness. */ private final AtomicInteger ids = new AtomicInteger(1); /** * Current head of linked-list. This reference is always stale, {@link CraftTask#next} is the live reference. */ private volatile CraftTask head = new CraftTask(); /** * Tail of a linked-list. AtomicReference only matters when adding to queue */ private final AtomicReference tail = new AtomicReference(head); /** * Main thread logic only */ private final PriorityQueue pending = new PriorityQueue(10, new Comparator() { @Override public int compare(final CraftTask o1, final CraftTask o2) { int value = Long.compare(o1.getNextRun(), o2.getNextRun()); // If the tasks should run on the same tick they should be run FIFO return value != 0 ? value : Integer.compare(o1.getTaskId(), o2.getTaskId()); } }); /** * Main thread logic only */ private final List temp = new ArrayList(); /** * These are tasks that are currently active. It's provided for 'viewing' the current state. */ private final ConcurrentHashMap runners = new ConcurrentHashMap(); /** * The sync task that is currently running on the main thread. */ private volatile CraftTask currentTask = null; private volatile int currentTick = -1; private final Executor executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %d").build()); private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {@Override StringBuilder debugTo(StringBuilder string) {return string;}}; private CraftAsyncDebugger debugTail = debugHead; private static final int RECENT_TICKS; static { RECENT_TICKS = 30; } @Override public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) { return this.scheduleSyncDelayedTask(plugin, task, 0L); } @Override public BukkitTask runTask(Plugin plugin, Runnable runnable) { return runTaskLater(plugin, runnable, 0L); } @Override public void runTask(Plugin plugin, Consumer task) throws IllegalArgumentException { runTaskLater(plugin, task, 0L); } @Deprecated @Override public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task) { return this.scheduleAsyncDelayedTask(plugin, task, 0L); } @Override public BukkitTask runTaskAsynchronously(Plugin plugin, Runnable runnable) { return runTaskLaterAsynchronously(plugin, runnable, 0L); } @Override public void runTaskAsynchronously(Plugin plugin, Consumer task) throws IllegalArgumentException { runTaskLaterAsynchronously(plugin, task, 0L); } @Override public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) { return this.scheduleSyncRepeatingTask(plugin, task, delay, CraftTask.NO_REPEATING); } @Override public BukkitTask runTaskLater(Plugin plugin, Runnable runnable, long delay) { return runTaskTimer(plugin, runnable, delay, CraftTask.NO_REPEATING); } @Override public void runTaskLater(Plugin plugin, Consumer task, long delay) throws IllegalArgumentException { runTaskTimer(plugin, task, delay, CraftTask.NO_REPEATING); } @Deprecated @Override public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) { return this.scheduleAsyncRepeatingTask(plugin, task, delay, CraftTask.NO_REPEATING); } @Override public BukkitTask runTaskLaterAsynchronously(Plugin plugin, Runnable runnable, long delay) { return runTaskTimerAsynchronously(plugin, runnable, delay, CraftTask.NO_REPEATING); } @Override public void runTaskLaterAsynchronously(Plugin plugin, Consumer task, long delay) throws IllegalArgumentException { runTaskTimerAsynchronously(plugin, task, delay, CraftTask.NO_REPEATING); } @Override public void runTaskTimerAsynchronously(Plugin plugin, Consumer task, long delay, long period) throws IllegalArgumentException { runTaskTimerAsynchronously(plugin, (Object) task, delay, CraftTask.NO_REPEATING); } @Override public int scheduleSyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) { return runTaskTimer(plugin, runnable, delay, period).getTaskId(); } @Override public BukkitTask runTaskTimer(Plugin plugin, Runnable runnable, long delay, long period) { return runTaskTimer(plugin, (Object) runnable, delay, period); } @Override public void runTaskTimer(Plugin plugin, Consumer task, long delay, long period) throws IllegalArgumentException { runTaskTimer(plugin, (Object) task, delay, period); } public BukkitTask runTaskTimer(Plugin plugin, Object runnable, long delay, long period) { validate(plugin, runnable); if (delay < 0L) { delay = 0; } if (period == CraftTask.ERROR) { period = 1L; } else if (period < CraftTask.NO_REPEATING) { period = CraftTask.NO_REPEATING; } return handle(new CraftTask(plugin, runnable, nextId(), period), delay); } @Deprecated @Override public int scheduleAsyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) { return runTaskTimerAsynchronously(plugin, runnable, delay, period).getTaskId(); } @Override public BukkitTask runTaskTimerAsynchronously(Plugin plugin, Runnable runnable, long delay, long period) { return runTaskTimerAsynchronously(plugin, (Object) runnable, delay, period); } public BukkitTask runTaskTimerAsynchronously(Plugin plugin, Object runnable, long delay, long period) { validate(plugin, runnable); if (delay < 0L) { delay = 0; } if (period == CraftTask.ERROR) { period = 1L; } else if (period < CraftTask.NO_REPEATING) { period = CraftTask.NO_REPEATING; } return handle(new CraftAsyncTask(runners, plugin, runnable, nextId(), period), delay); } @Override public Future callSyncMethod(final Plugin plugin, final Callable task) { validate(plugin, task); final CraftFuture future = new CraftFuture(task, plugin, nextId()); handle(future, 0L); return future; } @Override public void cancelTask(final int taskId) { if (taskId <= 0) { return; } CraftTask task = runners.get(taskId); if (task != null) { task.cancel0(); } task = new CraftTask( new Runnable() { @Override public void run() { if (!check(CraftScheduler.this.temp)) { check(CraftScheduler.this.pending); } } private boolean check(final Iterable collection) { final Iterator tasks = collection.iterator(); while (tasks.hasNext()) { final CraftTask task = tasks.next(); if (task.getTaskId() == taskId) { task.cancel0(); tasks.remove(); if (task.isSync()) { runners.remove(taskId); } return true; } } return false; }}); handle(task, 0L); for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { if (taskPending == task) { return; } if (taskPending.getTaskId() == taskId) { taskPending.cancel0(); } } } @Override public void cancelTasks(final Plugin plugin) { Validate.notNull(plugin, "Cannot cancel tasks of null plugin"); final CraftTask task = new CraftTask( new Runnable() { @Override public void run() { check(CraftScheduler.this.pending); check(CraftScheduler.this.temp); } void check(final Iterable collection) { final Iterator tasks = collection.iterator(); while (tasks.hasNext()) { final CraftTask task = tasks.next(); if (task.getOwner().equals(plugin)) { task.cancel0(); tasks.remove(); if (task.isSync()) { runners.remove(task.getTaskId()); } } } } }); handle(task, 0L); for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { if (taskPending == task) { break; } if (taskPending.getTaskId() != -1 && taskPending.getOwner().equals(plugin)) { taskPending.cancel0(); } } for (CraftTask runner : runners.values()) { if (runner.getOwner().equals(plugin)) { runner.cancel0(); } } } @Override public boolean isCurrentlyRunning(final int taskId) { final CraftTask task = runners.get(taskId); if (task == null) { return false; } if (task.isSync()) { return (task == currentTask); } final CraftAsyncTask asyncTask = (CraftAsyncTask) task; synchronized (asyncTask.getWorkers()) { return !asyncTask.getWorkers().isEmpty(); } } @Override public boolean isQueued(final int taskId) { if (taskId <= 0) { return false; } for (CraftTask task = head.getNext(); task != null; task = task.getNext()) { if (task.getTaskId() == taskId) { return task.getPeriod() >= CraftTask.NO_REPEATING; // The task will run } } CraftTask task = runners.get(taskId); return task != null && task.getPeriod() >= CraftTask.NO_REPEATING; } @Override public List getActiveWorkers() { final ArrayList workers = new ArrayList(); for (final CraftTask taskObj : runners.values()) { // Iterator will be a best-effort (may fail to grab very new values) if called from an async thread if (taskObj.isSync()) { continue; } final CraftAsyncTask task = (CraftAsyncTask) taskObj; synchronized (task.getWorkers()) { // This will never have an issue with stale threads; it's state-safe workers.addAll(task.getWorkers()); } } return workers; } @Override public List getPendingTasks() { final ArrayList truePending = new ArrayList(); for (CraftTask task = head.getNext(); task != null; task = task.getNext()) { if (task.getTaskId() != -1) { // -1 is special code truePending.add(task); } } final ArrayList pending = new ArrayList(); for (CraftTask task : runners.values()) { if (task.getPeriod() >= CraftTask.NO_REPEATING) { pending.add(task); } } for (final CraftTask task : truePending) { if (task.getPeriod() >= CraftTask.NO_REPEATING && !pending.contains(task)) { pending.add(task); } } return pending; } /** * This method is designed to never block or wait for locks; an immediate execution of all current tasks. */ public void mainThreadHeartbeat(final int currentTick) { this.currentTick = currentTick; final List temp = this.temp; parsePending(); while (isReady(currentTick)) { final CraftTask task = pending.remove(); if (task.getPeriod() < CraftTask.NO_REPEATING) { if (task.isSync()) { runners.remove(task.getTaskId(), task); } parsePending(); continue; } if (task.isSync()) { currentTask = task; try { task.run(); } catch (final Throwable throwable) { task.getOwner().getLogger().log( Level.WARNING, String.format( "Task #%s for %s generated an exception", task.getTaskId(), task.getOwner().getDescription().getFullName()), throwable); } finally { currentTask = null; } parsePending(); } else { debugTail = debugTail.setNext(new CraftAsyncDebugger(currentTick + RECENT_TICKS, task.getOwner(), task.getTaskClass())); executor.execute(task); // We don't need to parse pending // (async tasks must live with race-conditions if they attempt to cancel between these few lines of code) } final long period = task.getPeriod(); // State consistency if (period > 0) { task.setNextRun(currentTick + period); temp.add(task); } else if (task.isSync()) { runners.remove(task.getTaskId()); } } pending.addAll(temp); temp.clear(); debugHead = debugHead.getNextHead(currentTick); } private void addTask(final CraftTask task) { final AtomicReference tail = this.tail; CraftTask tailTask = tail.get(); while (!tail.compareAndSet(tailTask, task)) { tailTask = tail.get(); } tailTask.setNext(task); } private CraftTask handle(final CraftTask task, final long delay) { task.setNextRun(currentTick + delay); addTask(task); return task; } private static void validate(final Plugin plugin, final Object task) { Validate.notNull(plugin, "Plugin cannot be null"); Validate.notNull(task, "Task cannot be null"); Validate.isTrue(task instanceof Runnable || task instanceof Consumer || task instanceof Callable, "Task must be Runnable, Consumer, or Callable"); if (!plugin.isEnabled()) { throw new IllegalPluginAccessException("Plugin attempted to register task while disabled"); } } private int nextId() { return ids.incrementAndGet(); } private void parsePending() { CraftTask head = this.head; CraftTask task = head.getNext(); CraftTask lastTask = head; for (; task != null; task = (lastTask = task).getNext()) { if (task.getTaskId() == -1) { task.run(); } else if (task.getPeriod() >= CraftTask.NO_REPEATING) { pending.add(task); runners.put(task.getTaskId(), task); } } // We split this because of the way things are ordered for all of the async calls in CraftScheduler // (it prevents race-conditions) for (task = head; task != lastTask; task = head) { head = task.getNext(); task.setNext(null); } this.head = lastTask; } private boolean isReady(final int currentTick) { return !pending.isEmpty() && pending.peek().getNextRun() <= currentTick; } @Override public String toString() { int debugTick = currentTick; StringBuilder string = new StringBuilder("Recent tasks from ").append(debugTick - RECENT_TICKS).append('-').append(debugTick).append('{'); debugHead.debugTo(string); return string.append('}').toString(); } @Deprecated @Override public int scheduleSyncDelayedTask(Plugin plugin, BukkitRunnable task, long delay) { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskLater(Plugin, long)"); } @Deprecated @Override public int scheduleSyncDelayedTask(Plugin plugin, BukkitRunnable task) { throw new UnsupportedOperationException("Use BukkitRunnable#runTask(Plugin)"); } @Deprecated @Override public int scheduleSyncRepeatingTask(Plugin plugin, BukkitRunnable task, long delay, long period) { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskTimer(Plugin, long, long)"); } @Deprecated @Override public BukkitTask runTask(Plugin plugin, BukkitRunnable task) throws IllegalArgumentException { throw new UnsupportedOperationException("Use BukkitRunnable#runTask(Plugin)"); } @Deprecated @Override public BukkitTask runTaskAsynchronously(Plugin plugin, BukkitRunnable task) throws IllegalArgumentException { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskAsynchronously(Plugin)"); } @Deprecated @Override public BukkitTask runTaskLater(Plugin plugin, BukkitRunnable task, long delay) throws IllegalArgumentException { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskLater(Plugin, long)"); } @Deprecated @Override public BukkitTask runTaskLaterAsynchronously(Plugin plugin, BukkitRunnable task, long delay) throws IllegalArgumentException { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskLaterAsynchronously(Plugin, long)"); } @Deprecated @Override public BukkitTask runTaskTimer(Plugin plugin, BukkitRunnable task, long delay, long period) throws IllegalArgumentException { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskTimer(Plugin, long, long)"); } @Deprecated @Override public BukkitTask runTaskTimerAsynchronously(Plugin plugin, BukkitRunnable task, long delay, long period) throws IllegalArgumentException { throw new UnsupportedOperationException("Use BukkitRunnable#runTaskTimerAsynchronously(Plugin, long, long)"); } }