1package search
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "slices"
8 "strconv"
9 "strings"
10 "sync"
11
12 appbsky "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14
15 "github.com/labstack/echo/v4"
16 otel "go.opentelemetry.io/otel"
17 "go.opentelemetry.io/otel/attribute"
18 "go.opentelemetry.io/otel/codes"
19)
20
21var tracer = otel.Tracer("search")
22
23func parseCursorLimit(e echo.Context) (int, int, error) {
24 offset := 0
25 if c := strings.TrimSpace(e.QueryParam("cursor")); c != "" {
26 v, err := strconv.Atoi(c)
27 if err != nil {
28 return 0, 0, &echo.HTTPError{
29 Code: 400,
30 Message: fmt.Sprintf("invalid value for 'cursor': %s", err),
31 }
32 }
33 offset = v
34 }
35
36 if offset < 0 {
37 offset = 0
38 }
39 if offset > 10000 {
40 return 0, 0, &echo.HTTPError{
41 Code: 400,
42 Message: "invalid value for 'cursor' (can't paginate so deep)",
43 }
44 }
45
46 limit := 25
47 if l := strings.TrimSpace(e.QueryParam("limit")); l != "" {
48 v, err := strconv.Atoi(l)
49 if err != nil {
50 return 0, 0, &echo.HTTPError{
51 Code: 400,
52 Message: fmt.Sprintf("invalid value for 'count': %s", err),
53 }
54 }
55
56 limit = v
57 }
58
59 if limit > 100 {
60 limit = 100
61 }
62 if limit < 0 {
63 limit = 0
64 }
65 return offset, limit, nil
66}
67
68func (s *Server) handleSearchPostsSkeleton(e echo.Context) error {
69 ctx, span := tracer.Start(e.Request().Context(), "handleSearchPostsSkeleton")
70 defer span.End()
71
72 span.SetAttributes(attribute.String("query", e.QueryParam("q")))
73
74 q := strings.TrimSpace(e.QueryParam("q"))
75 if q == "" {
76 return e.JSON(400, map[string]any{
77 "error": "must pass non-empty search query",
78 })
79 }
80
81 params := PostSearchParams{
82 Query: q,
83 // TODO: parse/validate the sort options here?
84 Sort: e.QueryParam("sort"),
85 Domain: e.QueryParam("domain"),
86 URL: e.QueryParam("url"),
87 }
88
89 viewerStr := e.QueryParam("viewer")
90 if viewerStr != "" {
91 d, err := syntax.ParseDID(viewerStr)
92 if err != nil {
93 return e.JSON(400, map[string]any{
94 "error": "BadRequest",
95 "message": fmt.Sprintf("invalid DID for 'viewer': %s", err),
96 })
97 }
98 params.Viewer = &d
99 }
100 authorStr := e.QueryParam("author")
101 if authorStr != "" {
102 atid, err := syntax.ParseAtIdentifier(authorStr)
103 if err != nil {
104 return &echo.HTTPError{
105 Code: 400,
106 Message: fmt.Sprintf("invalid DID for 'author': %s", err),
107 }
108 }
109 if atid.IsHandle() {
110 ident, err := s.dir.Lookup(e.Request().Context(), *atid)
111 if err != nil {
112 return e.JSON(400, map[string]any{
113 "error": "BadRequest",
114 "message": fmt.Sprintf("invalid Handle for 'author': %s", err),
115 })
116 }
117 params.Author = &ident.DID
118 } else {
119 d, err := atid.AsDID()
120 if err != nil {
121 return err
122 }
123 params.Author = &d
124 }
125 }
126
127 mentionsStr := e.QueryParam("mentions")
128 if mentionsStr != "" {
129 atid, err := syntax.ParseAtIdentifier(mentionsStr)
130 if err != nil {
131 return &echo.HTTPError{
132 Code: 400,
133 Message: fmt.Sprintf("invalid DID for 'mentions': %s", err),
134 }
135 }
136 if atid.IsHandle() {
137 ident, err := s.dir.Lookup(e.Request().Context(), *atid)
138 if err != nil {
139 return e.JSON(400, map[string]any{
140 "error": "BadRequest",
141 "message": fmt.Sprintf("invalid Handle for 'mentions': %s", err),
142 })
143 }
144 params.Mentions = &ident.DID
145 } else {
146 d, err := atid.AsDID()
147 if err != nil {
148 return err
149 }
150 params.Mentions = &d
151 }
152 }
153
154 sinceStr := e.QueryParam("since")
155 if sinceStr != "" {
156 dt, err := syntax.ParseDatetime(sinceStr)
157 if err != nil {
158 return e.JSON(400, map[string]any{
159 "error": "BadRequest",
160 "message": fmt.Sprintf("invalid Datetime for 'since': %s", err),
161 })
162 }
163 params.Since = &dt
164 }
165
166 untilStr := e.QueryParam("until")
167 if untilStr != "" {
168 dt, err := syntax.ParseDatetime(untilStr)
169 if err != nil {
170 return e.JSON(400, map[string]any{
171 "error": "BadRequest",
172 "message": fmt.Sprintf("invalid Datetime for 'until': %s", err),
173 })
174 }
175 params.Until = &dt
176 }
177
178 langStr := e.QueryParam("lang")
179 if langStr != "" {
180 l, err := syntax.ParseLanguage(langStr)
181 if err != nil {
182 return e.JSON(400, map[string]any{
183 "error": "BadRequest",
184 "message": fmt.Sprintf("invalid Language for 'lang': %s", err),
185 })
186 }
187 params.Lang = &l
188 }
189 // TODO: could be multiple tag params; guess we should "bind"?
190 tags := e.Request().URL.Query()["tags"]
191 if len(tags) > 0 {
192 params.Tags = tags
193 }
194
195 offset, limit, err := parseCursorLimit(e)
196 if err != nil {
197 span.SetAttributes(attribute.String("error", fmt.Sprintf("invalid cursor/limit: %s", err)))
198 span.SetStatus(codes.Error, err.Error())
199 return err
200 }
201
202 params.Offset = offset
203 params.Size = limit
204 span.SetAttributes(attribute.Int("offset", offset), attribute.Int("limit", limit))
205
206 out, err := s.SearchPosts(ctx, ¶ms)
207 if err != nil {
208 span.SetAttributes(attribute.String("error", fmt.Sprintf("failed to SearchPosts: %s", err)))
209 span.SetStatus(codes.Error, err.Error())
210 return err
211 }
212
213 span.SetAttributes(attribute.Int("posts.length", len(out.Posts)))
214
215 return e.JSON(200, out)
216}
217
218func (s *Server) handleSearchActorsSkeleton(e echo.Context) error {
219 ctx, span := tracer.Start(e.Request().Context(), "handleSearchActorsSkeleton")
220 defer span.End()
221
222 span.SetAttributes(attribute.String("query", e.QueryParam("q")))
223
224 q := strings.TrimSpace(e.QueryParam("q"))
225 if q == "" {
226 return e.JSON(400, map[string]any{
227 "error": "BadRequest",
228 "message": "must pass non-empty search query",
229 })
230 }
231
232 offset, limit, err := parseCursorLimit(e)
233 if err != nil {
234 span.SetAttributes(attribute.String("error", fmt.Sprintf("invalid cursor/limit: %s", err)))
235 span.SetStatus(codes.Error, err.Error())
236 return err
237 }
238
239 typeahead := false
240 if q := strings.TrimSpace(e.QueryParam("typeahead")); q == "true" || q == "1" || q == "y" {
241 typeahead = true
242 }
243
244 params := ActorSearchParams{
245 Query: q,
246 Typeahead: typeahead,
247 Offset: offset,
248 Size: limit,
249 }
250
251 viewerStr := e.QueryParam("viewer")
252 if viewerStr != "" {
253 d, err := syntax.ParseDID(viewerStr)
254 if err != nil {
255 return e.JSON(400, map[string]any{
256 "error": "BadRequest",
257 "message": fmt.Sprintf("invalid DID for 'viewer': %s", err),
258 })
259 }
260 params.Viewer = &d
261 }
262
263 span.SetAttributes(
264 attribute.Int("offset", offset),
265 attribute.Int("limit", limit),
266 attribute.Bool("typeahead", typeahead),
267 )
268
269 out, err := s.SearchProfiles(ctx, ¶ms)
270 if err != nil {
271 span.SetAttributes(attribute.String("error", fmt.Sprintf("failed to SearchProfiles: %s", err)))
272 span.SetStatus(codes.Error, err.Error())
273 return err
274 }
275
276 span.SetAttributes(attribute.Int("actors.length", len(out.Actors)))
277
278 return e.JSON(200, out)
279}
280
281func (s *Server) SearchPosts(ctx context.Context, params *PostSearchParams) (*appbsky.UnspeccedSearchPostsSkeleton_Output, error) {
282 ctx, span := tracer.Start(ctx, "SearchPosts")
283 defer span.End()
284
285 resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, params)
286 if err != nil {
287 return nil, err
288 }
289
290 posts := []*appbsky.UnspeccedDefs_SkeletonSearchPost{}
291 for _, r := range resp.Hits.Hits {
292 var doc PostDoc
293 if err := json.Unmarshal(r.Source, &doc); err != nil {
294 return nil, fmt.Errorf("decoding post doc from search response: %w", err)
295 }
296
297 did, err := syntax.ParseDID(doc.DID)
298 if err != nil {
299 return nil, fmt.Errorf("invalid DID in indexed document: %w", err)
300 }
301
302 posts = append(posts, &appbsky.UnspeccedDefs_SkeletonSearchPost{
303 Uri: fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, doc.RecordRkey),
304 })
305 }
306
307 out := appbsky.UnspeccedSearchPostsSkeleton_Output{Posts: posts}
308 if len(posts) == params.Size && (params.Offset+params.Size) < 10000 {
309 s := fmt.Sprintf("%d", params.Offset+params.Size)
310 out.Cursor = &s
311 }
312 if resp.Hits.Total.Relation == "eq" {
313 i := int64(resp.Hits.Total.Value)
314 out.HitsTotal = &i
315 }
316 return &out, nil
317}
318
319func (s *Server) SearchProfiles(ctx context.Context, params *ActorSearchParams) (*appbsky.UnspeccedSearchActorsSkeleton_Output, error) {
320 ctx, span := tracer.Start(ctx, "SearchProfiles")
321 defer span.End()
322 span.SetAttributes(
323 attribute.String("query", params.Query),
324 attribute.Bool("typeahead", params.Typeahead),
325 attribute.Int("offset", params.Offset),
326 attribute.Int("size", params.Size),
327 )
328
329 var globalResp *EsSearchResponse
330 var personalizedResp *EsSearchResponse
331 var globalErr error
332 var personalizedErr error
333
334 wg := sync.WaitGroup{}
335
336 wg.Add(1)
337 // Conduct the global search
338 go func(myQ ActorSearchParams) {
339 defer wg.Done()
340 // Clear out the following list to conduct the global search
341 myQ.Follows = nil
342
343 if myQ.Typeahead {
344 globalResp, globalErr = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, &myQ)
345 } else {
346 globalResp, globalErr = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, &myQ)
347 }
348 }(*params)
349
350 // If we have a following list, conduct a second search to filter the results
351 if len(params.Follows) > 0 {
352 wg.Add(1)
353 go func(myQ ActorSearchParams) {
354 defer wg.Done()
355 if myQ.Typeahead {
356 personalizedResp, personalizedErr = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, &myQ)
357 } else {
358 personalizedResp, personalizedErr = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, &myQ)
359 }
360 }(*params)
361 }
362
363 wg.Wait()
364
365 if globalErr != nil {
366 return nil, globalErr
367 }
368
369 if len(params.Follows) > 0 {
370 if personalizedErr != nil {
371 return nil, personalizedErr
372 }
373
374 followingBoost := 0.1
375
376 // Insert the personalized results into the global results, deduping as we go and maintaining score-order
377 followingSeen := map[string]struct{}{}
378 for _, r := range personalizedResp.Hits.Hits {
379 var doc ProfileDoc
380 if err := json.Unmarshal(r.Source, &doc); err != nil {
381 return nil, fmt.Errorf("decoding profile doc from search response: %w", err)
382 }
383
384 did, err := syntax.ParseDID(doc.DID)
385 if err != nil {
386 return nil, fmt.Errorf("invalid DID in indexed document: %w", err)
387 }
388
389 if _, ok := followingSeen[did.String()]; ok {
390 continue
391 }
392
393 followingSeen[did.String()] = struct{}{}
394
395 // Insert the profile into the global results
396 globalResp.Hits.Hits = append(globalResp.Hits.Hits, r)
397 }
398
399 // Walk the combined results and boost the scores of the personalized results and dedupe
400 seen := map[string]struct{}{}
401 deduped := []EsSearchHit{}
402 for _, r := range globalResp.Hits.Hits {
403 var doc ProfileDoc
404 if err := json.Unmarshal(r.Source, &doc); err != nil {
405 return nil, fmt.Errorf("decoding profile doc from search response: %w", err)
406 }
407
408 did, err := syntax.ParseDID(doc.DID)
409 if err != nil {
410 return nil, fmt.Errorf("invalid DID in indexed document: %w", err)
411 }
412
413 // Boost the score of the personalized results
414 if _, ok := followingSeen[did.String()]; ok {
415 r.Score += followingBoost
416 }
417
418 // Dedupe the results
419 if _, ok := seen[did.String()]; ok {
420 continue
421 }
422
423 seen[did.String()] = struct{}{}
424 deduped = append(deduped, r)
425 }
426
427 // Sort the results by score
428 slices.SortFunc(deduped, func(a, b EsSearchHit) int {
429 if a.Score < b.Score {
430 return 1
431 }
432 if a.Score > b.Score {
433 return -1
434 }
435 return 0
436 })
437
438 // Trim the results to the requested size
439 if len(deduped) > params.Size {
440 deduped = deduped[:params.Size]
441 }
442
443 globalResp.Hits.Hits = deduped
444 }
445
446 actors := []*appbsky.UnspeccedDefs_SkeletonSearchActor{}
447 for _, r := range globalResp.Hits.Hits {
448 var doc ProfileDoc
449 if err := json.Unmarshal(r.Source, &doc); err != nil {
450 return nil, fmt.Errorf("decoding profile doc from search response: %w", err)
451 }
452
453 did, err := syntax.ParseDID(doc.DID)
454 if err != nil {
455 return nil, fmt.Errorf("invalid DID in indexed document: %w", err)
456 }
457
458 actors = append(actors, &appbsky.UnspeccedDefs_SkeletonSearchActor{
459 Did: did.String(),
460 })
461 }
462
463 out := appbsky.UnspeccedSearchActorsSkeleton_Output{Actors: actors}
464 if len(actors) == params.Size && (params.Offset+params.Size) < 10000 {
465 s := fmt.Sprintf("%d", params.Offset+params.Size)
466 out.Cursor = &s
467 }
468 if globalResp.Hits.Total.Relation == "eq" {
469 i := int64(globalResp.Hits.Total.Value)
470 out.HitsTotal = &i
471 }
472 return &out, nil
473}