1// Copyright 2023 The Gitea Authors. All rights reserved.
2// SPDX-License-Identifier: MIT
3
4package queue
5
6import (
7 "context"
8 "time"
9)
10
11var pushBlockTime = 5 * time.Second
12
13type baseQueue interface {
14 PushItem(ctx context.Context, data []byte) error
15 PopItem(ctx context.Context) ([]byte, error)
16 HasItem(ctx context.Context, data []byte) (bool, error)
17 Len(ctx context.Context) (int, error)
18 Close() error
19 RemoveAll(ctx context.Context) error
20}
21
22func popItemByChan(ctx context.Context, popItemFn func(ctx context.Context) ([]byte, error)) (chanItem chan []byte, chanErr chan error) {
23 chanItem = make(chan []byte)
24 chanErr = make(chan error)
25 go func() {
26 for {
27 it, err := popItemFn(ctx)
28 if err != nil {
29 close(chanItem)
30 chanErr <- err
31 return
32 }
33 if it == nil {
34 close(chanItem)
35 close(chanErr)
36 return
37 }
38 chanItem <- it
39 }
40 }()
41 return chanItem, chanErr
42}