fork of indigo with slightly nicer lexgen
at main 12 kB view raw
1package search 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "slices" 8 "strconv" 9 "strings" 10 "sync" 11 12 appbsky "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/labstack/echo/v4" 16 otel "go.opentelemetry.io/otel" 17 "go.opentelemetry.io/otel/attribute" 18 "go.opentelemetry.io/otel/codes" 19) 20 21var tracer = otel.Tracer("search") 22 23func parseCursorLimit(e echo.Context) (int, int, error) { 24 offset := 0 25 if c := strings.TrimSpace(e.QueryParam("cursor")); c != "" { 26 v, err := strconv.Atoi(c) 27 if err != nil { 28 return 0, 0, &echo.HTTPError{ 29 Code: 400, 30 Message: fmt.Sprintf("invalid value for 'cursor': %s", err), 31 } 32 } 33 offset = v 34 } 35 36 if offset < 0 { 37 offset = 0 38 } 39 if offset > 10000 { 40 return 0, 0, &echo.HTTPError{ 41 Code: 400, 42 Message: "invalid value for 'cursor' (can't paginate so deep)", 43 } 44 } 45 46 limit := 25 47 if l := strings.TrimSpace(e.QueryParam("limit")); l != "" { 48 v, err := strconv.Atoi(l) 49 if err != nil { 50 return 0, 0, &echo.HTTPError{ 51 Code: 400, 52 Message: fmt.Sprintf("invalid value for 'count': %s", err), 53 } 54 } 55 56 limit = v 57 } 58 59 if limit > 100 { 60 limit = 100 61 } 62 if limit < 0 { 63 limit = 0 64 } 65 return offset, limit, nil 66} 67 68func (s *Server) handleSearchPostsSkeleton(e echo.Context) error { 69 ctx, span := tracer.Start(e.Request().Context(), "handleSearchPostsSkeleton") 70 defer span.End() 71 72 span.SetAttributes(attribute.String("query", e.QueryParam("q"))) 73 74 q := strings.TrimSpace(e.QueryParam("q")) 75 if q == "" { 76 return e.JSON(400, map[string]any{ 77 "error": "must pass non-empty search query", 78 }) 79 } 80 81 params := PostSearchParams{ 82 Query: q, 83 // TODO: parse/validate the sort options here? 84 Sort: e.QueryParam("sort"), 85 Domain: e.QueryParam("domain"), 86 URL: e.QueryParam("url"), 87 } 88 89 viewerStr := e.QueryParam("viewer") 90 if viewerStr != "" { 91 d, err := syntax.ParseDID(viewerStr) 92 if err != nil { 93 return e.JSON(400, map[string]any{ 94 "error": "BadRequest", 95 "message": fmt.Sprintf("invalid DID for 'viewer': %s", err), 96 }) 97 } 98 params.Viewer = &d 99 } 100 authorStr := e.QueryParam("author") 101 if authorStr != "" { 102 atid, err := syntax.ParseAtIdentifier(authorStr) 103 if err != nil { 104 return &echo.HTTPError{ 105 Code: 400, 106 Message: fmt.Sprintf("invalid DID for 'author': %s", err), 107 } 108 } 109 if atid.IsHandle() { 110 ident, err := s.dir.Lookup(e.Request().Context(), *atid) 111 if err != nil { 112 return e.JSON(400, map[string]any{ 113 "error": "BadRequest", 114 "message": fmt.Sprintf("invalid Handle for 'author': %s", err), 115 }) 116 } 117 params.Author = &ident.DID 118 } else { 119 d, err := atid.AsDID() 120 if err != nil { 121 return err 122 } 123 params.Author = &d 124 } 125 } 126 127 mentionsStr := e.QueryParam("mentions") 128 if mentionsStr != "" { 129 atid, err := syntax.ParseAtIdentifier(mentionsStr) 130 if err != nil { 131 return &echo.HTTPError{ 132 Code: 400, 133 Message: fmt.Sprintf("invalid DID for 'mentions': %s", err), 134 } 135 } 136 if atid.IsHandle() { 137 ident, err := s.dir.Lookup(e.Request().Context(), *atid) 138 if err != nil { 139 return e.JSON(400, map[string]any{ 140 "error": "BadRequest", 141 "message": fmt.Sprintf("invalid Handle for 'mentions': %s", err), 142 }) 143 } 144 params.Mentions = &ident.DID 145 } else { 146 d, err := atid.AsDID() 147 if err != nil { 148 return err 149 } 150 params.Mentions = &d 151 } 152 } 153 154 sinceStr := e.QueryParam("since") 155 if sinceStr != "" { 156 dt, err := syntax.ParseDatetime(sinceStr) 157 if err != nil { 158 return e.JSON(400, map[string]any{ 159 "error": "BadRequest", 160 "message": fmt.Sprintf("invalid Datetime for 'since': %s", err), 161 }) 162 } 163 params.Since = &dt 164 } 165 166 untilStr := e.QueryParam("until") 167 if untilStr != "" { 168 dt, err := syntax.ParseDatetime(untilStr) 169 if err != nil { 170 return e.JSON(400, map[string]any{ 171 "error": "BadRequest", 172 "message": fmt.Sprintf("invalid Datetime for 'until': %s", err), 173 }) 174 } 175 params.Until = &dt 176 } 177 178 langStr := e.QueryParam("lang") 179 if langStr != "" { 180 l, err := syntax.ParseLanguage(langStr) 181 if err != nil { 182 return e.JSON(400, map[string]any{ 183 "error": "BadRequest", 184 "message": fmt.Sprintf("invalid Language for 'lang': %s", err), 185 }) 186 } 187 params.Lang = &l 188 } 189 // TODO: could be multiple tag params; guess we should "bind"? 190 tags := e.Request().URL.Query()["tags"] 191 if len(tags) > 0 { 192 params.Tags = tags 193 } 194 195 offset, limit, err := parseCursorLimit(e) 196 if err != nil { 197 span.SetAttributes(attribute.String("error", fmt.Sprintf("invalid cursor/limit: %s", err))) 198 span.SetStatus(codes.Error, err.Error()) 199 return err 200 } 201 202 params.Offset = offset 203 params.Size = limit 204 span.SetAttributes(attribute.Int("offset", offset), attribute.Int("limit", limit)) 205 206 out, err := s.SearchPosts(ctx, &params) 207 if err != nil { 208 span.SetAttributes(attribute.String("error", fmt.Sprintf("failed to SearchPosts: %s", err))) 209 span.SetStatus(codes.Error, err.Error()) 210 return err 211 } 212 213 span.SetAttributes(attribute.Int("posts.length", len(out.Posts))) 214 215 return e.JSON(200, out) 216} 217 218func (s *Server) handleSearchActorsSkeleton(e echo.Context) error { 219 ctx, span := tracer.Start(e.Request().Context(), "handleSearchActorsSkeleton") 220 defer span.End() 221 222 span.SetAttributes(attribute.String("query", e.QueryParam("q"))) 223 224 q := strings.TrimSpace(e.QueryParam("q")) 225 if q == "" { 226 return e.JSON(400, map[string]any{ 227 "error": "BadRequest", 228 "message": "must pass non-empty search query", 229 }) 230 } 231 232 offset, limit, err := parseCursorLimit(e) 233 if err != nil { 234 span.SetAttributes(attribute.String("error", fmt.Sprintf("invalid cursor/limit: %s", err))) 235 span.SetStatus(codes.Error, err.Error()) 236 return err 237 } 238 239 typeahead := false 240 if q := strings.TrimSpace(e.QueryParam("typeahead")); q == "true" || q == "1" || q == "y" { 241 typeahead = true 242 } 243 244 params := ActorSearchParams{ 245 Query: q, 246 Typeahead: typeahead, 247 Offset: offset, 248 Size: limit, 249 } 250 251 viewerStr := e.QueryParam("viewer") 252 if viewerStr != "" { 253 d, err := syntax.ParseDID(viewerStr) 254 if err != nil { 255 return e.JSON(400, map[string]any{ 256 "error": "BadRequest", 257 "message": fmt.Sprintf("invalid DID for 'viewer': %s", err), 258 }) 259 } 260 params.Viewer = &d 261 } 262 263 span.SetAttributes( 264 attribute.Int("offset", offset), 265 attribute.Int("limit", limit), 266 attribute.Bool("typeahead", typeahead), 267 ) 268 269 out, err := s.SearchProfiles(ctx, &params) 270 if err != nil { 271 span.SetAttributes(attribute.String("error", fmt.Sprintf("failed to SearchProfiles: %s", err))) 272 span.SetStatus(codes.Error, err.Error()) 273 return err 274 } 275 276 span.SetAttributes(attribute.Int("actors.length", len(out.Actors))) 277 278 return e.JSON(200, out) 279} 280 281func (s *Server) SearchPosts(ctx context.Context, params *PostSearchParams) (*appbsky.UnspeccedSearchPostsSkeleton_Output, error) { 282 ctx, span := tracer.Start(ctx, "SearchPosts") 283 defer span.End() 284 285 resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, params) 286 if err != nil { 287 return nil, err 288 } 289 290 posts := []*appbsky.UnspeccedDefs_SkeletonSearchPost{} 291 for _, r := range resp.Hits.Hits { 292 var doc PostDoc 293 if err := json.Unmarshal(r.Source, &doc); err != nil { 294 return nil, fmt.Errorf("decoding post doc from search response: %w", err) 295 } 296 297 did, err := syntax.ParseDID(doc.DID) 298 if err != nil { 299 return nil, fmt.Errorf("invalid DID in indexed document: %w", err) 300 } 301 302 posts = append(posts, &appbsky.UnspeccedDefs_SkeletonSearchPost{ 303 Uri: fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, doc.RecordRkey), 304 }) 305 } 306 307 out := appbsky.UnspeccedSearchPostsSkeleton_Output{Posts: posts} 308 if len(posts) == params.Size && (params.Offset+params.Size) < 10000 { 309 s := fmt.Sprintf("%d", params.Offset+params.Size) 310 out.Cursor = &s 311 } 312 if resp.Hits.Total.Relation == "eq" { 313 i := int64(resp.Hits.Total.Value) 314 out.HitsTotal = &i 315 } 316 return &out, nil 317} 318 319func (s *Server) SearchProfiles(ctx context.Context, params *ActorSearchParams) (*appbsky.UnspeccedSearchActorsSkeleton_Output, error) { 320 ctx, span := tracer.Start(ctx, "SearchProfiles") 321 defer span.End() 322 span.SetAttributes( 323 attribute.String("query", params.Query), 324 attribute.Bool("typeahead", params.Typeahead), 325 attribute.Int("offset", params.Offset), 326 attribute.Int("size", params.Size), 327 ) 328 329 var globalResp *EsSearchResponse 330 var personalizedResp *EsSearchResponse 331 var globalErr error 332 var personalizedErr error 333 334 wg := sync.WaitGroup{} 335 336 wg.Add(1) 337 // Conduct the global search 338 go func(myQ ActorSearchParams) { 339 defer wg.Done() 340 // Clear out the following list to conduct the global search 341 myQ.Follows = nil 342 343 if myQ.Typeahead { 344 globalResp, globalErr = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, &myQ) 345 } else { 346 globalResp, globalErr = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, &myQ) 347 } 348 }(*params) 349 350 // If we have a following list, conduct a second search to filter the results 351 if len(params.Follows) > 0 { 352 wg.Add(1) 353 go func(myQ ActorSearchParams) { 354 defer wg.Done() 355 if myQ.Typeahead { 356 personalizedResp, personalizedErr = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, &myQ) 357 } else { 358 personalizedResp, personalizedErr = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, &myQ) 359 } 360 }(*params) 361 } 362 363 wg.Wait() 364 365 if globalErr != nil { 366 return nil, globalErr 367 } 368 369 if len(params.Follows) > 0 { 370 if personalizedErr != nil { 371 return nil, personalizedErr 372 } 373 374 followingBoost := 0.1 375 376 // Insert the personalized results into the global results, deduping as we go and maintaining score-order 377 followingSeen := map[string]struct{}{} 378 for _, r := range personalizedResp.Hits.Hits { 379 var doc ProfileDoc 380 if err := json.Unmarshal(r.Source, &doc); err != nil { 381 return nil, fmt.Errorf("decoding profile doc from search response: %w", err) 382 } 383 384 did, err := syntax.ParseDID(doc.DID) 385 if err != nil { 386 return nil, fmt.Errorf("invalid DID in indexed document: %w", err) 387 } 388 389 if _, ok := followingSeen[did.String()]; ok { 390 continue 391 } 392 393 followingSeen[did.String()] = struct{}{} 394 395 // Insert the profile into the global results 396 globalResp.Hits.Hits = append(globalResp.Hits.Hits, r) 397 } 398 399 // Walk the combined results and boost the scores of the personalized results and dedupe 400 seen := map[string]struct{}{} 401 deduped := []EsSearchHit{} 402 for _, r := range globalResp.Hits.Hits { 403 var doc ProfileDoc 404 if err := json.Unmarshal(r.Source, &doc); err != nil { 405 return nil, fmt.Errorf("decoding profile doc from search response: %w", err) 406 } 407 408 did, err := syntax.ParseDID(doc.DID) 409 if err != nil { 410 return nil, fmt.Errorf("invalid DID in indexed document: %w", err) 411 } 412 413 // Boost the score of the personalized results 414 if _, ok := followingSeen[did.String()]; ok { 415 r.Score += followingBoost 416 } 417 418 // Dedupe the results 419 if _, ok := seen[did.String()]; ok { 420 continue 421 } 422 423 seen[did.String()] = struct{}{} 424 deduped = append(deduped, r) 425 } 426 427 // Sort the results by score 428 slices.SortFunc(deduped, func(a, b EsSearchHit) int { 429 if a.Score < b.Score { 430 return 1 431 } 432 if a.Score > b.Score { 433 return -1 434 } 435 return 0 436 }) 437 438 // Trim the results to the requested size 439 if len(deduped) > params.Size { 440 deduped = deduped[:params.Size] 441 } 442 443 globalResp.Hits.Hits = deduped 444 } 445 446 actors := []*appbsky.UnspeccedDefs_SkeletonSearchActor{} 447 for _, r := range globalResp.Hits.Hits { 448 var doc ProfileDoc 449 if err := json.Unmarshal(r.Source, &doc); err != nil { 450 return nil, fmt.Errorf("decoding profile doc from search response: %w", err) 451 } 452 453 did, err := syntax.ParseDID(doc.DID) 454 if err != nil { 455 return nil, fmt.Errorf("invalid DID in indexed document: %w", err) 456 } 457 458 actors = append(actors, &appbsky.UnspeccedDefs_SkeletonSearchActor{ 459 Did: did.String(), 460 }) 461 } 462 463 out := appbsky.UnspeccedSearchActorsSkeleton_Output{Actors: actors} 464 if len(actors) == params.Size && (params.Offset+params.Size) < 10000 { 465 s := fmt.Sprintf("%d", params.Offset+params.Size) 466 out.Cursor = &s 467 } 468 if globalResp.Hits.Total.Relation == "eq" { 469 i := int64(globalResp.Hits.Total.Value) 470 out.HitsTotal = &i 471 } 472 return &out, nil 473}