+1
.gitignore
+1
.gitignore
+20
appview/indexer/base36/base36.go
+20
appview/indexer/base36/base36.go
···
···
1
+
// mostly copied from gitea/modules/indexer/internal/base32
2
+
3
+
package base36
4
+
5
+
import (
6
+
"fmt"
7
+
"strconv"
8
+
)
9
+
10
+
// Encode returns the base-36 text form of i as produced by
// strconv.FormatInt: lowercase digits 0-9a-z, with a leading '-' for
// negative values.
func Encode(i int64) string {
	const radix = 36
	return strconv.FormatInt(i, radix)
}
13
+
14
+
// Decode parses s as a base-36 signed 64-bit integer.
//
// On failure it returns 0 and an error that wraps the underlying
// strconv.ParseInt error and quotes the offending input.
func Decode(s string) (int64, error) {
	n, parseErr := strconv.ParseInt(s, 36, 64)
	if parseErr == nil {
		return n, nil
	}
	return 0, fmt.Errorf("invalid base36 integer %q: %w", s, parseErr)
}
+58
appview/indexer/bleve/batch.go
+58
appview/indexer/bleve/batch.go
···
···
1
+
// Copyright 2021 The Gitea Authors. All rights reserved.
2
+
// SPDX-License-Identifier: MIT
3
+
4
+
package bleveutil
5
+
6
+
import (
7
+
"github.com/blevesearch/bleve/v2"
8
+
)
9
+
10
+
// FlushingBatch is a batch of operations that automatically flushes to the
11
+
// underlying index once it reaches a certain size.
12
+
type FlushingBatch struct {
13
+
maxBatchSize int
14
+
batch *bleve.Batch
15
+
index bleve.Index
16
+
}
17
+
18
+
// NewFlushingBatch creates a new flushing batch for the specified index. Once
19
+
// the number of operations in the batch reaches the specified limit, the batch
20
+
// automatically flushes its operations to the index.
21
+
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
22
+
return &FlushingBatch{
23
+
maxBatchSize: maxBatchSize,
24
+
batch: index.NewBatch(),
25
+
index: index,
26
+
}
27
+
}
28
+
29
+
// Index add a new index to batch
30
+
func (b *FlushingBatch) Index(id string, data any) error {
31
+
if err := b.batch.Index(id, data); err != nil {
32
+
return err
33
+
}
34
+
return b.flushIfFull()
35
+
}
36
+
37
+
// Delete add a delete index to batch
38
+
func (b *FlushingBatch) Delete(id string) error {
39
+
b.batch.Delete(id)
40
+
return b.flushIfFull()
41
+
}
42
+
43
+
func (b *FlushingBatch) flushIfFull() error {
44
+
if b.batch.Size() < b.maxBatchSize {
45
+
return nil
46
+
}
47
+
return b.Flush()
48
+
}
49
+
50
+
// Flush submit the batch and create a new one
51
+
func (b *FlushingBatch) Flush() error {
52
+
err := b.index.Batch(b.batch)
53
+
if err != nil {
54
+
return err
55
+
}
56
+
b.batch = b.index.NewBatch()
57
+
return nil
58
+
}
+24
appview/indexer/bleve/query.go
+24
appview/indexer/bleve/query.go
···
···
1
+
package bleveutil
2
+
3
+
import (
4
+
"github.com/blevesearch/bleve/v2"
5
+
"github.com/blevesearch/bleve/v2/search/query"
6
+
)
7
+
8
+
func MatchAndQuery(field, keyword string) query.Query {
9
+
q := bleve.NewMatchQuery(keyword)
10
+
q.FieldVal = field
11
+
return q
12
+
}
13
+
14
+
func BoolFieldQuery(field string, val bool) query.Query {
15
+
q := bleve.NewBoolFieldQuery(val)
16
+
q.FieldVal = field
17
+
return q
18
+
}
19
+
20
+
func KeywordFieldQuery(field, keyword string) query.Query {
21
+
q := bleve.NewTermQuery(keyword)
22
+
q.FieldVal = field
23
+
return q
24
+
}
+32
appview/indexer/indexer.go
+32
appview/indexer/indexer.go
···
···
1
+
package indexer
2
+
3
+
import (
4
+
"context"
5
+
"log/slog"
6
+
7
+
"tangled.org/core/appview/db"
8
+
issues_indexer "tangled.org/core/appview/indexer/issues"
9
+
"tangled.org/core/appview/notify"
10
+
tlog "tangled.org/core/log"
11
+
)
12
+
13
+
type Indexer struct {
14
+
Issues *issues_indexer.Indexer
15
+
logger *slog.Logger
16
+
notify.BaseNotifier
17
+
}
18
+
19
+
func New(logger *slog.Logger) *Indexer {
20
+
return &Indexer{
21
+
issues_indexer.NewIndexer("indexes.bleve"),
22
+
logger,
23
+
notify.BaseNotifier{},
24
+
}
25
+
}
26
+
27
+
// Init initializes all indexers
28
+
func (ix *Indexer) Init(ctx context.Context, db *db.DB) error {
29
+
ctx = tlog.IntoContext(ctx, ix.logger)
30
+
ix.Issues.Init(ctx, db)
31
+
return nil
32
+
}
+180
appview/indexer/issues/indexer.go
+180
appview/indexer/issues/indexer.go
···
···
1
+
// heavily inspired by gitea's model (basically copy-pasted)
2
+
package issues_indexer
3
+
4
+
import (
5
+
"context"
6
+
"errors"
7
+
"log"
8
+
"os"
9
+
10
+
"github.com/blevesearch/bleve/v2"
11
+
"github.com/blevesearch/bleve/v2/index/upsidedown"
12
+
"github.com/blevesearch/bleve/v2/search/query"
13
+
"tangled.org/core/appview/db"
14
+
"tangled.org/core/appview/indexer/base36"
15
+
"tangled.org/core/appview/indexer/bleve"
16
+
"tangled.org/core/appview/models"
17
+
"tangled.org/core/appview/pagination"
18
+
tlog "tangled.org/core/log"
19
+
)
20
+
21
+
type Indexer struct {
22
+
indexer bleve.Index
23
+
path string
24
+
}
25
+
26
+
func NewIndexer(indexDir string) *Indexer {
27
+
return &Indexer{
28
+
path: indexDir,
29
+
}
30
+
}
31
+
32
+
// Init initializes the indexer
33
+
func (ix *Indexer) Init(ctx context.Context, e db.Execer) {
34
+
l := tlog.FromContext(ctx)
35
+
existed, err := ix.intialize(ctx)
36
+
if err != nil {
37
+
log.Fatalln("failed to initialize issue indexer", err)
38
+
}
39
+
if !existed {
40
+
l.Debug("Populating the issue indexer")
41
+
err := PopulateIndexer(ctx, ix, e)
42
+
if err != nil {
43
+
log.Fatalln("failed to populate issue indexer", err)
44
+
}
45
+
}
46
+
l.Info("Initialized the issue indexer")
47
+
}
48
+
49
+
func (ix *Indexer) intialize(ctx context.Context) (bool, error) {
50
+
if ix.indexer != nil {
51
+
return false, errors.New("indexer is already initialized")
52
+
}
53
+
54
+
indexer, err := openIndexer(ctx, ix.path)
55
+
if err != nil {
56
+
return false, err
57
+
}
58
+
if indexer != nil {
59
+
ix.indexer = indexer
60
+
return true, nil
61
+
}
62
+
63
+
mapping := bleve.NewIndexMapping()
64
+
indexer, err = bleve.New(ix.path, mapping)
65
+
if err != nil {
66
+
return false, err
67
+
}
68
+
69
+
ix.indexer = indexer
70
+
71
+
return false, nil
72
+
}
73
+
74
+
func openIndexer(ctx context.Context, path string) (bleve.Index, error) {
75
+
l := tlog.FromContext(ctx)
76
+
indexer, err := bleve.Open(path)
77
+
if err != nil {
78
+
if errors.Is(err, upsidedown.IncompatibleVersion) {
79
+
l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding")
80
+
return nil, os.RemoveAll(path)
81
+
}
82
+
return nil, nil
83
+
}
84
+
return indexer, nil
85
+
}
86
+
87
+
func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error {
88
+
l := tlog.FromContext(ctx)
89
+
count := 0
90
+
err := pagination.IterateAll(
91
+
func(page pagination.Page) ([]models.Issue, error) {
92
+
return db.GetIssuesPaginated(e, page)
93
+
},
94
+
func(issues []models.Issue) error {
95
+
count += len(issues)
96
+
return ix.Index(ctx, issues...)
97
+
},
98
+
)
99
+
l.Info("issues indexed", "count", count)
100
+
return err
101
+
}
102
+
103
+
// issueData data stored and will be indexed
104
+
type issueData struct {
105
+
ID int64 `json:"id"`
106
+
RepoAt string `json:"repo_at"`
107
+
IssueID int `json:"issue_id"`
108
+
Title string `json:"title"`
109
+
Body string `json:"body"`
110
+
111
+
IsOpen bool `json:"is_open"`
112
+
Comments []IssueCommentData `json:"comments"`
113
+
}
114
+
115
+
func makeIssueData(issue *models.Issue) *issueData {
116
+
return &issueData{
117
+
ID: issue.Id,
118
+
RepoAt: issue.RepoAt.String(),
119
+
IssueID: issue.IssueId,
120
+
Title: issue.Title,
121
+
Body: issue.Body,
122
+
IsOpen: issue.Open,
123
+
}
124
+
}
125
+
126
+
// IssueCommentData is the indexed representation of a single issue comment.
type IssueCommentData struct {
	// Body is the comment text.
	Body string `json:"body"`
}
129
+
130
+
// SearchResult is the outcome of an issue search.
type SearchResult struct {
	// Hits holds the decoded issue IDs of the returned page of results.
	Hits []int64
	// Total is the total hit count reported by bleve for the query.
	Total uint64
}
134
+
135
+
const maxBatchSize = 20
136
+
137
+
func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error {
138
+
batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize)
139
+
for _, issue := range issues {
140
+
issueData := makeIssueData(&issue)
141
+
if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil {
142
+
return err
143
+
}
144
+
}
145
+
return batch.Flush()
146
+
}
147
+
148
+
// Search searches for issues
149
+
func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) {
150
+
var queries []query.Query
151
+
152
+
if opts.Keyword != "" {
153
+
queries = append(queries, bleve.NewDisjunctionQuery(
154
+
bleveutil.MatchAndQuery("title", opts.Keyword),
155
+
bleveutil.MatchAndQuery("body", opts.Keyword),
156
+
))
157
+
}
158
+
queries = append(queries, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt))
159
+
queries = append(queries, bleveutil.BoolFieldQuery("is_open", opts.IsOpen))
160
+
// TODO: append more queries
161
+
162
+
var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...)
163
+
searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false)
164
+
res, err := ix.indexer.SearchInContext(ctx, searchReq)
165
+
if err != nil {
166
+
return nil, nil
167
+
}
168
+
ret := &SearchResult{
169
+
Total: res.Total,
170
+
Hits: make([]int64, len(res.Hits)),
171
+
}
172
+
for i, hit := range res.Hits {
173
+
id, err := base36.Decode(hit.ID)
174
+
if err != nil {
175
+
return nil, err
176
+
}
177
+
ret.Hits[i] = id
178
+
}
179
+
return ret, nil
180
+
}
+20
appview/indexer/notifier.go
+20
appview/indexer/notifier.go
···
···
1
+
package indexer
2
+
3
+
import (
4
+
"context"
5
+
6
+
"tangled.org/core/appview/models"
7
+
"tangled.org/core/appview/notify"
8
+
"tangled.org/core/log"
9
+
)
10
+
11
+
var _ notify.Notifier = &Indexer{}
12
+
13
+
func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) {
14
+
l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue)
15
+
l.Debug("indexing new issue")
16
+
err := ix.Issues.Index(ctx, *issue)
17
+
if err != nil {
18
+
l.Error("failed to index an issue", "err", err)
19
+
}
20
+
}
+5
appview/issues/issues.go
+5
appview/issues/issues.go
···
19
"tangled.org/core/api/tangled"
20
"tangled.org/core/appview/config"
21
"tangled.org/core/appview/db"
22
"tangled.org/core/appview/models"
23
"tangled.org/core/appview/notify"
24
"tangled.org/core/appview/oauth"
···
40
notifier notify.Notifier
41
logger *slog.Logger
42
validator *validator.Validator
43
}
44
45
func New(
···
51
config *config.Config,
52
notifier notify.Notifier,
53
validator *validator.Validator,
54
logger *slog.Logger,
55
) *Issues {
56
return &Issues{
···
63
notifier: notifier,
64
logger: logger,
65
validator: validator,
66
}
67
}
68
···
843
Rkey: tid.TID(),
844
Title: r.FormValue("title"),
845
Body: r.FormValue("body"),
846
Did: user.Did,
847
Created: time.Now(),
848
Repo: &f.Repo,
···
19
"tangled.org/core/api/tangled"
20
"tangled.org/core/appview/config"
21
"tangled.org/core/appview/db"
22
+
issues_indexer "tangled.org/core/appview/indexer/issues"
23
"tangled.org/core/appview/models"
24
"tangled.org/core/appview/notify"
25
"tangled.org/core/appview/oauth"
···
41
notifier notify.Notifier
42
logger *slog.Logger
43
validator *validator.Validator
44
+
indexer *issues_indexer.Indexer
45
}
46
47
func New(
···
53
config *config.Config,
54
notifier notify.Notifier,
55
validator *validator.Validator,
56
+
indexer *issues_indexer.Indexer,
57
logger *slog.Logger,
58
) *Issues {
59
return &Issues{
···
66
notifier: notifier,
67
logger: logger,
68
validator: validator,
69
+
indexer: indexer,
70
}
71
}
72
···
847
Rkey: tid.TID(),
848
Title: r.FormValue("title"),
849
Body: r.FormValue("body"),
850
+
Open: true,
851
Did: user.Did,
852
Created: time.Now(),
853
Repo: &f.Repo,
+23
appview/models/search.go
+23
appview/models/search.go
···
···
1
+
package models
2
+
3
+
import "tangled.org/core/appview/pagination"
4
+
5
+
type IssueSearchOptions struct {
6
+
Keyword string
7
+
RepoAt string
8
+
IsOpen bool
9
+
10
+
Page pagination.Page
11
+
}
12
+
13
+
// func (so *SearchOptions) ToFilters() []filter {
14
+
// var filters []filter
15
+
// if so.IsOpen != nil {
16
+
// openValue := 0
17
+
// if *so.IsOpen {
18
+
// openValue = 1
19
+
// }
20
+
// filters = append(filters, FilterEq("open", openValue))
21
+
// }
22
+
// return filters
23
+
// }
+1
appview/pages/pages.go
+1
appview/pages/pages.go
+23
appview/pagination/page.go
+23
appview/pagination/page.go
···
52
Limit: p.Limit,
53
}
54
}
55
+
56
+
func IterateAll[T any](
57
+
fetch func(page Page) ([]T, error),
58
+
handle func(items []T) error,
59
+
) error {
60
+
page := FirstPage()
61
+
for {
62
+
items, err := fetch(page)
63
+
if err != nil {
64
+
return err
65
+
}
66
+
67
+
err = handle(items)
68
+
if err != nil {
69
+
return err
70
+
}
71
+
if len(items) < page.Limit {
72
+
break
73
+
}
74
+
page = page.Next()
75
+
}
76
+
return nil
77
+
}
+1
appview/state/router.go
+1
appview/state/router.go
+10
appview/state/state.go
+10
appview/state/state.go
···
14
"tangled.org/core/appview"
15
"tangled.org/core/appview/config"
16
"tangled.org/core/appview/db"
17
"tangled.org/core/appview/models"
18
"tangled.org/core/appview/notify"
19
dbnotify "tangled.org/core/appview/notify/db"
···
43
type State struct {
44
db *db.DB
45
notifier notify.Notifier
46
oauth *oauth.OAuth
47
enforcer *rbac.Enforcer
48
pages *pages.Pages
···
63
d, err := db.Make(ctx, config.Core.DbPath)
64
if err != nil {
65
return nil, fmt.Errorf("failed to create db: %w", err)
66
}
67
68
enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
···
159
if !config.Core.Dev {
160
notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
161
}
162
notifier := notify.NewMergedNotifier(notifiers...)
163
164
state := &State{
165
d,
166
notifier,
167
oauth,
168
enforcer,
169
pages,
···
14
"tangled.org/core/appview"
15
"tangled.org/core/appview/config"
16
"tangled.org/core/appview/db"
17
+
"tangled.org/core/appview/indexer"
18
"tangled.org/core/appview/models"
19
"tangled.org/core/appview/notify"
20
dbnotify "tangled.org/core/appview/notify/db"
···
44
type State struct {
45
db *db.DB
46
notifier notify.Notifier
47
+
indexer *indexer.Indexer
48
oauth *oauth.OAuth
49
enforcer *rbac.Enforcer
50
pages *pages.Pages
···
65
d, err := db.Make(ctx, config.Core.DbPath)
66
if err != nil {
67
return nil, fmt.Errorf("failed to create db: %w", err)
68
+
}
69
+
70
+
indexer := indexer.New(log.SubLogger(logger, "indexer"))
71
+
err = indexer.Init(ctx, d)
72
+
if err != nil {
73
+
return nil, fmt.Errorf("failed to create indexer: %w", err)
74
}
75
76
enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
···
167
if !config.Core.Dev {
168
notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
169
}
170
+
notifiers = append(notifiers, indexer)
171
notifier := notify.NewMergedNotifier(notifiers...)
172
173
state := &State{
174
d,
175
notifier,
176
+
indexer,
177
oauth,
178
enforcer,
179
pages,