// Copyright (c) ppy Pty Ltd. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using JetBrains.Annotations;
using osu.Framework.Extensions;
using osu.Framework.Timing;

namespace osu.Framework.Threading
{
    /// <summary>
    /// Marshals delegates to run from the Scheduler's base thread in a threadsafe manner
    /// </summary>
    public class Scheduler
    {
        // Tasks which are ready to be executed by the next call to Update().
        private readonly Queue<ScheduledDelegate> runQueue = new Queue<ScheduledDelegate>();

        // Tasks waiting for their execution time to be reached (inserted via AddInPlace).
        private readonly List<ScheduledDelegate> timedTasks = new List<ScheduledDelegate>();

        // Repeating tasks with a repeat interval of zero; these are re-queued on every Update().
        private readonly List<ScheduledDelegate> perUpdateTasks = new List<ScheduledDelegate>();

        private readonly Func<bool> isCurrentThread;

        private IClock clock;

        // Falls back to zero while no clock has been provided.
        private double currentTime => clock?.CurrentTime ?? 0;

        // Guards runQueue, timedTasks, perUpdateTasks and clock replacement.
        private readonly object queueLock = new object();

        /// <summary>
        /// Whether there are any tasks queued to run (including delayed tasks in the future).
        /// </summary>
        public bool HasPendingTasks => TotalPendingTasks > 0;

        /// <summary>
        /// The total number of <see cref="ScheduledDelegate"/>s tracked by this instance for future execution.
        /// </summary>
        internal int TotalPendingTasks => runQueue.Count + timedTasks.Count + perUpdateTasks.Count;

        /// <summary>
        /// Constructs a scheduler using a running <see cref="StopwatchClock"/>.
        /// The base thread is assumed to be the thread on which the constructor is run.
        /// </summary>
        public Scheduler()
        {
            var currentThread = Thread.CurrentThread;
            isCurrentThread = () => Thread.CurrentThread == currentThread;

            clock = new StopwatchClock(true);
        }

        /// <summary>
        /// Constructs a scheduler with a custom base thread check and clock.
        /// </summary>
        /// <param name="isCurrentThread">A delegate returning whether the caller is on the base thread. If null, every thread is treated as the base thread.</param>
        /// <param name="clock">The clock used to time delayed tasks. May be null, in which case the current time is treated as zero until <see cref="UpdateClock"/> is called.</param>
        public Scheduler(Func<bool> isCurrentThread, IClock clock)
        {
            this.isCurrentThread = isCurrentThread;
            this.clock = clock;
        }

        /// <summary>
        /// Replaces the clock used to time delayed tasks.
        /// </summary>
        /// <param name="newClock">The new clock. Must not be null.</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="newClock"/> is null.</exception>
        public void UpdateClock(IClock newClock)
        {
            if (newClock == null)
                throw new ArgumentNullException(nameof(newClock));

            if (newClock == clock)
                return;

            lock (queueLock)
            {
                if (clock == null)
                {
                    // This is the first time we will get a valid time, so assume this is the
                    // reference point everything scheduled so far starts from.
                    foreach (var s in timedTasks)
                        s.ExecutionTime += newClock.CurrentTime;
                }

                clock = newClock;
            }
        }

        /// <summary>
        /// Returns whether we are on the main thread or not.
        /// When no thread check delegate was provided, this is always true.
        /// </summary>
        protected bool IsMainThread => isCurrentThread?.Invoke() ?? true;

        // Scratch lists reused by queueTimedTasks() to avoid mutating timedTasks while enumerating it.
        private readonly List<ScheduledDelegate> tasksToSchedule = new List<ScheduledDelegate>();
        private readonly List<ScheduledDelegate> tasksToRemove = new List<ScheduledDelegate>();

        /// <summary>
        /// Run any pending work tasks.
        /// </summary>
        /// <returns>The number of tasks that were run.</returns>
        public virtual int Update()
        {
            int countToRun;

            lock (queueLock)
            {
                queueTimedTasks();
                queuePerUpdateTasks();

                // Snapshot the amount of work under the lock so that concurrent enqueues
                // can not make the value inconsistent with the queue's contents.
                countToRun = runQueue.Count;
            }

            int countRun = 0;

            // Only run as many tasks as were queued at the start of this update.
            // Tasks enqueued while running (including from other threads) are deferred to the next
            // update, which guarantees termination even when tasks re-schedule themselves.
            while (countRun < countToRun && getNextTask(out ScheduledDelegate sd))
            {
                //todo: error handling
                sd.RunTaskInternal();
                countRun++;
            }

            return countRun;
        }

        /// <summary>
        /// Moves all timed tasks whose execution time has been reached into the run queue
        /// (or into the per-update list for zero-interval repeating tasks).
        /// Must be called while holding <see cref="queueLock"/>.
        /// </summary>
        private void queueTimedTasks()
        {
            if (timedTasks.Count == 0)
                return;

            double currentTimeLocal = currentTime;

            foreach (var sd in timedTasks)
            {
                if (sd.ExecutionTime > currentTimeLocal)
                    continue;

                tasksToRemove.Add(sd);

                if (sd.Cancelled)
                    continue;

                if (sd.RepeatInterval == 0)
                {
                    // handling of every-frame tasks is slightly different to reduce overhead.
                    perUpdateTasks.Add(sd);
                    continue;
                }

                if (sd.RepeatInterval > 0)
                {
                    if (timedTasks.Count > 1000)
                        throw new ArgumentException("Too many timed tasks are in the queue!");

                    // schedule the next repeat of the task.
                    sd.SetNextExecution(currentTimeLocal);
                    tasksToSchedule.Add(sd);
                }

                if (!sd.Completed)
                    runQueue.Enqueue(sd);
            }

            // Apply removals and re-insertions only after enumeration has finished.
            foreach (var t in tasksToRemove)
                timedTasks.Remove(t);

            tasksToRemove.Clear();

            foreach (var t in tasksToSchedule)
                timedTasks.AddInPlace(t);

            tasksToSchedule.Clear();
        }

        /// <summary>
        /// Queues every non-cancelled per-update task for execution, dropping cancelled ones.
        /// Must be called while holding <see cref="queueLock"/>.
        /// </summary>
        private void queuePerUpdateTasks()
        {
            for (int i = 0; i < perUpdateTasks.Count; i++)
            {
                ScheduledDelegate task = perUpdateTasks[i];

                task.SetNextExecution(null);

                if (task.Cancelled)
                {
                    // Step the index back so the element shifted into this slot is not skipped.
                    perUpdateTasks.RemoveAt(i--);
                    continue;
                }

                runQueue.Enqueue(task);
            }
        }

        /// <summary>
        /// Dequeues the next task from the run queue, if any.
        /// </summary>
        /// <param name="task">The dequeued task, or null if the run queue was empty.</param>
        /// <returns>Whether a task was dequeued.</returns>
        private bool getNextTask(out ScheduledDelegate task)
        {
            lock (queueLock)
            {
                if (runQueue.Count > 0)
                {
                    task = runQueue.Dequeue();
                    return true;
                }
            }

            task = null;
            return false;
        }

        /// <summary>
        /// Cancel any pending delayed work tasks, including repeating tasks which run on every update.
        /// </summary>
        public void CancelDelayedTasks()
        {
            lock (queueLock)
            {
                foreach (var t in timedTasks)
                    t.Cancel();

                timedTasks.Clear();

                // Zero-interval repeating tasks are delayed tasks which have been moved out of timedTasks;
                // without this they would keep running on every update after cancellation was requested.
                foreach (var t in perUpdateTasks)
                    t.Cancel();

                perUpdateTasks.Clear();
            }
        }

        /// <summary>
        /// Add a task to be scheduled.
        /// </summary>
        /// <remarks>If scheduled, the task will be run on the next <see cref="Update"/> independent of the current clock time.</remarks>
        /// <param name="task">The work to be done.</param>
        /// <param name="forceScheduled">If set to false, the task will be executed immediately if we are on the main thread.</param>
        /// <returns>The scheduled task, or <c>null</c> if the task was executed immediately.</returns>
        [CanBeNull]
        public ScheduledDelegate Add([NotNull] Action task, bool forceScheduled = true)
        {
            if (!forceScheduled && IsMainThread)
            {
                //We are on the main thread already - don't need to schedule.
                task.Invoke();
                return null;
            }

            var del = new ScheduledDelegate(task);

            lock (queueLock)
                runQueue.Enqueue(del);

            return del;
        }

        /// <summary>
        /// Add a task to be scheduled.
        /// </summary>
        /// <remarks>The delegate is inserted into the timed task list and becomes eligible to run once its execution time has been reached.</remarks>
        /// <param name="task">The scheduled delegate to add.</param>
        /// <exception cref="InvalidOperationException">Thrown when attempting to add a scheduled delegate that has been already completed.</exception>
        public void Add([NotNull] ScheduledDelegate task)
        {
            if (task.Completed)
                throw new InvalidOperationException($"Can not add a {nameof(ScheduledDelegate)} that has been already {nameof(ScheduledDelegate.Completed)}");

            lock (queueLock)
                timedTasks.AddInPlace(task);
        }

        /// <summary>
        /// Add a task which will be run after a specified delay from the current clock time.
        /// </summary>
        /// <param name="task">The work to be done.</param>
        /// <param name="timeUntilRun">Milliseconds until run.</param>
        /// <param name="repeat">Whether this task should repeat. When true, <paramref name="timeUntilRun"/> is also used as the repeat interval.</param>
        /// <returns>The scheduled task.</returns>
        [NotNull]
        public ScheduledDelegate AddDelayed([NotNull] Action task, double timeUntilRun, bool repeat = false)
        {
            // We are locking here already to make sure we have no concurrent access to currentTime
            lock (queueLock)
            {
                ScheduledDelegate del = new ScheduledDelegate(task, currentTime + timeUntilRun, repeat ? timeUntilRun : -1);
                Add(del);
                return del;
            }
        }

        /// <summary>
        /// Adds a task which will only be run once per frame, no matter how many times it was scheduled in the previous frame.
        /// </summary>
        /// <remarks>The task will be run on the next <see cref="Update"/> independent of the current clock time.</remarks>
        /// <param name="task">The work to be done.</param>
        /// <returns>Whether this is the first queue attempt of this work.</returns>
        public bool AddOnce([NotNull] Action task)
        {
            lock (queueLock)
            {
                // Delegate equality is used to detect work which is already waiting in the run queue.
                if (runQueue.Any(sd => sd.Task == task))
                    return false;

                runQueue.Enqueue(new ScheduledDelegate(task));
            }

            return true;
        }
    }
}