1// Copyright 2023 The Gitea Authors. All rights reserved.
2// SPDX-License-Identifier: MIT
3
4package queue
5
6import (
7 "context"
8 "fmt"
9 "testing"
10 "time"
11
12 "github.com/stretchr/testify/assert"
13 "github.com/stretchr/testify/require"
14)
15
16func testQueueBasic(t *testing.T, newFn func(cfg *BaseConfig) (baseQueue, error), cfg *BaseConfig, isUnique bool) {
17 t.Run(fmt.Sprintf("testQueueBasic-%s-unique:%v", cfg.ManagedName, isUnique), func(t *testing.T) {
18 q, err := newFn(cfg)
19 require.NoError(t, err)
20
21 ctx := t.Context()
22 _ = q.RemoveAll(ctx)
23 cnt, err := q.Len(ctx)
24 require.NoError(t, err)
25 assert.Equal(t, 0, cnt)
26
27 // push the first item
28 err = q.PushItem(ctx, []byte("foo"))
29 require.NoError(t, err)
30
31 cnt, err = q.Len(ctx)
32 require.NoError(t, err)
33 assert.Equal(t, 1, cnt)
34
35 // push a duplicate item
36 err = q.PushItem(ctx, []byte("foo"))
37 if !isUnique {
38 require.NoError(t, err)
39 } else {
40 require.ErrorIs(t, err, ErrAlreadyInQueue)
41 }
42
43 // check the duplicate item
44 cnt, err = q.Len(ctx)
45 require.NoError(t, err)
46 has, err := q.HasItem(ctx, []byte("foo"))
47 require.NoError(t, err)
48 if !isUnique {
49 assert.Equal(t, 2, cnt)
50 assert.False(t, has) // non-unique queues don't check for duplicates
51 } else {
52 assert.Equal(t, 1, cnt)
53 assert.True(t, has)
54 }
55
56 // push another item
57 err = q.PushItem(ctx, []byte("bar"))
58 require.NoError(t, err)
59
60 // pop the first item (and the duplicate if non-unique)
61 it, err := q.PopItem(ctx)
62 require.NoError(t, err)
63 assert.Equal(t, "foo", string(it))
64
65 if !isUnique {
66 it, err = q.PopItem(ctx)
67 require.NoError(t, err)
68 assert.Equal(t, "foo", string(it))
69 }
70
71 // pop another item
72 it, err = q.PopItem(ctx)
73 require.NoError(t, err)
74 assert.Equal(t, "bar", string(it))
75
76 // pop an empty queue (timeout, cancel)
77 ctxTimed, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
78 it, err = q.PopItem(ctxTimed)
79 require.ErrorIs(t, err, context.DeadlineExceeded)
80 assert.Nil(t, it)
81 cancel()
82
83 ctxTimed, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
84 cancel()
85 it, err = q.PopItem(ctxTimed)
86 require.ErrorIs(t, err, context.Canceled)
87 assert.Nil(t, it)
88
89 // test blocking push if queue is full
90 for i := 0; i < cfg.Length; i++ {
91 err = q.PushItem(ctx, []byte(fmt.Sprintf("item-%d", i)))
92 require.NoError(t, err)
93 }
94 ctxTimed, cancel = context.WithTimeout(ctx, 10*time.Millisecond)
95 err = q.PushItem(ctxTimed, []byte("item-full"))
96 require.ErrorIs(t, err, context.DeadlineExceeded)
97 cancel()
98
99 // test blocking push if queue is full (with custom pushBlockTime)
100 oldPushBlockTime := pushBlockTime
101 timeStart := time.Now()
102 pushBlockTime = 30 * time.Millisecond
103 err = q.PushItem(ctx, []byte("item-full"))
104 require.ErrorIs(t, err, context.DeadlineExceeded)
105 assert.GreaterOrEqual(t, time.Since(timeStart), pushBlockTime*2/3)
106 pushBlockTime = oldPushBlockTime
107
108 // remove all
109 cnt, err = q.Len(ctx)
110 require.NoError(t, err)
111 assert.Equal(t, cfg.Length, cnt)
112
113 _ = q.RemoveAll(ctx)
114
115 cnt, err = q.Len(ctx)
116 require.NoError(t, err)
117 assert.Equal(t, 0, cnt)
118 })
119}
120
121func TestBaseDummy(t *testing.T) {
122 q, err := newBaseDummy(&BaseConfig{}, true)
123 require.NoError(t, err)
124
125 ctx := t.Context()
126 require.NoError(t, q.PushItem(ctx, []byte("foo")))
127
128 cnt, err := q.Len(ctx)
129 require.NoError(t, err)
130 assert.Equal(t, 0, cnt)
131
132 has, err := q.HasItem(ctx, []byte("foo"))
133 require.NoError(t, err)
134 assert.False(t, has)
135
136 it, err := q.PopItem(ctx)
137 require.NoError(t, err)
138 assert.Nil(t, it)
139
140 require.NoError(t, q.RemoveAll(ctx))
141}