A game framework written with osu! in mind.
1// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
2// See the LICENCE file in the repository root for full licence text.
3
4using System;
5using System.Collections.Generic;
6using System.Linq;
7using System.Threading;
8using JetBrains.Annotations;
9using osu.Framework.Extensions;
10using osu.Framework.Timing;
11
12namespace osu.Framework.Threading
13{
14 /// <summary>
15 /// Marshals delegates to run from the Scheduler's base thread in a threadsafe manner
16 /// </summary>
17 public class Scheduler
18 {
19 private readonly Queue<ScheduledDelegate> runQueue = new Queue<ScheduledDelegate>();
20 private readonly List<ScheduledDelegate> timedTasks = new List<ScheduledDelegate>();
21 private readonly List<ScheduledDelegate> perUpdateTasks = new List<ScheduledDelegate>();
22
23 private readonly Func<bool> isCurrentThread;
24
25 private IClock clock;
26 private double currentTime => clock?.CurrentTime ?? 0;
27
28 private readonly object queueLock = new object();
29
30 /// <summary>
31 /// Whether there are any tasks queued to run (including delayed tasks in the future).
32 /// </summary>
33 public bool HasPendingTasks => TotalPendingTasks > 0;
34
35 /// <summary>
36 /// The total number of <see cref="ScheduledDelegate"/>s tracked by this instance for future execution.
37 /// </summary>
38 internal int TotalPendingTasks => runQueue.Count + timedTasks.Count + perUpdateTasks.Count;
39
40 /// <summary>
41 /// The base thread is assumed to be the thread on which the constructor is run.
42 /// </summary>
43 public Scheduler()
44 {
45 var currentThread = Thread.CurrentThread;
46 isCurrentThread = () => Thread.CurrentThread == currentThread;
47
48 clock = new StopwatchClock(true);
49 }
50
51 /// <summary>
52 /// The base thread is assumed to be the thread on which the constructor is run.
53 /// </summary>
54 public Scheduler(Func<bool> isCurrentThread, IClock clock)
55 {
56 this.isCurrentThread = isCurrentThread;
57 this.clock = clock;
58 }
59
60 public void UpdateClock(IClock newClock)
61 {
62 if (newClock == null)
63 throw new ArgumentNullException(nameof(newClock));
64
65 if (newClock == clock)
66 return;
67
68 lock (queueLock)
69 {
70 if (clock == null)
71 {
72 // This is the first time we will get a valid time, so assume this is the
73 // reference point everything scheduled so far starts from.
74 foreach (var s in timedTasks)
75 s.ExecutionTime += newClock.CurrentTime;
76 }
77
78 clock = newClock;
79 }
80 }
81
82 /// <summary>
83 /// Returns whether we are on the main thread or not.
84 /// </summary>
85 protected bool IsMainThread => isCurrentThread?.Invoke() ?? true;
86
87 private readonly List<ScheduledDelegate> tasksToSchedule = new List<ScheduledDelegate>();
88 private readonly List<ScheduledDelegate> tasksToRemove = new List<ScheduledDelegate>();
89
90 /// <summary>
91 /// Run any pending work tasks.
92 /// </summary>
93 /// <returns>The number of tasks that were run.</returns>
94 public virtual int Update()
95 {
96 lock (queueLock)
97 {
98 queueTimedTasks();
99 queuePerUpdateTasks();
100 }
101
102 int countToRun = runQueue.Count;
103 int countRun = 0;
104
105 while (getNextTask(out ScheduledDelegate sd))
106 {
107 //todo: error handling
108 sd.RunTaskInternal();
109
110 if (++countRun == countToRun)
111 break;
112 }
113
114 return countRun;
115 }
116
117 private void queueTimedTasks()
118 {
119 double currentTimeLocal = currentTime;
120
121 if (timedTasks.Count > 0)
122 {
123 foreach (var sd in timedTasks)
124 {
125 if (sd.ExecutionTime <= currentTimeLocal)
126 {
127 tasksToRemove.Add(sd);
128
129 if (sd.Cancelled) continue;
130
131 if (sd.RepeatInterval == 0)
132 {
133 // handling of every-frame tasks is slightly different to reduce overhead.
134 perUpdateTasks.Add(sd);
135 continue;
136 }
137
138 if (sd.RepeatInterval > 0)
139 {
140 if (timedTasks.Count > 1000)
141 throw new ArgumentException("Too many timed tasks are in the queue!");
142
143 // schedule the next repeat of the task.
144 sd.SetNextExecution(currentTimeLocal);
145 tasksToSchedule.Add(sd);
146 }
147
148 if (!sd.Completed) runQueue.Enqueue(sd);
149 }
150 }
151
152 foreach (var t in tasksToRemove)
153 timedTasks.Remove(t);
154
155 tasksToRemove.Clear();
156
157 foreach (var t in tasksToSchedule)
158 timedTasks.AddInPlace(t);
159
160 tasksToSchedule.Clear();
161 }
162 }
163
164 private void queuePerUpdateTasks()
165 {
166 for (int i = 0; i < perUpdateTasks.Count; i++)
167 {
168 ScheduledDelegate task = perUpdateTasks[i];
169
170 task.SetNextExecution(null);
171
172 if (task.Cancelled)
173 {
174 perUpdateTasks.RemoveAt(i--);
175 continue;
176 }
177
178 runQueue.Enqueue(task);
179 }
180 }
181
182 private bool getNextTask(out ScheduledDelegate task)
183 {
184 lock (queueLock)
185 {
186 if (runQueue.Count > 0)
187 {
188 task = runQueue.Dequeue();
189 return true;
190 }
191 }
192
193 task = null;
194 return false;
195 }
196
197 /// <summary>
198 /// Cancel any pending work tasks.
199 /// </summary>
200 public void CancelDelayedTasks()
201 {
202 lock (queueLock)
203 {
204 foreach (var t in timedTasks)
205 t.Cancel();
206 timedTasks.Clear();
207 }
208 }
209
210 /// <summary>
211 /// Add a task to be scheduled.
212 /// </summary>
213 /// <remarks>If scheduled, the task will be run on the next <see cref="Update"/> independent of the current clock time.</remarks>
214 /// <param name="task">The work to be done.</param>
215 /// <param name="forceScheduled">If set to false, the task will be executed immediately if we are on the main thread.</param>
216 /// <returns>The scheduled task, or <c>null</c> if the task was executed immediately.</returns>
217 [CanBeNull]
218 public ScheduledDelegate Add([NotNull] Action task, bool forceScheduled = true)
219 {
220 if (!forceScheduled && IsMainThread)
221 {
222 //We are on the main thread already - don't need to schedule.
223 task.Invoke();
224 return null;
225 }
226
227 var del = new ScheduledDelegate(task);
228
229 lock (queueLock)
230 runQueue.Enqueue(del);
231
232 return del;
233 }
234
235 /// <summary>
236 /// Add a task to be scheduled.
237 /// </summary>
238 /// <remarks>The task will be run on the next <see cref="Update"/> independent of the current clock time.</remarks>
239 /// <param name="task">The scheduled delegate to add.</param>
240 /// <exception cref="InvalidOperationException">Thrown when attempting to add a scheduled delegate that has been already completed.</exception>
241 public void Add([NotNull] ScheduledDelegate task)
242 {
243 if (task.Completed)
244 throw new InvalidOperationException($"Can not add a {nameof(ScheduledDelegate)} that has been already {nameof(ScheduledDelegate.Completed)}");
245
246 lock (queueLock)
247 timedTasks.AddInPlace(task);
248 }
249
250 /// <summary>
251 /// Add a task which will be run after a specified delay from the current clock time.
252 /// </summary>
253 /// <param name="task">The work to be done.</param>
254 /// <param name="timeUntilRun">Milliseconds until run.</param>
255 /// <param name="repeat">Whether this task should repeat.</param>
256 /// <returns>The scheduled task.</returns>
257 [NotNull]
258 public ScheduledDelegate AddDelayed([NotNull] Action task, double timeUntilRun, bool repeat = false)
259 {
260 // We are locking here already to make sure we have no concurrent access to currentTime
261 lock (queueLock)
262 {
263 ScheduledDelegate del = new ScheduledDelegate(task, currentTime + timeUntilRun, repeat ? timeUntilRun : -1);
264 Add(del);
265 return del;
266 }
267 }
268
269 /// <summary>
270 /// Adds a task which will only be run once per frame, no matter how many times it was scheduled in the previous frame.
271 /// </summary>
272 /// <remarks>The task will be run on the next <see cref="Update"/> independent of the current clock time.</remarks>
273 /// <param name="task">The work to be done.</param>
274 /// <returns>Whether this is the first queue attempt of this work.</returns>
275 public bool AddOnce([NotNull] Action task)
276 {
277 lock (queueLock)
278 {
279 if (runQueue.Any(sd => sd.Task == task))
280 return false;
281
282 runQueue.Enqueue(new ScheduledDelegate(task));
283 }
284
285 return true;
286 }
287 }
288}