1<?php
2
3// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the GNU Affero General Public License v3.0.
4// See the LICENCE file in the repository root for full licence text.
5
6declare(strict_types=1);
7
8namespace App\Console\Commands;
9
10use App\Exceptions\InvariantException;
11use App\Libraries\Search\ScoreSearch;
12use App\Models\Solo\Score;
13use Ds\Set;
14use Illuminate\Console\Command;
15use Illuminate\Database\Eloquent\Builder;
16use Illuminate\Database\Eloquent\Collection;
17use Symfony\Component\Console\Helper\ProgressBar;
18
19class EsIndexScoresQueue extends Command
20{
21 /**
22 * The name and signature of the console command.
23 *
24 * @var string
25 */
26 protected $signature = 'es:index-scores:queue
27 {--a|all : Queue all the scores in the database}
28 {--from= : Queue all the scores after (but not including) the specified id}
29 {--ids= : Queue specified comma-separated list of score ids}
30 {--schema= : Index schema to queue the scores to. Will use active schemas set in redis if not specified}
31 {--user= : Filter scores by user id}
32 ';
33
34 /**
35 * The console command description.
36 *
37 * @var string
38 */
39 protected $description = 'Queue scores to be indexed into Elasticsearch.';
40
41 private ProgressBar $bar;
42 private array $ids;
43 private array $schemas;
44 private ScoreSearch $search;
45 private int $total;
46 private Builder $query;
47
48 /**
49 * Execute the console command.
50 *
51 * @return mixed
52 */
53 public function handle()
54 {
55 $this->search = new ScoreSearch();
56
57 $this->parseOptions();
58
59 if (!$this->confirm('This will queue scores for indexing to schema '.implode(', ', $this->schemas).', continue?', true)) {
60 return $this->info('User aborted');
61 }
62
63 $startTimeNs = hrtime(true);
64
65 $this->bar = $this->output->createProgressBar();
66 $this->bar->start();
67 $this->total = 0;
68
69 if (isset($this->ids)) {
70 $this->queueIds($this->ids);
71 }
72
73 if (isset($this->query)) {
74 $this->query->chunkById(100, function (Collection $scores): void {
75 $this->queueIds(array_map(fn (Score $score): int => $score->getKey(), $scores->all()));
76 });
77 }
78
79 $this->bar->finish();
80 $this->line('');
81 $totalTime = (int) ((hrtime(true) - $startTimeNs) / 1000000000);
82 $this->info("Queued {$this->total} scores in {$totalTime}s");
83 }
84
85 private function parseOptions(): void
86 {
87 $query = Score::select('id');
88 $userId = get_int($this->option('user'));
89 if ($userId !== null) {
90 $query->where('user_id', $userId);
91 }
92
93 $doneParsingId = false;
94
95 $ids = $this->parseOptionIds();
96 if ($ids->count() > 0) {
97 $doneParsingId = true;
98 if ($userId === null) {
99 $this->ids = $ids->toArray();
100 } else {
101 $this->query = $query->whereKey($ids->toArray());
102 }
103 }
104
105 $from = get_int($this->option('from'));
106 if ($from !== null) {
107 if ($doneParsingId) {
108 throw new InvariantException('only one of the id parameters may be specified');
109 }
110 $doneParsingId = true;
111 $this->query = $query->where('id', '>', $from);
112 }
113
114 if ($this->option('all')) {
115 if ($doneParsingId) {
116 throw new InvariantException('only one of the id parameters may be specified');
117 }
118 $doneParsingId = true;
119 $this->query = $query;
120 }
121
122 if (!$doneParsingId) {
123 throw new InvariantException('id parameter must be specified');
124 }
125
126 $schema = presence($this->option('schema'));
127 if ($schema === null) {
128 $this->schemas = $this->search->getActiveSchemas();
129
130 if (count($this->schemas) === 0) {
131 throw new InvariantException('Index schema is not specified and there is no active schemas');
132 }
133 } else {
134 $this->schemas = [$schema];
135 }
136 }
137
138 private function parseOptionIds(): Set
139 {
140 $ret = new Set();
141 $ids = $this->option('ids');
142 $ids = is_array($ids) ? $ids : explode(',', get_string($ids) ?? '');
143
144 foreach ($ids as $idString) {
145 $id = get_int($idString);
146
147 if ($id !== null) {
148 $ret->add($id);
149 }
150 }
151
152 return $ret;
153 }
154
155 private function queueIds(array $ids): void
156 {
157 $this->search->queueForIndex($this->schemas, $ids);
158
159 $this->bar->setProgress(array_last($ids) ?? 0);
160 $this->total += count($ids);
161 }
162}