From f708bb93e238ac100dc9ff26a2e2219e84762fa4 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 8 Dec 2024 12:04:11 +0300 Subject: [PATCH] Re-implement MergedTimeline --- .../commonMain/kotlin/GeneratingTimeline.kt | 4 +- .../src/commonMain/kotlin/MergedTimeline.kt | 82 ++++++++++++++++--- ...bstractTimeline.kt => ProducerTimeline.kt} | 7 +- .../src/commonMain/kotlin/SharedTimeline.kt | 2 +- 4 files changed, 77 insertions(+), 18 deletions(-) rename simulation-kt/src/commonMain/kotlin/{AbstractTimeline.kt => ProducerTimeline.kt} (90%) diff --git a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt index 5690d7c..0698b45 100644 --- a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt @@ -22,11 +22,11 @@ public fun Flow.withTimeThreshold( * @param lookaheadInterval an interval for generated events ahead of the last observed event. */ public class GeneratingTimeline( - private val origin: E, + origin: E, private val lookaheadInterval: Duration, coroutineContext: CoroutineContext = EmptyCoroutineContext, private val generator: suspend FlowCollector.(E) -> Unit -) : AbstractTimeline(origin.time, coroutineContext) { +) : ProducerTimeline(origin.time, coroutineContext) { private val startEventFlow = MutableStateFlow(origin) diff --git a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt index 0da6227..4614fbb 100644 --- a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt @@ -1,7 +1,11 @@ package space.kscience.simulation -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.datetime.Instant import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -9,18 +13,70 @@ import kotlin.coroutines.EmptyCoroutineContext public class MergedTimeline( private val timelines: List>, coroutineContext: CoroutineContext = EmptyCoroutineContext -) : AbstractTimeline(timelines.minOf { it.time.value }, coroutineContext) { +) : Timeline { - override fun events(): Flow = flow { - val buffer = TODO() -// -// timelines.forEach { timeline -> -// timeline.observe { -// collect{ -// buffer.add(it) -// } -// } -// } + protected val timelineScope: CoroutineScope = CoroutineScope( + coroutineContext + + SupervisorJob(coroutineContext[Job]) + + CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } + + CoroutineName("MergedTimeline") + ) + + override val time: StateFlow = combine(timelines.map { it.time }){ array-> + array.max() + }.stateIn(timelineScope, SharingStarted.Lazily, timelines.maxOf { it.time.value }) + + override suspend fun advance(toTime: Instant) { + observers.forEach { + it.collect(toTime) + } + } + + private val observers: MutableSet = mutableSetOf() + + override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { + val context = currentCoroutineContext() + val buffer = mutableListOf() + + val timelineObservers = timelines.map { + it.observeEach { event -> + buffer.add(event) + } + } + + val observer = object : TimelineObserver { + + private val channel = Channel() + + override val time = MutableStateFlow(this@MergedTimeline.time.value) + + private val collectJob = timelineScope.launch(context) { + channel.consumeAsFlow().onEach { + time.emit(it.time) + }.collector() + } + + private val mutex = Mutex() + + override suspend fun collect(upTo: Instant) = mutex.withLock{ + timelineObservers.forEach { + it.collect(upTo) + } + buffer.sortedBy { it.time }.forEach { + channel.send(it) + buffer.remove(it) + } + } + + override fun close() { + collectJob.cancel() + observers.remove(this) + } + + } + + observers.add(observer) + return observer } } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt b/simulation-kt/src/commonMain/kotlin/ProducerTimeline.kt similarity index 90% rename from simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt rename to simulation-kt/src/commonMain/kotlin/ProducerTimeline.kt index b4d243f..a201d92 100644 --- a/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/ProducerTimeline.kt @@ -4,10 +4,12 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Instant import kotlin.coroutines.CoroutineContext -public abstract class AbstractTimeline( +public abstract class ProducerTimeline( protected var startTime: Instant, coroutineContext: CoroutineContext ) : Timeline, AutoCloseable { @@ -53,8 +55,9 @@ public abstract class AbstractTimeline( }.collector() } + private val mutex = Mutex() - override suspend fun collect(upTo: Instant) = coroutineScope { + override suspend fun collect(upTo: Instant) = mutex.withLock { require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" } events().takeWhile { it.time <= upTo diff --git a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt index 1b45002..051524e 100644 --- a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt @@ -13,7 +13,7 @@ import kotlin.coroutines.EmptyCoroutineContext public class SharedTimeline( startTime: Instant, coroutineContext: CoroutineContext = EmptyCoroutineContext -) : AbstractTimeline(startTime, coroutineContext) { +) : ProducerTimeline(startTime, coroutineContext) { private val events = MutableSharedFlow(replay = Channel.UNLIMITED)