That fuck shit the fascists are using
at master 69 lines 2.2 kB view raw
package org.tm.archive.database

import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Emitter
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers
import org.tm.archive.dependencies.ApplicationDependencies
import java.util.concurrent.TimeUnit

/**
 * Provide a shared Rx interface to listen to database updates and ensure listeners
 * execute on [Schedulers.io].
 *
 * Every stream emits [Unit]: an emission only signals "something changed" (or, for the
 * first item, "you are now subscribed"); subscribers are expected to re-query whatever
 * data they need.
 */
object RxDatabaseObserver {

    /** Conversation-list change signal. Built on first access and reused afterwards. */
    val conversationList: Flowable<Unit> by lazy { conversationListFlowable() }

    /**
     * Notification-profile change signal that additionally re-emits every 30 seconds.
     * Built on first access and reused afterwards.
     */
    val notificationProfiles: Flowable<Unit> by lazy { notificationProfilesFlowable() }

    /** Wraps the conversation-list observer registration in a [databaseFlowable]. */
    private fun conversationListFlowable(): Flowable<Unit> {
        return databaseFlowable { listener ->
            ApplicationDependencies.getDatabaseObserver().registerConversationListObserver(listener)
        }
    }

    /**
     * Change signal for a single conversation.
     *
     * Unlike the lazy properties on this object, a new [Flowable] is built on every call,
     * so the replayed value is only shared between subscribers of the same returned instance.
     *
     * @param threadId id handed to `registerVerboseConversationObserver` to select the thread.
     * @return a stream that emits once on subscription and again on every observer callback.
     */
    fun conversation(threadId: Long): Flowable<Unit> {
        return databaseFlowable { listener ->
            ApplicationDependencies.getDatabaseObserver().registerVerboseConversationObserver(threadId, listener)
        }
    }

    /**
     * Combines a 30-second ticker with the notification-profile database signal, emitting
     * whenever either source fires (once both have produced their first item).
     *
     * NOTE(review): the ticker presumably exists because the active profile can change with
     * wall-clock time and not only on database writes - confirm with the consumers.
     */
    @Suppress("RedundantUnitExpression")
    private fun notificationProfilesFlowable(): Flowable<Unit> {
        return Flowable.combineLatest(
            // The scheduler-less interval overload ticks on the computation scheduler, and
            // combineLatest emits on whichever source's thread fired. Pin the ticker to io so
            // that timer-driven emissions honour this object's "listeners execute on io" contract.
            Flowable.interval(0, 30, TimeUnit.SECONDS, Schedulers.io()),
            databaseFlowable { ApplicationDependencies.getDatabaseObserver().registerNotificationProfileObserver(it) }
        ) { _, _ -> Unit }
    }

    /**
     * Adapts a callback-style database observer registration into a shared [Flowable].
     *
     * On connection the stream creates an [RxObserver], hands it to [registerObserver],
     * arranges for it to be unregistered when the stream is cancelled, and then emits one
     * initial item so subscribers do not have to wait for the first change.
     *
     * @param registerObserver registers the supplied listener with the database observer.
     * @return a stream that replays its latest item to new subscribers and delivers on io.
     */
    private fun databaseFlowable(registerObserver: (RxObserver) -> Unit): Flowable<Unit> {
        val flowable = Flowable.create(
            { emitter ->
                val listener = RxObserver(emitter)

                registerObserver(listener)
                // Unregister on cancellation so the listener does not outlive its subscribers.
                emitter.setCancellable { ApplicationDependencies.getDatabaseObserver().unregisterObserver(listener) }

                listener.prime()
            },
            // Emissions carry no payload, so only the most recent one matters under backpressure.
            BackpressureStrategy.LATEST
        )

        return flowable
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            // Share one registration between subscribers; replay the last signal to late ones.
            .replay(1)
            .refCount()
            // Replayed items are delivered on the subscribing thread, so hop to io once more.
            .observeOn(Schedulers.io())
    }

    /** Bridges [DatabaseObserver.Observer] callbacks onto an Rx [Emitter]. */
    private class RxObserver(private val emitter: Emitter<Unit>) : DatabaseObserver.Observer {

        /** Emits the initial item, independent of any database change. */
        fun prime() {
            emitter.onNext(Unit)
        }

        /** Forwards a database change notification downstream. */
        override fun onChanged() {
            emitter.onNext(Unit)
        }
    }
}