appview: add basic issue indexer #672

Closed
Opened by boltless.me, targeting master from boltless.me/core:feat/search
  • Heavily inspired by Gitea (the base36 and bleve batch helpers are adapted from it)
  • Add GetAllIssues, which receives only a paginator and gathers all issues regardless of the repoAt field (a rough sketch of that query shape follows below)
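
A rough sketch of the query shape such a helper implies, shown against database/sql purely for illustration; the helper name, table, and column names here are assumptions, and the actual PR goes through the appview db layer (see db.GetIssuesPaginated in the diff below).

    package example // hypothetical sketch, not part of this PR

    import "database/sql"

    // Issue mirrors the handful of models.Issue fields the indexer needs.
    type Issue struct {
        Id      int64
        IssueId int
        Title   string
        Body    string
        Open    bool
    }

    // getIssuesPage fetches one page of issues across every repository,
    // ordered by primary key and ignoring any repoAt filter.
    func getIssuesPage(dbh *sql.DB, limit, offset int) ([]Issue, error) {
        rows, err := dbh.Query(
            `SELECT id, issue_id, title, body, open
               FROM issues
              ORDER BY id
              LIMIT ? OFFSET ?`,
            limit, offset,
        )
        if err != nil {
            return nil, err
        }
        defer rows.Close()

        var issues []Issue
        for rows.Next() {
            var i Issue
            if err := rows.Scan(&i.Id, &i.IssueId, &i.Title, &i.Body, &i.Open); err != nil {
                return nil, err
            }
            issues = append(issues, i)
        }
        return issues, rows.Err()
    }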

Signed-off-by: Seongmin Lee <boltlessengineer@proton.me>

Changed files (+388)
.gitignore (+1)
···
  .env
  *.rdb
  .envrc
+ *.bleve
  # Created if following hacking.md
  genjwks.out
  /nix/vm-data

appview/indexer/base36/base36.go (+20)
···
+ // mostly copied from gitea/modules/indexer/internal/base32
+
+ package base36
+
+ import (
+     "fmt"
+     "strconv"
+ )
+
+ func Encode(i int64) string {
+     return strconv.FormatInt(i, 36)
+ }
+
+ func Decode(s string) (int64, error) {
+     i, err := strconv.ParseInt(s, 36, 64)
+     if err != nil {
+         return 0, fmt.Errorf("invalid base36 integer %q: %w", s, err)
+     }
+     return i, nil
+ }

appview/indexer/bleve/batch.go (+58)
···
+ // Copyright 2021 The Gitea Authors. All rights reserved.
+ // SPDX-License-Identifier: MIT
+
+ package bleveutil
+
+ import (
+     "github.com/blevesearch/bleve/v2"
+ )
+
+ // FlushingBatch is a batch of operations that automatically flushes to the
+ // underlying index once it reaches a certain size.
+ type FlushingBatch struct {
+     maxBatchSize int
+     batch        *bleve.Batch
+     index        bleve.Index
+ }
+
+ // NewFlushingBatch creates a new flushing batch for the specified index. Once
+ // the number of operations in the batch reaches the specified limit, the batch
+ // automatically flushes its operations to the index.
+ func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
+     return &FlushingBatch{
+         maxBatchSize: maxBatchSize,
+         batch:        index.NewBatch(),
+         index:        index,
+     }
+ }
+
+ // Index adds an index operation to the batch.
+ func (b *FlushingBatch) Index(id string, data any) error {
+     if err := b.batch.Index(id, data); err != nil {
+         return err
+     }
+     return b.flushIfFull()
+ }
+
+ // Delete adds a delete operation to the batch.
+ func (b *FlushingBatch) Delete(id string) error {
+     b.batch.Delete(id)
+     return b.flushIfFull()
+ }
+
+ func (b *FlushingBatch) flushIfFull() error {
+     if b.batch.Size() < b.maxBatchSize {
+         return nil
+     }
+     return b.Flush()
+ }
+
+ // Flush submits the batch and starts a new one.
+ func (b *FlushingBatch) Flush() error {
+     err := b.index.Batch(b.batch)
+     if err != nil {
+         return err
+     }
+     b.batch = b.index.NewBatch()
+     return nil
+ }

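For orientation, a minimal usage sketch of the flushing batch against a throwaway in-memory bleve index; this snippet is not part of the diff, and error handling is elided.

    // not part of the PR: a small demonstration of the auto-flush behaviour
    package main

    import (
        "github.com/blevesearch/bleve/v2"

        bleveutil "tangled.org/core/appview/indexer/bleve"
    )

    func main() {
        mapping := bleve.NewIndexMapping()
        idx, _ := bleve.NewMemOnly(mapping) // in-memory index; nothing touches disk

        batch := bleveutil.NewFlushingBatch(idx, 2) // maxBatchSize of 2
        _ = batch.Index("a", map[string]any{"title": "first"})
        _ = batch.Index("b", map[string]any{"title": "second"}) // batch size reaches 2, auto-flushes
        _ = batch.Delete("a")
        _ = batch.Flush() // flush whatever is still queued
    }
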
appview/indexer/indexer.go (+32)
···
+ package indexer
+
+ import (
+     "context"
+     "log/slog"
+
+     "tangled.org/core/appview/db"
+     issues_indexer "tangled.org/core/appview/indexer/issues"
+     "tangled.org/core/appview/notify"
+     tlog "tangled.org/core/log"
+ )
+
+ type Indexer struct {
+     Issues *issues_indexer.Indexer
+     logger *slog.Logger
+     notify.BaseNotifier
+ }
+
+ func New(logger *slog.Logger) *Indexer {
+     return &Indexer{
+         issues_indexer.NewIndexer("indexes.bleve"),
+         logger,
+         notify.BaseNotifier{},
+     }
+ }
+
+ // Init initializes all indexers
+ func (ix *Indexer) Init(ctx context.Context, db *db.DB) error {
+     ctx = tlog.IntoContext(ctx, ix.logger)
+     ix.Issues.Init(ctx, db)
+     return nil
+ }

appview/indexer/issues/indexer.go (+189)
···
+ package issues_indexer
+
+ import (
+     "context"
+     "errors"
+     "os"
+
+     "github.com/blevesearch/bleve/v2"
+     "github.com/blevesearch/bleve/v2/index/upsidedown"
+     "github.com/blevesearch/bleve/v2/search/query"
+     "tangled.org/core/appview/db"
+     "tangled.org/core/appview/indexer/base36"
+     bleveutil "tangled.org/core/appview/indexer/bleve"
+     "tangled.org/core/appview/models"
+     "tangled.org/core/appview/pagination"
+     tlog "tangled.org/core/log"
+ )
+
+ type Indexer struct {
+     indexer bleve.Index
+     path    string
+ }
+
+ func NewIndexer(indexDir string) *Indexer {
+     return &Indexer{
+         path: indexDir,
+     }
+ }
+
+ // Init opens the index and populates it when it is created for the first time.
+ func (ix *Indexer) Init(ctx context.Context, e db.Execer) {
+     l := tlog.FromContext(ctx)
+     existed, err := ix.initialize(ctx)
+     if err != nil {
+         l.Error("failed to initialize issue indexer", "err", err)
+         return
+     }
+     if !existed {
+         l.Debug("Populating the issue indexer")
+         err := PopulateIndexer(ctx, ix, e)
+         if err != nil {
+             l.Error("failed to populate issue indexer", "err", err)
+         }
+     }
+     l.Info("Initialized the issue indexer")
+ }
+
+ // initialize opens (or creates) the bleve index on disk and reports whether an
+ // existing index was found.
+ func (ix *Indexer) initialize(ctx context.Context) (bool, error) {
+     if ix.indexer != nil {
+         return false, errors.New("indexer is already initialized")
+     }
+
+     indexer, err := openIndexer(ctx, ix.path)
+     if err != nil {
+         return false, err
+     }
+     if indexer != nil {
+         ix.indexer = indexer
+         return true, nil
+     }
+
+     mapping := bleve.NewIndexMapping()
+     indexer, err = bleve.New(ix.path, mapping)
+     if err != nil {
+         return false, err
+     }
+
+     ix.indexer = indexer
+
+     return false, nil
+ }
+
+ func openIndexer(ctx context.Context, path string) (bleve.Index, error) {
+     l := tlog.FromContext(ctx)
+     indexer, err := bleve.Open(path)
+     if err != nil {
+         if errors.Is(err, upsidedown.IncompatibleVersion) {
+             l.Info("Indexer was built with a previous version of bleve, deleting and rebuilding")
+             return nil, os.RemoveAll(path)
+         }
+         if errors.Is(err, bleve.ErrorIndexPathDoesNotExist) {
+             // no index on disk yet; the caller creates a fresh one
+             return nil, nil
+         }
+         return nil, err
+     }
+     return indexer, nil
+ }
+
+ func PopulateIndexer(ctx context.Context, ix *Indexer, e db.Execer) error {
+     l := tlog.FromContext(ctx)
+     count := 0
+     err := pagination.IterateAll(
+         func(page pagination.Page) ([]models.Issue, error) {
+             return db.GetIssuesPaginated(e, page)
+         },
+         func(issues []models.Issue) error {
+             var dataList []*IssueData
+             for _, issue := range issues {
+                 dataList = append(dataList, &IssueData{
+                     ID:      issue.Id,
+                     IssueID: issue.IssueId,
+                     Title:   issue.Title,
+                     Body:    issue.Body,
+                     IsOpen:  issue.Open,
+                 })
+             }
+             // index the whole page in one batch
+             if err := ix.Index(ctx, dataList...); err != nil {
+                 return err
+             }
+             count += len(issues)
+             return nil
+         },
+     )
+     l.Info("issues indexed", "count", count)
+     return err
+ }
+
+ // IssueData is the document stored in and indexed by bleve.
+ type IssueData struct {
+     ID      int64  `json:"id"`
+     IssueID int    `json:"issue_id"`
+     Title   string `json:"title"`
+     Body    string `json:"body"`
+
+     IsOpen   bool               `json:"is_open"`
+     Comments []IssueCommentData `json:"comments"`
+ }
+
+ type IssueCommentData struct {
+     Body string `json:"body"`
+ }
+
+ type SearchResult struct {
+     Hits  []int64
+     Total uint64
+ }
+
+ const maxBatchSize = 20
+
+ func (ix *Indexer) Index(ctx context.Context, issues ...*IssueData) error {
+     batch := bleveutil.NewFlushingBatch(ix.indexer, maxBatchSize)
+     for _, issue := range issues {
+         if err := batch.Index(base36.Encode(issue.ID), issue); err != nil {
+             return err
+         }
+     }
+     return batch.Flush()
+ }
+
+ // Search searches for issues matching opts.
+ func (ix *Indexer) Search(ctx context.Context, opts models.IssueSearchOptions) (*SearchResult, error) {
+     var queries []query.Query
+
+     if opts.Keyword != "" {
+         queries = append(queries, bleve.NewDisjunctionQuery(
+             matchAndQuery(opts.Keyword, "title"),
+             matchAndQuery(opts.Keyword, "body"),
+         ))
+     }
+     queries = append(queries, boolFieldQuery(opts.IsOpen, "is_open"))
+     // TODO: append more queries
+
+     var indexerQuery query.Query = bleve.NewConjunctionQuery(queries...)
+     searchReq := bleve.NewSearchRequestOptions(indexerQuery, opts.Page.Limit, opts.Page.Offset, false)
+     res, err := ix.indexer.SearchInContext(ctx, searchReq)
+     if err != nil {
+         return nil, err
+     }
+     ret := &SearchResult{
+         Total: res.Total,
+         Hits:  make([]int64, len(res.Hits)),
+     }
+     for i, hit := range res.Hits {
+         id, err := base36.Decode(hit.ID)
+         if err != nil {
+             return nil, err
+         }
+         ret.Hits[i] = id
+     }
+     return ret, nil
+ }
+
+ func matchAndQuery(keyword, field string) query.Query {
+     q := bleve.NewMatchQuery(keyword)
+     q.FieldVal = field
+     return q
+ }
+
+ func boolFieldQuery(val bool, field string) query.Query {
+     q := bleve.NewBoolFieldQuery(val)
+     q.FieldVal = field
+     return q
+ }

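A caller-side sketch of how the search path fits together, resolving bleve hits back to database rows. This is not part of the diff: GetIssuesByIds is a hypothetical db helper used only to illustrate the shape of the lookup.

    // searchOpenIssues is a hypothetical caller (e.g. an HTTP handler in the
    // issues package); GetIssuesByIds is an assumed helper, not added by this PR.
    func searchOpenIssues(ctx context.Context, ix *issues_indexer.Indexer, e db.Execer, keyword string) ([]models.Issue, error) {
        res, err := ix.Search(ctx, models.IssueSearchOptions{
            Keyword: keyword,
            IsOpen:  true,
            Page:    pagination.FirstPage(),
        })
        if err != nil {
            return nil, err
        }
        // res.Hits holds issue row ids in relevance order; res.Total is the overall hit count.
        return db.GetIssuesByIds(e, res.Hits) // assumed helper
    }
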
appview/indexer/notifier.go (+26)
···
+ package indexer
+
+ import (
+     "context"
+
+     issues_indexer "tangled.org/core/appview/indexer/issues"
+     "tangled.org/core/appview/models"
+     "tangled.org/core/appview/notify"
+ )
+
+ var _ notify.Notifier = &Indexer{}
+
+ func (ix *Indexer) NewIssue(ctx context.Context, issue *models.Issue) {
+     ix.Issues.Index(ctx, &issues_indexer.IssueData{
+         ID:       issue.Id,
+         IssueID:  issue.IssueId,
+         Title:    issue.Title,
+         Body:     issue.Body,
+         IsOpen:   issue.Open,
+         Comments: []issues_indexer.IssueCommentData{},
+     })
+ }
+
+ func (ix *Indexer) NewPullComment(ctx context.Context, comment *models.PullComment) {
+     panic("unimplemented")
+ }

appview/issues/issues.go (+4)
···
  "tangled.org/core/api/tangled"
  "tangled.org/core/appview/config"
  "tangled.org/core/appview/db"
+ issues_indexer "tangled.org/core/appview/indexer/issues"
  "tangled.org/core/appview/models"
  "tangled.org/core/appview/notify"
  "tangled.org/core/appview/oauth"
···
  notifier  notify.Notifier
  logger    *slog.Logger
  validator *validator.Validator
+ indexer   *issues_indexer.Indexer
  }

  func New(
···
  config *config.Config,
  notifier notify.Notifier,
  validator *validator.Validator,
+ indexer *issues_indexer.Indexer,
  logger *slog.Logger,
  ) *Issues {
  return &Issues{
···
  notifier:  notifier,
  logger:    logger,
  validator: validator,
+ indexer:   indexer,
  }
  }

appview/models/search.go (+23)
···
+ package models
+
+ import "tangled.org/core/appview/pagination"
+
+ type IssueSearchOptions struct {
+     Keyword string
+     RepoAt  string
+     IsOpen  bool
+
+     Page pagination.Page
+ }
+
+ // func (so *SearchOptions) ToFilters() []filter {
+ //     var filters []filter
+ //     if so.IsOpen != nil {
+ //         openValue := 0
+ //         if *so.IsOpen {
+ //             openValue = 1
+ //         }
+ //         filters = append(filters, FilterEq("open", openValue))
+ //     }
+ //     return filters
+ // }

appview/pages/pages.go (+1)
···
  LabelDefs       map[string]*models.LabelDefinition
  Page            pagination.Page
  FilteringByOpen bool
+ FilterQuery     string
  }

  func (p *Pages) RepoIssues(w io.Writer, params RepoIssuesParams) error {

appview/pagination/page.go (+23)
···
   Limit: p.Limit,
  }
  }
+
+ func IterateAll[T any](
+     fetch func(page Page) ([]T, error),
+     handle func(items []T) error,
+ ) error {
+     page := FirstPage()
+     for {
+         items, err := fetch(page)
+         if err != nil {
+             return err
+         }
+
+         err = handle(items)
+         if err != nil {
+             return err
+         }
+         if len(items) < page.Limit {
+             break
+         }
+         page = page.Next()
+     }
+     return nil
+ }

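Note that IterateAll stops as soon as a fetch returns fewer items than Page.Limit, so the fetch callback must honour the page's limit exactly. A small hypothetical usage sketch, with illustrative names (User, listUsers, sendDigest) that are not part of the codebase:

    // stream every user through a callback, one Limit-sized page at a time
    err := pagination.IterateAll(
        func(page pagination.Page) ([]User, error) {
            return listUsers(page) // assumed paginated query honouring page.Limit/Offset
        },
        func(users []User) error {
            for _, u := range users {
                sendDigest(u) // assumed per-item work; returning an error aborts the iteration
            }
            return nil
        },
    )
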
appview/state/router.go (+1)
···
  s.config,
  s.notifier,
  s.validator,
+ s.indexer.Issues,
  log.SubLogger(s.logger, "issues"),
  )
  return issues.Router(mw)

appview/state/state.go (+10)
···
  "tangled.org/core/appview"
  "tangled.org/core/appview/config"
  "tangled.org/core/appview/db"
+ "tangled.org/core/appview/indexer"
  "tangled.org/core/appview/models"
  "tangled.org/core/appview/notify"
  dbnotify "tangled.org/core/appview/notify/db"
···
  type State struct {
  db       *db.DB
  notifier notify.Notifier
+ indexer  *indexer.Indexer
  oauth    *oauth.OAuth
  enforcer *rbac.Enforcer
  pages    *pages.Pages
···
  return nil, fmt.Errorf("failed to create db: %w", err)
  }

+ indexer := indexer.New(log.SubLogger(logger, "indexer"))
+ err = indexer.Init(ctx, d)
+ if err != nil {
+     return nil, fmt.Errorf("failed to create indexer: %w", err)
+ }
+
  enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
  if err != nil {
  return nil, fmt.Errorf("failed to create enforcer: %w", err)
···
  if !config.Core.Dev {
  notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
  }
+ notifiers = append(notifiers, indexer)
  notifier := notify.NewMergedNotifier(notifiers...)

  state := &State{
  d,
  notifier,
+ indexer,
  oauth,
  enforcer,
  pages,