That fuck shit the fascists are using
1package org.tm.archive.database;
2
3import android.app.Application;
4
5import androidx.annotation.NonNull;
6import androidx.annotation.VisibleForTesting;
7
8import org.jetbrains.annotations.NotNull;
9import org.signal.core.util.concurrent.SignalExecutors;
10import org.tm.archive.database.model.MessageId;
11import org.tm.archive.recipients.Recipient;
12import org.tm.archive.recipients.RecipientId;
13import org.tm.archive.service.webrtc.links.CallLinkRoomId;
14import org.tm.archive.util.concurrent.SerialExecutor;
15
16import java.util.Collection;
17import java.util.HashMap;
18import java.util.HashSet;
19import java.util.Map;
20import java.util.Set;
21import java.util.UUID;
22import java.util.concurrent.CountDownLatch;
23import java.util.concurrent.Executor;
24
25/**
26 * Allows listening to database changes to varying degrees of specificity.
27 *
28 * A replacement for the observer system in {@link DatabaseTable}. We should move to this over time.
29 */
30public class DatabaseObserver {
31
32 private static final String KEY_CONVERSATION = "Conversation:";
33 private static final String KEY_VERBOSE_CONVERSATION = "VerboseConversation:";
34 private static final String KEY_CONVERSATION_LIST = "ConversationList";
35 private static final String KEY_PAYMENT = "Payment:";
36 private static final String KEY_ALL_PAYMENTS = "AllPayments";
37 private static final String KEY_CHAT_COLORS = "ChatColors";
38 private static final String KEY_STICKERS = "Stickers";
39 private static final String KEY_STICKER_PACKS = "StickerPacks";
40 private static final String KEY_ATTACHMENTS = "Attachments";
41 private static final String KEY_MESSAGE_UPDATE = "MessageUpdate:";
42 private static final String KEY_MESSAGE_INSERT = "MessageInsert:";
43 private static final String KEY_NOTIFICATION_PROFILES = "NotificationProfiles";
44 private static final String KEY_RECIPIENT = "Recipient";
45 private static final String KEY_STORY_OBSERVER = "Story";
46 private static final String KEY_SCHEDULED_MESSAGES = "ScheduledMessages";
47 private static final String KEY_CONVERSATION_DELETES = "ConversationDeletes";
48
49 private static final String KEY_CALL_UPDATES = "CallUpdates";
50 private static final String KEY_CALL_LINK_UPDATES = "CallLinkUpdates";
51
52 private final Application application;
53 private final Executor executor;
54
55 private final Set<Observer> conversationListObservers;
56 private final Map<Long, Set<Observer>> conversationObservers;
57 private final Map<Long, Set<Observer>> verboseConversationObservers;
58 private final Map<Long, Set<Observer>> conversationDeleteObservers;
59 private final Map<UUID, Set<Observer>> paymentObservers;
60 private final Map<Long, Set<Observer>> scheduledMessageObservers;
61 private final Set<Observer> allPaymentsObservers;
62 private final Set<Observer> chatColorsObservers;
63 private final Set<Observer> stickerObservers;
64 private final Set<Observer> stickerPackObservers;
65 private final Set<Observer> attachmentObservers;
66 private final Set<MessageObserver> messageUpdateObservers;
67 private final Map<Long, Set<MessageObserver>> messageInsertObservers;
68 private final Set<Observer> notificationProfileObservers;
69 private final Map<RecipientId, Set<Observer>> storyObservers;
70 private final Set<Observer> callUpdateObservers;
71 private final Map<CallLinkRoomId, Set<Observer>> callLinkObservers;
72
73 public DatabaseObserver(Application application) {
74 this.application = application;
75 this.executor = new SerialExecutor(SignalExecutors.BOUNDED);
76 this.conversationListObservers = new HashSet<>();
77 this.conversationObservers = new HashMap<>();
78 this.verboseConversationObservers = new HashMap<>();
79 this.conversationDeleteObservers = new HashMap<>();
80 this.paymentObservers = new HashMap<>();
81 this.allPaymentsObservers = new HashSet<>();
82 this.chatColorsObservers = new HashSet<>();
83 this.stickerObservers = new HashSet<>();
84 this.stickerPackObservers = new HashSet<>();
85 this.attachmentObservers = new HashSet<>();
86 this.messageUpdateObservers = new HashSet<>();
87 this.messageInsertObservers = new HashMap<>();
88 this.notificationProfileObservers = new HashSet<>();
89 this.storyObservers = new HashMap<>();
90 this.scheduledMessageObservers = new HashMap<>();
91 this.callUpdateObservers = new HashSet<>();
92 this.callLinkObservers = new HashMap<>();
93 }
94
95 public void registerConversationListObserver(@NonNull Observer listener) {
96 executor.execute(() -> {
97 conversationListObservers.add(listener);
98 });
99 }
100
101 public void registerConversationObserver(long threadId, @NonNull Observer listener) {
102 executor.execute(() -> {
103 registerMapped(conversationObservers, threadId, listener);
104 });
105 }
106
107 public void registerVerboseConversationObserver(long threadId, @NonNull Observer listener) {
108 executor.execute(() -> {
109 registerMapped(verboseConversationObservers, threadId, listener);
110 });
111 }
112
113 public void registerConversationDeleteObserver(long threadId, @NonNull Observer listener) {
114 executor.execute(() -> {
115 registerMapped(conversationDeleteObservers, threadId, listener);
116 });
117 }
118
119 public void registerPaymentObserver(@NonNull UUID paymentId, @NonNull Observer listener) {
120 executor.execute(() -> {
121 registerMapped(paymentObservers, paymentId, listener);
122 });
123 }
124
125 public void registerAllPaymentsObserver(@NonNull Observer listener) {
126 executor.execute(() -> {
127 allPaymentsObservers.add(listener);
128 });
129 }
130
131 public void registerChatColorsObserver(@NonNull Observer listener) {
132 executor.execute(() -> {
133 chatColorsObservers.add(listener);
134 });
135 }
136
137 public void registerStickerObserver(@NonNull Observer listener) {
138 executor.execute(() -> {
139 stickerObservers.add(listener);
140 });
141 }
142
143 public void registerStickerPackObserver(@NonNull Observer listener) {
144 executor.execute(() -> {
145 stickerPackObservers.add(listener);
146 });
147 }
148
149 public void registerAttachmentObserver(@NonNull Observer listener) {
150 executor.execute(() -> {
151 attachmentObservers.add(listener);
152 });
153 }
154
155 public void registerMessageUpdateObserver(@NonNull MessageObserver listener) {
156 executor.execute(() -> {
157 messageUpdateObservers.add(listener);
158 });
159 }
160
161 public void registerMessageInsertObserver(long threadId, @NonNull MessageObserver listener) {
162 executor.execute(() -> {
163 registerMapped(messageInsertObservers, threadId, listener);
164 });
165 }
166
167 public void registerNotificationProfileObserver(@NotNull Observer listener) {
168 executor.execute(() -> {
169 notificationProfileObservers.add(listener);
170 });
171 }
172
173 /**
174 * Adds an observer which will be notified whenever a new Story message is inserted into the database.
175 */
176 public void registerStoryObserver(@NonNull RecipientId recipientId, @NonNull Observer listener) {
177 executor.execute(() -> {
178 registerMapped(storyObservers, recipientId, listener);
179 });
180 }
181
182 public void registerScheduledMessageObserver(long threadId, @NonNull Observer listener) {
183 executor.execute(() -> {
184 registerMapped(scheduledMessageObservers, threadId, listener);
185 });
186 }
187
188 public void registerCallUpdateObserver(@NonNull Observer observer) {
189 executor.execute(() -> callUpdateObservers.add(observer));
190 }
191
192 public void registerCallLinkObserver(@NonNull CallLinkRoomId callLinkRoomId, @NonNull Observer observer) {
193 executor.execute(() -> {
194 registerMapped(callLinkObservers, callLinkRoomId, observer);
195 });
196 }
197
198 public void unregisterObserver(@NonNull Observer listener) {
199 executor.execute(() -> {
200 conversationListObservers.remove(listener);
201 unregisterMapped(conversationObservers, listener);
202 unregisterMapped(verboseConversationObservers, listener);
203 unregisterMapped(paymentObservers, listener);
204 chatColorsObservers.remove(listener);
205 stickerObservers.remove(listener);
206 stickerPackObservers.remove(listener);
207 attachmentObservers.remove(listener);
208 notificationProfileObservers.remove(listener);
209 unregisterMapped(storyObservers, listener);
210 unregisterMapped(scheduledMessageObservers, listener);
211 unregisterMapped(conversationDeleteObservers, listener);
212 callUpdateObservers.remove(listener);
213 unregisterMapped(callLinkObservers, listener);
214 });
215 }
216
217 public void unregisterObserver(@NonNull MessageObserver listener) {
218 executor.execute(() -> {
219 messageUpdateObservers.remove(listener);
220 unregisterMapped(messageInsertObservers, listener);
221 });
222 }
223
224 public void notifyConversationListeners(Set<Long> threadIds) {
225 for (long threadId : threadIds) {
226 notifyConversationListeners(threadId);
227 }
228 }
229
230 public void notifyConversationListeners(long threadId) {
231 runPostSuccessfulTransaction(KEY_CONVERSATION + threadId, () -> {
232 notifyMapped(conversationObservers, threadId);
233 notifyMapped(verboseConversationObservers, threadId);
234 });
235 }
236
237 public void notifyVerboseConversationListeners(Set<Long> threadIds) {
238 for (long threadId : threadIds) {
239 runPostSuccessfulTransaction(KEY_VERBOSE_CONVERSATION + threadId, () -> {
240 notifyMapped(verboseConversationObservers, threadId);
241 });
242 }
243 }
244
245 public void notifyConversationDeleteListeners(Set<Long> threadIds) {
246 for (long threadId : threadIds) {
247 notifyConversationDeleteListeners(threadId);
248 }
249 }
250
251 public void notifyConversationDeleteListeners(long threadId) {
252 runPostSuccessfulTransaction(KEY_CONVERSATION_DELETES + threadId, () -> {
253 notifyMapped(conversationDeleteObservers, threadId);
254 });
255 }
256
257 public void notifyConversationListListeners() {
258 runPostSuccessfulTransaction(KEY_CONVERSATION_LIST, () -> {
259 for (Observer listener : conversationListObservers) {
260 listener.onChanged();
261 }
262 });
263 }
264
265 public void notifyPaymentListeners(@NonNull UUID paymentId) {
266 runPostSuccessfulTransaction(KEY_PAYMENT + paymentId.toString(), () -> {
267 notifyMapped(paymentObservers, paymentId);
268 });
269 }
270
271 public void notifyAllPaymentsListeners() {
272 runPostSuccessfulTransaction(KEY_ALL_PAYMENTS, () -> {
273 notifySet(allPaymentsObservers);
274 });
275 }
276
277 public void notifyChatColorsListeners() {
278 runPostSuccessfulTransaction(KEY_CHAT_COLORS, () -> {
279 for (Observer chatColorsObserver : chatColorsObservers) {
280 chatColorsObserver.onChanged();
281 }
282 });
283 }
284
285 public void notifyStickerObservers() {
286 runPostSuccessfulTransaction(KEY_STICKERS, () -> {
287 notifySet(stickerObservers);
288 });
289 }
290
291 public void notifyStickerPackObservers() {
292 runPostSuccessfulTransaction(KEY_STICKER_PACKS, () -> {
293 notifySet(stickerPackObservers);
294 });
295 }
296
297 public void notifyAttachmentObservers() {
298 runPostSuccessfulTransaction(KEY_ATTACHMENTS, () -> {
299 notifySet(attachmentObservers);
300 });
301 }
302
303 public void notifyMessageUpdateObservers(@NonNull MessageId messageId) {
304 runPostSuccessfulTransaction(KEY_MESSAGE_UPDATE + messageId.toString(), () -> {
305 messageUpdateObservers.stream().forEach(l -> l.onMessageChanged(messageId));
306 });
307 }
308
309 public void notifyMessageInsertObservers(long threadId, @NonNull MessageId messageId) {
310 runPostSuccessfulTransaction(KEY_MESSAGE_INSERT + messageId, () -> {
311 Set<MessageObserver> listeners = messageInsertObservers.get(threadId);
312
313 if (listeners != null) {
314 listeners.stream().forEach(l -> l.onMessageChanged(messageId));
315 }
316 });
317 }
318
319 public void notifyNotificationProfileObservers() {
320 runPostSuccessfulTransaction(KEY_NOTIFICATION_PROFILES, () -> {
321 notifySet(notificationProfileObservers);
322 });
323 }
324
325 public void notifyRecipientChanged(@NonNull RecipientId recipientId) {
326 SignalDatabase.runPostSuccessfulTransaction(KEY_RECIPIENT + recipientId.serialize(), () -> {
327 Recipient.live(recipientId).refresh();
328 });
329 }
330
331 public void notifyStoryObservers(@NonNull RecipientId recipientId) {
332 runPostSuccessfulTransaction(KEY_STORY_OBSERVER, () -> {
333 notifyMapped(storyObservers, recipientId);
334 });
335 }
336
337 public void notifyStoryObservers(@NonNull Collection<RecipientId> recipientIds) {
338 for (RecipientId recipientId : recipientIds) {
339 runPostSuccessfulTransaction(KEY_STORY_OBSERVER, () -> {
340 notifyMapped(storyObservers, recipientId);
341 });
342 }
343 }
344
345 public void notifyScheduledMessageObservers(long threadId) {
346 runPostSuccessfulTransaction(KEY_SCHEDULED_MESSAGES + threadId, () -> {
347 notifyMapped(scheduledMessageObservers, threadId);
348 });
349 }
350
351 public void notifyCallUpdateObservers() {
352 runPostSuccessfulTransaction(KEY_CALL_UPDATES, () -> notifySet(callUpdateObservers));
353 }
354
355 public void notifyCallLinkObservers(@NonNull CallLinkRoomId callLinkRoomId) {
356 runPostSuccessfulTransaction(KEY_CALL_LINK_UPDATES, () -> notifyMapped(callLinkObservers, callLinkRoomId));
357 }
358
359 private void runPostSuccessfulTransaction(@NonNull String dedupeKey, @NonNull Runnable runnable) {
360 SignalDatabase.runPostSuccessfulTransaction(dedupeKey, () -> {
361 executor.execute(runnable);
362 });
363 }
364
365 private <K, V> void registerMapped(@NonNull Map<K, Set<V>> map, @NonNull K key, @NonNull V listener) {
366 Set<V> listeners = map.get(key);
367
368 if (listeners == null) {
369 listeners = new HashSet<>();
370 }
371
372 listeners.add(listener);
373 map.put(key, listeners);
374 }
375
376 private <K, V> void unregisterMapped(@NonNull Map<K, Set<V>> map, @NonNull V listener) {
377 for (Map.Entry<K, Set<V>> entry : map.entrySet()) {
378 entry.getValue().remove(listener);
379 }
380 }
381
382 private static <K> void notifyMapped(@NonNull Map<K, Set<Observer>> map, @NonNull K key) {
383 Set<Observer> listeners = map.get(key);
384
385 if (listeners != null) {
386 for (Observer listener : listeners) {
387 listener.onChanged();
388 }
389 }
390 }
391
392 private static void notifySet(@NonNull Set<Observer> set) {
393 for (final Observer observer : set) {
394 observer.onChanged();
395 }
396 }
397
398 /**
399 * Blocks until the executor is empty. Only intended to be used for testing.
400 */
401 @VisibleForTesting
402 void flush() {
403 CountDownLatch latch = new CountDownLatch(1);
404 executor.execute(latch::countDown);
405
406 try {
407 latch.await();
408 } catch (InterruptedException e) {
409 throw new AssertionError();
410 }
411 }
412
413 public interface Observer {
414 /**
415 * Called when the relevant data changes. Executed on a serial executor, so don't do any
416 * long-running tasks!
417 */
418 void onChanged();
419 }
420
421 public interface MessageObserver {
422 void onMessageChanged(@NonNull MessageId messageId);
423 }
424}