A game framework written with osu! in mind.
1// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
2// See the LICENCE file in the repository root for full licence text.
3
4using System;
5using System.Collections.Concurrent;
6using System.Collections.Generic;
7using System.Collections.Immutable;
8using System.Linq;
9using System.Threading;
10using System.Threading.Tasks;
11
namespace osu.Framework.Threading
{
    /// <summary>
    /// Provides a scheduler that uses a managed thread "pool".
    /// A fixed number of dedicated background threads consume queued tasks from a shared blocking queue.
    /// </summary>
    public sealed class ThreadedTaskScheduler : TaskScheduler, IDisposable
    {
        /// <summary>
        /// Tasks which have been queued on this scheduler and are waiting to be picked up by a pool thread.
        /// </summary>
        private readonly BlockingCollection<Task> tasks;

        /// <summary>
        /// The dedicated threads which consume <see cref="tasks"/>.
        /// </summary>
        private readonly ImmutableArray<Thread> threads;

        /// <summary>
        /// Whether <see cref="Dispose"/> has already run. Guards against repeated disposal.
        /// </summary>
        private bool isDisposed;

        /// <summary>
        /// Initializes a new instance of the <see cref="ThreadedTaskScheduler"/> class with the specified concurrency level.
        /// </summary>
        /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
        /// <param name="name">The thread name to give threads in this pool.</param>
        /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="numberOfThreads"/> is less than 1.</exception>
        public ThreadedTaskScheduler(int numberOfThreads, string name)
        {
            if (numberOfThreads < 1)
                throw new ArgumentOutOfRangeException(nameof(numberOfThreads));

            tasks = new BlockingCollection<Task>();

            threads = Enumerable.Range(0, numberOfThreads)
                                .Select(_ => new Thread(processTasks)
                                {
                                    Name = $"ThreadedTaskScheduler ({name})",
                                    IsBackground = true
                                })
                                .ToImmutableArray();

            // Start the workers only after every field has been assigned, so a worker never observes a partially constructed scheduler.
            foreach (var thread in threads)
                thread.Start();
        }

        /// <summary>
        /// Continually get the next task and try to execute it.
        /// This will continue as a blocking operation until the scheduler is disposed and no more tasks remain.
        /// </summary>
        private void processTasks()
        {
            try
            {
                foreach (var t in tasks.GetConsumingEnumerable())
                    TryExecuteTask(t);
            }
            catch (ObjectDisposedException)
            {
                // tasks may have been disposed. there's no easy way to check on this other than catch for it.
            }
        }

        /// <summary>
        /// Queues a Task to be executed by this scheduler.
        /// </summary>
        /// <remarks>
        /// The task is added directly to the underlying <see cref="BlockingCollection{T}"/>,
        /// so any exception it raises for a completed or disposed collection propagates to the caller.
        /// </remarks>
        /// <param name="task">The task to be executed.</param>
        protected override void QueueTask(Task task) => tasks.Add(task);

        /// <summary>
        /// Provides a list of the scheduled tasks for the debugger to consume.
        /// </summary>
        /// <returns>A snapshot of all tasks currently waiting in the queue.</returns>
        protected override IEnumerable<Task> GetScheduledTasks() => tasks.ToArray();

        /// <summary>
        /// Determines whether a Task may be inlined.
        /// Inlining is only permitted when the calling thread is one of this scheduler's own pool threads.
        /// </summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
        /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => threads.Contains(Thread.CurrentThread) && TryExecuteTask(task);

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public override int MaximumConcurrencyLevel => threads.Length;

        /// <summary>
        /// Cleans up the scheduler by indicating that no more tasks will be queued.
        /// This method waits up to 10 seconds for each pool thread to shut down before releasing the task queue.
        /// Calling it more than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            // Dispose must be safe to call repeatedly; a second pass would otherwise touch the already-disposed collection and throw.
            if (isDisposed)
                return;

            isDisposed = true;

            tasks.CompleteAdding();

            var currentThread = Thread.CurrentThread;

            foreach (var thread in threads)
            {
                // When disposal is triggered from a task running on this scheduler, a thread cannot join itself;
                // attempting to would only stall for the full timeout.
                if (thread == currentThread)
                    continue;

                thread.Join(TimeSpan.FromSeconds(10));
            }

            tasks.Dispose();
        }
    }
}