// Copyright (c) ppy Pty Ltd. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace osu.Framework.Threading
{
    /// <summary>
    /// Provides a scheduler that uses a managed thread "pool".
    /// </summary>
    /// <remarks>
    /// A fixed number of dedicated background threads is created at construction time.
    /// Each of them consumes queued tasks from a shared blocking collection until the scheduler is disposed.
    /// </remarks>
    public sealed class ThreadedTaskScheduler : TaskScheduler, IDisposable
    {
        /// <summary>
        /// Tasks which have been queued but not yet picked up by a worker thread.
        /// </summary>
        private readonly BlockingCollection<Task> tasks;

        /// <summary>
        /// The worker threads owned by this scheduler.
        /// </summary>
        private readonly ImmutableArray<Thread> threads;

        /// <summary>
        /// Set to 1 once <see cref="Dispose"/> has started. Stored as an <see cref="int"/> so it can be flipped atomically.
        /// </summary>
        private int isDisposed;

        /// <summary>
        /// Initializes a new instance of the <see cref="ThreadedTaskScheduler"/> class with the specified concurrency level.
        /// </summary>
        /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
        /// <param name="name">The thread name to give threads in this pool.</param>
        /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="numberOfThreads"/> is less than 1.</exception>
        public ThreadedTaskScheduler(int numberOfThreads, string name)
        {
            if (numberOfThreads < 1)
                throw new ArgumentOutOfRangeException(nameof(numberOfThreads));

            tasks = new BlockingCollection<Task>();

            // Every thread shares the same name and is started immediately.
            // Background threads are used so a scheduler that was never disposed cannot keep the process alive.
            threads = Enumerable.Range(0, numberOfThreads).Select(i =>
            {
                var thread = new Thread(processTasks)
                {
                    Name = $"ThreadedTaskScheduler ({name})",
                    IsBackground = true
                };

                thread.Start();

                return thread;
            }).ToImmutableArray();
        }

        /// <summary>
        /// Continually get the next task and try to execute it.
        /// This will continue as a blocking operation until the scheduler is disposed and no more tasks remain.
        /// </summary>
        private void processTasks()
        {
            try
            {
                foreach (var t in tasks.GetConsumingEnumerable())
                    TryExecuteTask(t);
            }
            catch (ObjectDisposedException)
            {
                // Dispose() only waits a bounded time for each worker before disposing the collection,
                // so a worker that is still running may observe the disposal here.
                // There's no easy way to check for this other than catching the exception.
            }
        }

        /// <summary>
        /// Queues a <see cref="Task"/> to be executed by this scheduler.
        /// </summary>
        /// <remarks>
        /// The task is added to the underlying collection as-is; adding after <see cref="Dispose"/> has been called throws.
        /// </remarks>
        /// <param name="task">The task to be executed.</param>
        protected override void QueueTask(Task task) => tasks.Add(task);

        /// <summary>
        /// Provides a list of the scheduled tasks for the debugger to consume.
        /// </summary>
        /// <returns>An enumerable of all tasks currently scheduled.</returns>
        protected override IEnumerable<Task> GetScheduledTasks() => tasks.ToArray();

        /// <summary>
        /// Determines whether a <see cref="Task"/> may be inlined.
        /// Inlining is only permitted when the caller is already running on one of this scheduler's own threads.
        /// </summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued. Not used by this implementation.</param>
        /// <returns><c>true</c> if the task was successfully inlined; otherwise, <c>false</c>.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            => threads.Contains(Thread.CurrentThread) && TryExecuteTask(task);

        /// <summary>
        /// Gets the maximum concurrency level supported by this scheduler, which equals the number of threads it owns.
        /// </summary>
        public override int MaximumConcurrencyLevel => threads.Length;

        /// <summary>
        /// Cleans up the scheduler by indicating that no more tasks will be queued.
        /// This method blocks for up to 10 seconds per thread while waiting for the threads to shut down.
        /// </summary>
        /// <remarks>
        /// Calling this method more than once is safe; only the first call has any effect.
        /// </remarks>
        public void Dispose()
        {
            // Dispose must be idempotent: without this guard a second call would invoke
            // CompleteAdding() on an already-disposed collection and throw ObjectDisposedException.
            if (Interlocked.Exchange(ref isDisposed, 1) == 1)
                return;

            // Lets the consuming enumerables in processTasks() finish once the queue is drained.
            tasks.CompleteAdding();

            // Bounded wait, so a long-running or stuck task cannot hang disposal forever.
            foreach (var thread in threads)
                thread.Join(TimeSpan.FromSeconds(10));

            tasks.Dispose();
        }
    }
}