1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package par
6
7import "fmt"
8
9// Queue manages a set of work items to be executed in parallel. The number of
10// active work items is limited, and excess items are queued sequentially.
11type Queue struct {
12 maxActive int
13 st chan queueState
14}
15
// queueState is the mutable state of a Queue. A single value of this type is
// handed between goroutines through Queue.st.
type queueState struct {
	// active is the number of goroutines processing work. It is always
	// nonzero when len(backlog) > 0.
	active int

	// backlog holds the work items that are waiting for a goroutine to
	// become available.
	backlog []func()

	// idle, if non-nil, is closed when active becomes 0.
	idle chan struct{}
}
21
22// NewQueue returns a Queue that executes up to maxActive items in parallel.
23//
24// maxActive must be positive.
25func NewQueue(maxActive int) *Queue {
26 if maxActive < 1 {
27 panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
28 }
29
30 q := &Queue{
31 maxActive: maxActive,
32 st: make(chan queueState, 1),
33 }
34 q.st <- queueState{}
35 return q
36}
37
38// Add adds f as a work item in the queue.
39//
40// Add returns immediately, but the queue will be marked as non-idle until after
41// f (and any subsequently-added work) has completed.
42func (q *Queue) Add(f func()) {
43 st := <-q.st
44 if st.active == q.maxActive {
45 st.backlog = append(st.backlog, f)
46 q.st <- st
47 return
48 }
49 if st.active == 0 {
50 // Mark q as non-idle.
51 st.idle = nil
52 }
53 st.active++
54 q.st <- st
55
56 go func() {
57 for {
58 f()
59
60 st := <-q.st
61 if len(st.backlog) == 0 {
62 if st.active--; st.active == 0 && st.idle != nil {
63 close(st.idle)
64 }
65 q.st <- st
66 return
67 }
68 f, st.backlog = st.backlog[0], st.backlog[1:]
69 q.st <- st
70 }
71 }()
72}
73
74// Idle returns a channel that will be closed when q has no (active or enqueued)
75// work outstanding.
76func (q *Queue) Idle() <-chan struct{} {
77 st := <-q.st
78 defer func() { q.st <- st }()
79
80 if st.idle == nil {
81 st.idle = make(chan struct{})
82 if st.active == 0 {
83 close(st.idle)
84 }
85 }
86
87 return st.idle
88}