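// NOTE: the next two lines are web-page header text captured with this listing, not Java source.
// That fuck shit the fascists are using
// at master · 424 lines · 15 kB · view raw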
1package org.tm.archive.database; 2 3import android.app.Application; 4 5import androidx.annotation.NonNull; 6import androidx.annotation.VisibleForTesting; 7 8import org.jetbrains.annotations.NotNull; 9import org.signal.core.util.concurrent.SignalExecutors; 10import org.tm.archive.database.model.MessageId; 11import org.tm.archive.recipients.Recipient; 12import org.tm.archive.recipients.RecipientId; 13import org.tm.archive.service.webrtc.links.CallLinkRoomId; 14import org.tm.archive.util.concurrent.SerialExecutor; 15 16import java.util.Collection; 17import java.util.HashMap; 18import java.util.HashSet; 19import java.util.Map; 20import java.util.Set; 21import java.util.UUID; 22import java.util.concurrent.CountDownLatch; 23import java.util.concurrent.Executor; 24 25/** 26 * Allows listening to database changes to varying degrees of specificity. 27 * 28 * A replacement for the observer system in {@link DatabaseTable}. We should move to this over time. 29 */ 30public class DatabaseObserver { 31 32 private static final String KEY_CONVERSATION = "Conversation:"; 33 private static final String KEY_VERBOSE_CONVERSATION = "VerboseConversation:"; 34 private static final String KEY_CONVERSATION_LIST = "ConversationList"; 35 private static final String KEY_PAYMENT = "Payment:"; 36 private static final String KEY_ALL_PAYMENTS = "AllPayments"; 37 private static final String KEY_CHAT_COLORS = "ChatColors"; 38 private static final String KEY_STICKERS = "Stickers"; 39 private static final String KEY_STICKER_PACKS = "StickerPacks"; 40 private static final String KEY_ATTACHMENTS = "Attachments"; 41 private static final String KEY_MESSAGE_UPDATE = "MessageUpdate:"; 42 private static final String KEY_MESSAGE_INSERT = "MessageInsert:"; 43 private static final String KEY_NOTIFICATION_PROFILES = "NotificationProfiles"; 44 private static final String KEY_RECIPIENT = "Recipient"; 45 private static final String KEY_STORY_OBSERVER = "Story"; 46 private static final String KEY_SCHEDULED_MESSAGES 
= "ScheduledMessages"; 47 private static final String KEY_CONVERSATION_DELETES = "ConversationDeletes"; 48 49 private static final String KEY_CALL_UPDATES = "CallUpdates"; 50 private static final String KEY_CALL_LINK_UPDATES = "CallLinkUpdates"; 51 52 private final Application application; 53 private final Executor executor; 54 55 private final Set<Observer> conversationListObservers; 56 private final Map<Long, Set<Observer>> conversationObservers; 57 private final Map<Long, Set<Observer>> verboseConversationObservers; 58 private final Map<Long, Set<Observer>> conversationDeleteObservers; 59 private final Map<UUID, Set<Observer>> paymentObservers; 60 private final Map<Long, Set<Observer>> scheduledMessageObservers; 61 private final Set<Observer> allPaymentsObservers; 62 private final Set<Observer> chatColorsObservers; 63 private final Set<Observer> stickerObservers; 64 private final Set<Observer> stickerPackObservers; 65 private final Set<Observer> attachmentObservers; 66 private final Set<MessageObserver> messageUpdateObservers; 67 private final Map<Long, Set<MessageObserver>> messageInsertObservers; 68 private final Set<Observer> notificationProfileObservers; 69 private final Map<RecipientId, Set<Observer>> storyObservers; 70 private final Set<Observer> callUpdateObservers; 71 private final Map<CallLinkRoomId, Set<Observer>> callLinkObservers; 72 73 public DatabaseObserver(Application application) { 74 this.application = application; 75 this.executor = new SerialExecutor(SignalExecutors.BOUNDED); 76 this.conversationListObservers = new HashSet<>(); 77 this.conversationObservers = new HashMap<>(); 78 this.verboseConversationObservers = new HashMap<>(); 79 this.conversationDeleteObservers = new HashMap<>(); 80 this.paymentObservers = new HashMap<>(); 81 this.allPaymentsObservers = new HashSet<>(); 82 this.chatColorsObservers = new HashSet<>(); 83 this.stickerObservers = new HashSet<>(); 84 this.stickerPackObservers = new HashSet<>(); 85 this.attachmentObservers = 
new HashSet<>(); 86 this.messageUpdateObservers = new HashSet<>(); 87 this.messageInsertObservers = new HashMap<>(); 88 this.notificationProfileObservers = new HashSet<>(); 89 this.storyObservers = new HashMap<>(); 90 this.scheduledMessageObservers = new HashMap<>(); 91 this.callUpdateObservers = new HashSet<>(); 92 this.callLinkObservers = new HashMap<>(); 93 } 94 95 public void registerConversationListObserver(@NonNull Observer listener) { 96 executor.execute(() -> { 97 conversationListObservers.add(listener); 98 }); 99 } 100 101 public void registerConversationObserver(long threadId, @NonNull Observer listener) { 102 executor.execute(() -> { 103 registerMapped(conversationObservers, threadId, listener); 104 }); 105 } 106 107 public void registerVerboseConversationObserver(long threadId, @NonNull Observer listener) { 108 executor.execute(() -> { 109 registerMapped(verboseConversationObservers, threadId, listener); 110 }); 111 } 112 113 public void registerConversationDeleteObserver(long threadId, @NonNull Observer listener) { 114 executor.execute(() -> { 115 registerMapped(conversationDeleteObservers, threadId, listener); 116 }); 117 } 118 119 public void registerPaymentObserver(@NonNull UUID paymentId, @NonNull Observer listener) { 120 executor.execute(() -> { 121 registerMapped(paymentObservers, paymentId, listener); 122 }); 123 } 124 125 public void registerAllPaymentsObserver(@NonNull Observer listener) { 126 executor.execute(() -> { 127 allPaymentsObservers.add(listener); 128 }); 129 } 130 131 public void registerChatColorsObserver(@NonNull Observer listener) { 132 executor.execute(() -> { 133 chatColorsObservers.add(listener); 134 }); 135 } 136 137 public void registerStickerObserver(@NonNull Observer listener) { 138 executor.execute(() -> { 139 stickerObservers.add(listener); 140 }); 141 } 142 143 public void registerStickerPackObserver(@NonNull Observer listener) { 144 executor.execute(() -> { 145 stickerPackObservers.add(listener); 146 }); 147 } 148 149 
public void registerAttachmentObserver(@NonNull Observer listener) { 150 executor.execute(() -> { 151 attachmentObservers.add(listener); 152 }); 153 } 154 155 public void registerMessageUpdateObserver(@NonNull MessageObserver listener) { 156 executor.execute(() -> { 157 messageUpdateObservers.add(listener); 158 }); 159 } 160 161 public void registerMessageInsertObserver(long threadId, @NonNull MessageObserver listener) { 162 executor.execute(() -> { 163 registerMapped(messageInsertObservers, threadId, listener); 164 }); 165 } 166 167 public void registerNotificationProfileObserver(@NotNull Observer listener) { 168 executor.execute(() -> { 169 notificationProfileObservers.add(listener); 170 }); 171 } 172 173 /** 174 * Adds an observer which will be notified whenever a new Story message is inserted into the database. 175 */ 176 public void registerStoryObserver(@NonNull RecipientId recipientId, @NonNull Observer listener) { 177 executor.execute(() -> { 178 registerMapped(storyObservers, recipientId, listener); 179 }); 180 } 181 182 public void registerScheduledMessageObserver(long threadId, @NonNull Observer listener) { 183 executor.execute(() -> { 184 registerMapped(scheduledMessageObservers, threadId, listener); 185 }); 186 } 187 188 public void registerCallUpdateObserver(@NonNull Observer observer) { 189 executor.execute(() -> callUpdateObservers.add(observer)); 190 } 191 192 public void registerCallLinkObserver(@NonNull CallLinkRoomId callLinkRoomId, @NonNull Observer observer) { 193 executor.execute(() -> { 194 registerMapped(callLinkObservers, callLinkRoomId, observer); 195 }); 196 } 197 198 public void unregisterObserver(@NonNull Observer listener) { 199 executor.execute(() -> { 200 conversationListObservers.remove(listener); 201 unregisterMapped(conversationObservers, listener); 202 unregisterMapped(verboseConversationObservers, listener); 203 unregisterMapped(paymentObservers, listener); 204 chatColorsObservers.remove(listener); 205 
stickerObservers.remove(listener); 206 stickerPackObservers.remove(listener); 207 attachmentObservers.remove(listener); 208 notificationProfileObservers.remove(listener); 209 unregisterMapped(storyObservers, listener); 210 unregisterMapped(scheduledMessageObservers, listener); 211 unregisterMapped(conversationDeleteObservers, listener); 212 callUpdateObservers.remove(listener); 213 unregisterMapped(callLinkObservers, listener); 214 }); 215 } 216 217 public void unregisterObserver(@NonNull MessageObserver listener) { 218 executor.execute(() -> { 219 messageUpdateObservers.remove(listener); 220 unregisterMapped(messageInsertObservers, listener); 221 }); 222 } 223 224 public void notifyConversationListeners(Set<Long> threadIds) { 225 for (long threadId : threadIds) { 226 notifyConversationListeners(threadId); 227 } 228 } 229 230 public void notifyConversationListeners(long threadId) { 231 runPostSuccessfulTransaction(KEY_CONVERSATION + threadId, () -> { 232 notifyMapped(conversationObservers, threadId); 233 notifyMapped(verboseConversationObservers, threadId); 234 }); 235 } 236 237 public void notifyVerboseConversationListeners(Set<Long> threadIds) { 238 for (long threadId : threadIds) { 239 runPostSuccessfulTransaction(KEY_VERBOSE_CONVERSATION + threadId, () -> { 240 notifyMapped(verboseConversationObservers, threadId); 241 }); 242 } 243 } 244 245 public void notifyConversationDeleteListeners(Set<Long> threadIds) { 246 for (long threadId : threadIds) { 247 notifyConversationDeleteListeners(threadId); 248 } 249 } 250 251 public void notifyConversationDeleteListeners(long threadId) { 252 runPostSuccessfulTransaction(KEY_CONVERSATION_DELETES + threadId, () -> { 253 notifyMapped(conversationDeleteObservers, threadId); 254 }); 255 } 256 257 public void notifyConversationListListeners() { 258 runPostSuccessfulTransaction(KEY_CONVERSATION_LIST, () -> { 259 for (Observer listener : conversationListObservers) { 260 listener.onChanged(); 261 } 262 }); 263 } 264 265 public void 
notifyPaymentListeners(@NonNull UUID paymentId) { 266 runPostSuccessfulTransaction(KEY_PAYMENT + paymentId.toString(), () -> { 267 notifyMapped(paymentObservers, paymentId); 268 }); 269 } 270 271 public void notifyAllPaymentsListeners() { 272 runPostSuccessfulTransaction(KEY_ALL_PAYMENTS, () -> { 273 notifySet(allPaymentsObservers); 274 }); 275 } 276 277 public void notifyChatColorsListeners() { 278 runPostSuccessfulTransaction(KEY_CHAT_COLORS, () -> { 279 for (Observer chatColorsObserver : chatColorsObservers) { 280 chatColorsObserver.onChanged(); 281 } 282 }); 283 } 284 285 public void notifyStickerObservers() { 286 runPostSuccessfulTransaction(KEY_STICKERS, () -> { 287 notifySet(stickerObservers); 288 }); 289 } 290 291 public void notifyStickerPackObservers() { 292 runPostSuccessfulTransaction(KEY_STICKER_PACKS, () -> { 293 notifySet(stickerPackObservers); 294 }); 295 } 296 297 public void notifyAttachmentObservers() { 298 runPostSuccessfulTransaction(KEY_ATTACHMENTS, () -> { 299 notifySet(attachmentObservers); 300 }); 301 } 302 303 public void notifyMessageUpdateObservers(@NonNull MessageId messageId) { 304 runPostSuccessfulTransaction(KEY_MESSAGE_UPDATE + messageId.toString(), () -> { 305 messageUpdateObservers.stream().forEach(l -> l.onMessageChanged(messageId)); 306 }); 307 } 308 309 public void notifyMessageInsertObservers(long threadId, @NonNull MessageId messageId) { 310 runPostSuccessfulTransaction(KEY_MESSAGE_INSERT + messageId, () -> { 311 Set<MessageObserver> listeners = messageInsertObservers.get(threadId); 312 313 if (listeners != null) { 314 listeners.stream().forEach(l -> l.onMessageChanged(messageId)); 315 } 316 }); 317 } 318 319 public void notifyNotificationProfileObservers() { 320 runPostSuccessfulTransaction(KEY_NOTIFICATION_PROFILES, () -> { 321 notifySet(notificationProfileObservers); 322 }); 323 } 324 325 public void notifyRecipientChanged(@NonNull RecipientId recipientId) { 326 SignalDatabase.runPostSuccessfulTransaction(KEY_RECIPIENT + 
recipientId.serialize(), () -> { 327 Recipient.live(recipientId).refresh(); 328 }); 329 } 330 331 public void notifyStoryObservers(@NonNull RecipientId recipientId) { 332 runPostSuccessfulTransaction(KEY_STORY_OBSERVER, () -> { 333 notifyMapped(storyObservers, recipientId); 334 }); 335 } 336 337 public void notifyStoryObservers(@NonNull Collection<RecipientId> recipientIds) { 338 for (RecipientId recipientId : recipientIds) { 339 runPostSuccessfulTransaction(KEY_STORY_OBSERVER, () -> { 340 notifyMapped(storyObservers, recipientId); 341 }); 342 } 343 } 344 345 public void notifyScheduledMessageObservers(long threadId) { 346 runPostSuccessfulTransaction(KEY_SCHEDULED_MESSAGES + threadId, () -> { 347 notifyMapped(scheduledMessageObservers, threadId); 348 }); 349 } 350 351 public void notifyCallUpdateObservers() { 352 runPostSuccessfulTransaction(KEY_CALL_UPDATES, () -> notifySet(callUpdateObservers)); 353 } 354 355 public void notifyCallLinkObservers(@NonNull CallLinkRoomId callLinkRoomId) { 356 runPostSuccessfulTransaction(KEY_CALL_LINK_UPDATES, () -> notifyMapped(callLinkObservers, callLinkRoomId)); 357 } 358 359 private void runPostSuccessfulTransaction(@NonNull String dedupeKey, @NonNull Runnable runnable) { 360 SignalDatabase.runPostSuccessfulTransaction(dedupeKey, () -> { 361 executor.execute(runnable); 362 }); 363 } 364 365 private <K, V> void registerMapped(@NonNull Map<K, Set<V>> map, @NonNull K key, @NonNull V listener) { 366 Set<V> listeners = map.get(key); 367 368 if (listeners == null) { 369 listeners = new HashSet<>(); 370 } 371 372 listeners.add(listener); 373 map.put(key, listeners); 374 } 375 376 private <K, V> void unregisterMapped(@NonNull Map<K, Set<V>> map, @NonNull V listener) { 377 for (Map.Entry<K, Set<V>> entry : map.entrySet()) { 378 entry.getValue().remove(listener); 379 } 380 } 381 382 private static <K> void notifyMapped(@NonNull Map<K, Set<Observer>> map, @NonNull K key) { 383 Set<Observer> listeners = map.get(key); 384 385 if (listeners 
!= null) { 386 for (Observer listener : listeners) { 387 listener.onChanged(); 388 } 389 } 390 } 391 392 private static void notifySet(@NonNull Set<Observer> set) { 393 for (final Observer observer : set) { 394 observer.onChanged(); 395 } 396 } 397 398 /** 399 * Blocks until the executor is empty. Only intended to be used for testing. 400 */ 401 @VisibleForTesting 402 void flush() { 403 CountDownLatch latch = new CountDownLatch(1); 404 executor.execute(latch::countDown); 405 406 try { 407 latch.await(); 408 } catch (InterruptedException e) { 409 throw new AssertionError(); 410 } 411 } 412 413 public interface Observer { 414 /** 415 * Called when the relevant data changes. Executed on a serial executor, so don't do any 416 * long-running tasks! 417 */ 418 void onChanged(); 419 } 420 421 public interface MessageObserver { 422 void onMessageChanged(@NonNull MessageId messageId); 423 } 424}