That fuck shit the fascists are using
1package org.tm.archive.database
2
3import io.reactivex.rxjava3.core.BackpressureStrategy
4import io.reactivex.rxjava3.core.Emitter
5import io.reactivex.rxjava3.core.Flowable
6import io.reactivex.rxjava3.schedulers.Schedulers
7import org.tm.archive.dependencies.ApplicationDependencies
8import java.util.concurrent.TimeUnit
9
/**
 * Shared Rx entry point for listening to database change notifications.
 *
 * Every stream produced here subscribes on, and delivers to downstream on, [Schedulers.io].
 */
object RxDatabaseObserver {

  /** Conversation list change stream; built on first access and reused afterwards. */
  val conversationList: Flowable<Unit> by lazy { conversationListFlowable() }

  /** Notification profile stream (periodic tick combined with change events); built on first access. */
  val notificationProfiles: Flowable<Unit> by lazy { notificationProfilesFlowable() }

  /**
   * Builds a change stream for a single conversation.
   *
   * Unlike the lazy properties above, a new [Flowable] is constructed on every call.
   *
   * @param threadId id handed to the verbose conversation observer registration.
   * @return a stream emitting [Unit] once when primed and again for each observer callback.
   */
  fun conversation(threadId: Long): Flowable<Unit> =
    databaseFlowable { observer ->
      ApplicationDependencies.getDatabaseObserver().registerVerboseConversationObserver(threadId, observer)
    }

  /** Wires the conversation list observer registration into a database-backed stream. */
  private fun conversationListFlowable(): Flowable<Unit> =
    databaseFlowable { observer ->
      ApplicationDependencies.getDatabaseObserver().registerConversationListObserver(observer)
    }

  /**
   * Combines a ticker (first tick immediately, then every 30 seconds) with notification
   * profile change events, so downstream is signalled when either source emits.
   */
  @Suppress("RedundantUnitExpression")
  private fun notificationProfilesFlowable(): Flowable<Unit> {
    val ticker = Flowable.interval(0, 30, TimeUnit.SECONDS)
    val profileChanges = databaseFlowable { observer ->
      ApplicationDependencies.getDatabaseObserver().registerNotificationProfileObserver(observer)
    }

    return Flowable.combineLatest(ticker, profileChanges) { _, _ -> Unit }
  }

  /**
   * Adapts a database observer registration into a [Flowable].
   *
   * @param registerObserver callback that registers the supplied observer with the database observer.
   * @return a stream that emits once on subscription and then on every observer callback.
   */
  private fun databaseFlowable(registerObserver: (RxObserver) -> Unit): Flowable<Unit> {
    val source: Flowable<Unit> = Flowable.create(
      { emitter ->
        val observer = RxObserver(emitter)

        // Order matters: register first, then attach the cleanup, then push the initial value.
        registerObserver(observer)
        emitter.setCancellable { ApplicationDependencies.getDatabaseObserver().unregisterObserver(observer) }

        observer.prime()
      },
      BackpressureStrategy.LATEST
    )

    // replay(1).refCount() shares one upstream registration between subscribers and hands
    // the most recent signal to late subscribers.
    val shared = source
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.io())
      .replay(1)
      .refCount()

    return shared.observeOn(Schedulers.io())
  }

  /** Bridges [DatabaseObserver.Observer] callbacks onto an Rx [Emitter]. */
  private class RxObserver(private val emitter: Emitter<Unit>) : DatabaseObserver.Observer {

    /** Pushes an initial signal so subscribers receive a value without waiting for a change. */
    fun prime() = signal()

    /** Forwards a database change notification downstream. */
    override fun onChanged() = signal()

    /** Emits a single [Unit] through the wrapped emitter. */
    private fun signal() {
      emitter.onNext(Unit)
    }
  }
}