A game framework written with osu! in mind.
at master 3.9 kB view raw
1// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence. 2// See the LICENCE file in the repository root for full licence text. 3 4using System; 5using System.Collections.Concurrent; 6using System.Collections.Generic; 7using System.Collections.Immutable; 8using System.Linq; 9using System.Threading; 10using System.Threading.Tasks; 11 12namespace osu.Framework.Threading 13{ 14 /// <summary> 15 /// Provides a scheduler that uses a managed thread "pool". 16 /// </summary> 17 public sealed class ThreadedTaskScheduler : TaskScheduler, IDisposable 18 { 19 private readonly BlockingCollection<Task> tasks; 20 21 private readonly ImmutableArray<Thread> threads; 22 23 /// <summary> 24 /// Initializes a new instance of the StaTaskScheduler class with the specified concurrency level. 25 /// </summary> 26 /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param> 27 /// <param name="name">The thread name to give threads in this pool.</param> 28 public ThreadedTaskScheduler(int numberOfThreads, string name) 29 { 30 if (numberOfThreads < 1) 31 throw new ArgumentOutOfRangeException(nameof(numberOfThreads)); 32 33 tasks = new BlockingCollection<Task>(); 34 35 threads = Enumerable.Range(0, numberOfThreads).Select(i => 36 { 37 var thread = new Thread(processTasks) 38 { 39 Name = $"ThreadedTaskScheduler ({name})", 40 IsBackground = true 41 }; 42 43 thread.Start(); 44 45 return thread; 46 }).ToImmutableArray(); 47 } 48 49 /// <summary> 50 /// Continually get the next task and try to execute it. 51 /// This will continue as a blocking operation until the scheduler is disposed and no more tasks remain. 52 /// </summary> 53 private void processTasks() 54 { 55 try 56 { 57 foreach (var t in tasks.GetConsumingEnumerable()) TryExecuteTask(t); 58 } 59 catch (ObjectDisposedException) 60 { 61 // tasks may have been disposed. there's no easy way to check on this other than catch for it. 62 } 63 } 64 65 /// <summary> 66 /// Queues a Task to be executed by this scheduler. 67 /// </summary> 68 /// <param name="task">The task to be executed.</param> 69 protected override void QueueTask(Task task) => tasks.Add(task); 70 71 /// <summary> 72 /// Provides a list of the scheduled tasks for the debugger to consume. 73 /// </summary> 74 /// <returns>An enumerable of all tasks currently scheduled.</returns> 75 protected override IEnumerable<Task> GetScheduledTasks() => tasks.ToArray(); 76 77 /// <summary> 78 /// Determines whether a Task may be inlined. 79 /// </summary> 80 /// <param name="task">The task to be executed.</param> 81 /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param> 82 /// <returns>true if the task was successfully inlined; otherwise, false.</returns> 83 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => threads.Contains(Thread.CurrentThread) && TryExecuteTask(task); 84 85 /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary> 86 public override int MaximumConcurrencyLevel => threads.Length; 87 88 /// <summary> 89 /// Cleans up the scheduler by indicating that no more tasks will be queued. 90 /// This method blocks until all threads successfully shutdown. 91 /// </summary> 92 public void Dispose() 93 { 94 tasks.CompleteAdding(); 95 96 foreach (var thread in threads) 97 thread.Join(TimeSpan.FromSeconds(10)); 98 99 tasks.Dispose(); 100 } 101 } 102}