appview: add basic issue indexer (wip) #494

merged
opened by boltless.me targeting master from boltless.me/core: feat/search
  • Heavily inspired by gitea
  • Add GetAllIssues, which only receives a paginator and gathers all issues, ignoring the repoAt field

Signed-off-by: Seongmin Lee <boltlessengineer@proton.me>

Changed files (+391)
.gitignore (+1)
···
  .env
  *.rdb
  .envrc
+ *.bleve
  # Created if following hacking.md
  genjwks.out
  /nix/vm-data

appview/indexer/base36/base36.go (new file, +20)

// mostly copied from gitea/modules/indexer/internal/base32

package base36

import (
    "fmt"
    "strconv"
)

func Encode(i int64) string {
    return strconv.FormatInt(i, 36)
}

func Decode(s string) (int64, error) {
    i, err := strconv.ParseInt(s, 36, 64)
    if err != nil {
        return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
    }
    return i, nil
}

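Not part of the diff: a tiny standalone sketch of the round trip these helpers provide, with example values.

package main

import (
    "fmt"

    "tangled.org/core/appview/indexer/base36"
)

func main() {
    // issue row IDs become compact base36 strings used as bleve document IDs
    docID := base36.Encode(12345) // "9ix"
    fmt.Println(docID)

    id, err := base36.Decode(docID) // back to 12345
    if err != nil {
        panic(err)
    }
    fmt.Println(id)
}
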
appview/indexer/bleve/batch.go (new file, +58)

// Copyright 2021 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package bleveutil

import (
    "github.com/blevesearch/bleve/v2"
)

// FlushingBatch is a batch of operations that automatically flushes to the
// underlying index once it reaches a certain size.
type FlushingBatch struct {
    maxBatchSize int
    batch        *bleve.Batch
    index        bleve.Index
}

// NewFlushingBatch creates a new flushing batch for the specified index. Once
// the number of operations in the batch reaches the specified limit, the batch
// automatically flushes its operations to the index.
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
    return &FlushingBatch{
        maxBatchSize: maxBatchSize,
        batch:        index.NewBatch(),
        index:        index,
    }
}

// Index adds an index operation to the batch.
func (b *FlushingBatch) Index(id string, data any) error {
    if err := b.batch.Index(id, data); err != nil {
        return err
    }
    return b.flushIfFull()
}

// Delete adds a delete operation to the batch.
func (b *FlushingBatch) Delete(id string) error {
    b.batch.Delete(id)
    return b.flushIfFull()
}

func (b *FlushingBatch) flushIfFull() error {
    if b.batch.Size() < b.maxBatchSize {
        return nil
    }
    return b.Flush()
}

// Flush submits the batch and creates a new one.
func (b *FlushingBatch) Flush() error {
    err := b.index.Batch(b.batch)
    if err != nil {
        return err
    }
    b.batch = b.index.NewBatch()
    return nil
}

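Not part of the diff: a minimal standalone sketch of how the flushing batch is meant to be used. The index path and documents are made up for the example.

package main

import (
    "fmt"
    "log"

    "github.com/blevesearch/bleve/v2"
    bleveutil "tangled.org/core/appview/indexer/bleve"
)

func main() {
    // create a throwaway index (path is made up for the example)
    idx, err := bleve.New("example.bleve", bleve.NewIndexMapping())
    if err != nil {
        log.Fatal(err)
    }

    // operations are buffered; reaching the batch size of 20 triggers an automatic flush
    batch := bleveutil.NewFlushingBatch(idx, 20)
    for i := 0; i < 100; i++ {
        if err := batch.Index(fmt.Sprintf("doc-%d", i), map[string]any{"n": i}); err != nil {
            log.Fatal(err)
        }
    }
    // flush whatever remains below the threshold
    if err := batch.Flush(); err != nil {
        log.Fatal(err)
    }
}
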
appview/indexer/indexer.go (new file, +32)

package indexer

import (
    "context"
    "log/slog"

    "tangled.org/core/appview/db"
    issues_indexer "tangled.org/core/appview/indexer/issues"
    "tangled.org/core/appview/notify"
    tlog "tangled.org/core/log"
)

type Indexer struct {
    Issues *issues_indexer.Indexer
    logger *slog.Logger
    notify.BaseNotifier
}

func New(logger *slog.Logger) *Indexer {
    return &Indexer{
        issues_indexer.NewIndexer("indexes.bleve"),
        logger,
        notify.BaseNotifier{},
    }
}

// Init initializes all indexers
func (ix *Indexer) Init(ctx context.Context, db *db.DB) error {
    ctx = tlog.IntoContext(ctx, ix.logger)
    ix.Issues.Init(ctx, db)
    return nil
}

appview/indexer/issues/indexer.go (new file, +197)

// heavily inspired by gitea's model (basically copy-pasted)
package issues_indexer

import (
    "context"
    "errors"
    "os"

    "github.com/blevesearch/bleve/v2"
    "github.com/blevesearch/bleve/v2/index/upsidedown"
    "github.com/blevesearch/bleve/v2/search/query"
    "tangled.org/core/appview/db"
    "tangled.org/core/appview/indexer/base36"
    bleveutil "tangled.org/core/appview/indexer/bleve"
    "tangled.org/core/appview/models"
    "tangled.org/core/appview/pagination"
    tlog "tangled.org/core/log"
)

type Indexer struct {
    indexer bleve.Index
    path    string
}

func NewIndexer(indexDir string) *Indexer {
    return &Indexer{
        path: indexDir,
    }
}

// Init initializes the indexer
func (ix *Indexer) Init(ctx context.Context, e db.Execer) {
    l := tlog.FromContext(ctx)
    existed, err := ix.initialize(ctx)
    if err != nil {
        l.Error("failed to initialize issue indexer", "err", err)
        return
    }
    if !existed {
        l.Debug("Populating the issue indexer")
        err := PopulateIndexer(ctx, ix, e)
        if err != nil {
            l.Error("failed to populate issue indexer", "err", err)
        }
    }
    l.Info("Initialized the issue indexer")
}

func (ix *Indexer) initialize(ctx context.Context) (bool, error) {
    if ix.indexer != nil {
        return false, errors.New("indexer is already initialized")
    }

    indexer, err := openIndexer(ctx, ix.path)
    if err != nil {
        return false, err
    }
    if indexer != nil {
        ix.indexer = indexer
        return true, nil
    }

    mapping := bleve.NewIndexMapping()
    indexer, err = bleve.New(ix.path, mapping)
    if err != nil {
        return false, err
    }

    ix.indexer = indexer

    return false, nil
}

func openIndexer(ctx context.Context, path string) (bleve.Index, error) {
    l := tlog.FromContext(ctx)
    indexer, err := bleve.Open(path)
    if err != nil {
        if errors.Is(err, upsidedown.IncompatibleVersion) {
            l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding")
            return nil, os.RemoveAll(path)
        }
        return nil, nil
    }
    return indexer, nil
}

func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error {
    l := tlog.FromContext(ctx)
    count := 0
    err := pagination.IterateAll(
        func(page pagination.Page) ([]models.Issue, error) {
            return db.GetIssuesPaginated(e, page)
        },
        func(issues []models.Issue) error {
            count += len(issues)
            return ix.Index(ctx, issues...)
        },
    )
    l.Info("issues indexed", "count", count)
    return err
}

// issueData is the document stored in and indexed by bleve for each issue
type issueData struct {
    ID      int64  `json:"id"`
    RepoAt  string `json:"repo_at"`
    IssueID int    `json:"issue_id"`
    Title   string `json:"title"`
    Body    string `json:"body"`

    IsOpen   bool               `json:"is_open"`
    Comments []IssueCommentData `json:"comments"`
}

func makeIssueData(issue *models.Issue) issueData {
    return issueData{
        ID:      issue.Id,
        RepoAt:  issue.RepoAt.String(),
        IssueID: issue.IssueId,
        Title:   issue.Title,
        Body:    issue.Body,
        IsOpen:  issue.Open,
    }
}

type IssueCommentData struct {
    Body string `json:"body"`
}

type SearchResult struct {
    Hits  []int64
    Total uint64
}

const maxBatchSize = 20

func (ix *Indexer) Index(ctx context.Context, issues ...models.Issue) error {
    batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize)
    for _, issue := range issues {
        issueData := makeIssueData(&issue)
        if err := batch.Index(base36.Encode(issue.Id), issueData); err != nil {
            return err
        }
    }
    return batch.Flush()
}

// Search searches for issues
func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) {
    var queries []query.Query

    if opts.Keyword != "" {
        queries = append(queries, bleve.NewDisjunctionQuery(
            matchAndQuery(opts.Keyword, "title"),
            matchAndQuery(opts.Keyword, "body"),
        ))
    }
    queries = append(queries, keywordFieldQuery(opts.RepoAt, "repo_at"))
    queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open"))
    // TODO: append more queries

    var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...)
    searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false)
    res, err := ix.indexer.SearchInContext(ctx, searchReq)
    if err != nil {
        return nil, err
    }
    ret := &SearchResult{
        Total: res.Total,
        Hits:  make([]int64, len(res.Hits)),
    }
    for i, hit := range res.Hits {
        id, err := base36.Decode(hit.ID)
        if err != nil {
            return nil, err
        }
        ret.Hits[i] = id
    }
    return ret, nil
}

func matchAndQuery(keyword, field string) query.Query {
    q := bleve.NewMatchQuery(keyword)
    q.FieldVal = field
    return q
}

func boolFieldQuery(val bool, field string) query.Query {
    q := bleve.NewBoolFieldQuery(val)
    q.FieldVal = field
    return q
}

func keywordFieldQuery(keyword, field string) query.Query {
    q := bleve.NewTermQuery(keyword)
    q.FieldVal = field
    return q
}

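Not part of the diff: a rough sketch of how a caller might drive Search end to end. searchOpenIssues is a hypothetical helper; resolving the returned row IDs back to issues through the db layer is left to the caller and is not shown here.

package example

import (
    "context"

    issues_indexer "tangled.org/core/appview/indexer/issues"
    "tangled.org/core/appview/models"
    "tangled.org/core/appview/pagination"
)

// searchOpenIssues is a hypothetical helper, not part of this PR.
func searchOpenIssues(ctx context.Context, ix *issues_indexer.Indexer, repoAt, keyword string) ([]int64, error) {
    res, err := ix.Search(ctx, models.IssueSearchOptions{
        Keyword: keyword,                // matched against "title" and "body"
        RepoAt:  repoAt,                 // exact term match on "repo_at"
        IsOpen:  true,                   // filter on "is_open"
        Page:    pagination.FirstPage(),
    })
    if err != nil {
        return nil, err
    }
    // res.Hits holds issue row IDs decoded from their base36 document IDs;
    // the caller still loads the actual issues from the database.
    return res.Hits, nil
}
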
appview/indexer/notifier.go (new file, +20)

package indexer

import (
    "context"

    "tangled.org/core/appview/models"
    "tangled.org/core/appview/notify"
    "tangled.org/core/log"
)

var _ notify.Notifier = &Indexer{}

func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) {
    l := log.FromContext(ctx).With("notifier", "indexer.NewIssue", "issue", issue)
    l.Debug("indexing new issue")
    err := ix.Issues.Index(ctx, *issue)
    if err != nil {
        l.Error("failed to index an issue", "err", err)
    }
}

appview/issues/issues.go (+5)
···
      "tangled.org/core/api/tangled"
      "tangled.org/core/appview/config"
      "tangled.org/core/appview/db"
+     issues_indexer "tangled.org/core/appview/indexer/issues"
      "tangled.org/core/appview/models"
      "tangled.org/core/appview/notify"
      "tangled.org/core/appview/oauth"
···
      notifier  notify.Notifier
      logger    *slog.Logger
      validator *validator.Validator
+     indexer   *issues_indexer.Indexer
  }

  func New(
···
      config *config.Config,
      notifier notify.Notifier,
      validator *validator.Validator,
+     indexer *issues_indexer.Indexer,
      logger *slog.Logger,
  ) *Issues {
      return &Issues{
···
          notifier:  notifier,
          logger:    logger,
          validator: validator,
+         indexer:   indexer,
      }
  }

···
          Rkey:    tid.TID(),
          Title:   r.FormValue("title"),
          Body:    r.FormValue("body"),
+         Open:    true,
          Did:     user.Did,
          Created: time.Now(),
          Repo:    &f.Repo,

appview/models/search.go (new file, +23)

package models

import "tangled.org/core/appview/pagination"

type IssueSearchOptions struct {
    Keyword string
    RepoAt  string
    IsOpen  bool

    Page pagination.Page
}

// func (so *SearchOptions) ToFilters() []filter {
//     var filters []filter
//     if so.IsOpen != nil {
//         openValue := 0
//         if *so.IsOpen {
//             openValue = 1
//         }
//         filters = append(filters, FilterEq("open", openValue))
//     }
//     return filters
// }

appview/pages/pages.go (+1)
···
      LabelDefs       map[string]*models.LabelDefinition
      Page            pagination.Page
      FilteringByOpen bool
+     FilterQuery     string
  }

  func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {

appview/pagination/page.go (+23)
···
          Limit: p.Limit,
      }
  }
+
+ func IterateAll[T any](
+     fetch func(page Page) ([]T, error),
+     handle func(items []T) error,
+ ) error {
+     page := FirstPage()
+     for {
+         items, err := fetch(page)
+         if err != nil {
+             return err
+         }
+
+         err = handle(items)
+         if err != nil {
+             return err
+         }
+         if len(items) < page.Limit {
+             break
+         }
+         page = page.Next()
+     }
+     return nil
+ }

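Not part of the diff: a minimal sketch of how IterateAll drives a paged fetch. countAllIssues is a hypothetical helper; db.GetIssuesPaginated and db.Execer are the same names the issue indexer uses above.

package example

import (
    "context"

    "tangled.org/core/appview/db"
    "tangled.org/core/appview/models"
    "tangled.org/core/appview/pagination"
)

// countAllIssues shows the fetch/handle split: fetch returns one page at a
// time, handle consumes it, and IterateAll stops once a short page signals
// the end of the result set.
func countAllIssues(ctx context.Context, e db.Execer) (int, error) {
    total := 0
    err := pagination.IterateAll(
        func(page pagination.Page) ([]models.Issue, error) {
            return db.GetIssuesPaginated(e, page)
        },
        func(issues []models.Issue) error {
            total += len(issues)
            return nil
        },
    )
    return total, err
}
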
appview/state/router.go (+1)
···
          s.config,
          s.notifier,
          s.validator,
+         s.indexer.Issues,
          log.SubLogger(s.logger, "issues"),
      )
      return issues.Router(mw)

appview/state/state.go (+10)
···
      "tangled.org/core/appview"
      "tangled.org/core/appview/config"
      "tangled.org/core/appview/db"
+     "tangled.org/core/appview/indexer"
      "tangled.org/core/appview/models"
      "tangled.org/core/appview/notify"
      dbnotify "tangled.org/core/appview/notify/db"
···
  type State struct {
      db       *db.DB
      notifier notify.Notifier
+     indexer  *indexer.Indexer
      oauth    *oauth.OAuth
      enforcer *rbac.Enforcer
      pages    *pages.Pages
···
          return nil, fmt.Errorf("failed to create db: %w", err)
      }

+     indexer := indexer.New(log.SubLogger(logger, "indexer"))
+     err = indexer.Init(ctx, d)
+     if err != nil {
+         return nil, fmt.Errorf("failed to create indexer: %w", err)
+     }
+
      enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
      if err != nil {
          return nil, fmt.Errorf("failed to create enforcer: %w", err)
···
      if !config.Core.Dev {
          notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
      }
+     notifiers = append(notifiers, indexer)
      notifier := notify.NewMergedNotifier(notifiers...)

      state := &State{
          d,
          notifier,
+         indexer,
          oauth,
          enforcer,
          pages,