1// Copyright 2023 The Gitea Authors. All rights reserved.
2// SPDX-License-Identifier: MIT
3
4package queue
5
6import (
7 "testing"
8
9 "forgejo.org/modules/queue/lqinternal"
10 "forgejo.org/modules/setting"
11
12 "code.forgejo.org/forgejo/levelqueue"
13 "github.com/stretchr/testify/assert"
14 "github.com/stretchr/testify/require"
15 "github.com/syndtr/goleveldb/leveldb"
16)
17
18func TestBaseLevelDB(t *testing.T) {
19 _, err := newBaseLevelQueueGeneric(&BaseConfig{ConnStr: "redis://"}, false)
20 require.ErrorContains(t, err, "invalid leveldb connection string")
21
22 _, err = newBaseLevelQueueGeneric(&BaseConfig{DataFullDir: "relative"}, false)
23 require.ErrorContains(t, err, "invalid leveldb data dir")
24
25 testQueueBasic(t, newBaseLevelQueueSimple, toBaseConfig("baseLevelQueue", setting.QueueSettings{Datadir: t.TempDir() + "/queue-test", Length: 10}), false)
26 testQueueBasic(t, newBaseLevelQueueUnique, toBaseConfig("baseLevelQueueUnique", setting.QueueSettings{ConnStr: "leveldb://" + t.TempDir() + "/queue-test", Length: 10}), true)
27}
28
29func TestCorruptedLevelQueue(t *testing.T) {
30 // sometimes the levelqueue could be in a corrupted state, this test is to make sure it can recover from it
31 dbDir := t.TempDir() + "/levelqueue-test"
32 db, err := leveldb.OpenFile(dbDir, nil)
33 require.NoError(t, err)
34
35 defer db.Close()
36
37 require.NoError(t, db.Put([]byte("other-key"), []byte("other-value"), nil))
38
39 nameQueuePrefix := []byte("queue_name")
40 nameSetPrefix := []byte("set_name")
41 lq, err := levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
42 require.NoError(t, err)
43 require.NoError(t, lq.RPush([]byte("item-1")))
44
45 itemKey := lqinternal.QueueItemKeyBytes(nameQueuePrefix, 1)
46 itemValue, err := db.Get(itemKey, nil)
47 require.NoError(t, err)
48 assert.Equal(t, []byte("item-1"), itemValue)
49
50 // there should be 5 keys in db: queue low, queue high, 1 queue item, 1 set item, and "other-key"
51 keys := lqinternal.ListLevelQueueKeys(db)
52 assert.Len(t, keys, 5)
53
54 // delete the queue item key, to corrupt the queue
55 require.NoError(t, db.Delete(itemKey, nil))
56 // now the queue is corrupted, it never works again
57 _, err = lq.LPop()
58 require.ErrorIs(t, err, levelqueue.ErrNotFound)
59 require.NoError(t, lq.Close())
60
61 // remove all the queue related keys to reset the queue
62 lqinternal.RemoveLevelQueueKeys(db, nameQueuePrefix)
63 lqinternal.RemoveLevelQueueKeys(db, nameSetPrefix)
64 // now there should be only 1 key in db: "other-key"
65 keys = lqinternal.ListLevelQueueKeys(db)
66 assert.Len(t, keys, 1)
67 assert.Equal(t, []byte("other-key"), keys[0])
68
69 // re-create a queue from db
70 lq, err = levelqueue.NewUniqueQueue(db, nameQueuePrefix, nameSetPrefix, false)
71 require.NoError(t, err)
72 require.NoError(t, lq.RPush([]byte("item-new-1")))
73 // now the queue works again
74 itemValue, err = lq.LPop()
75 require.NoError(t, err)
76 assert.Equal(t, []byte("item-new-1"), itemValue)
77 require.NoError(t, lq.Close())
78}