Re-implement MergedTimeline

This commit is contained in:
Alexander Nozik 2024-12-08 12:04:11 +03:00
parent 203a8c1570
commit f708bb93e2
4 changed files with 77 additions and 18 deletions

View File

@ -22,11 +22,11 @@ public fun <E : TimelineEvent> Flow<E>.withTimeThreshold(
* @param lookaheadInterval an interval for generated events ahead of the last observed event.
*/
public class GeneratingTimeline<E : TimelineEvent>(
private val origin: E,
origin: E,
private val lookaheadInterval: Duration,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val generator: suspend FlowCollector<E>.(E) -> Unit
) : AbstractTimeline<E>(origin.time, coroutineContext) {
) : ProducerTimeline<E>(origin.time, coroutineContext) {
private val startEventFlow = MutableStateFlow(origin)

View File

@ -1,7 +1,11 @@
package space.kscience.simulation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@ -9,18 +13,70 @@ import kotlin.coroutines.EmptyCoroutineContext
public class MergedTimeline<E : TimelineEvent>(
private val timelines: List<Timeline<E>>,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : AbstractTimeline<E>(timelines.minOf { it.time.value }, coroutineContext) {
) : Timeline<E> {
override fun events(): Flow<E> = flow {
val buffer = TODO()
//
// timelines.forEach { timeline ->
// timeline.observe {
// collect{
// buffer.add(it)
// }
// }
// }
protected val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
CoroutineName("MergedTimeline")
)
override val time: StateFlow<Instant> = combine(timelines.map { it.time }){ array->
array.max()
}.stateIn(timelineScope, SharingStarted.Lazily, timelines.maxOf { it.time.value })
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val context = currentCoroutineContext()
val buffer = mutableListOf<E>()
val timelineObservers = timelines.map {
it.observeEach { event ->
buffer.add(event)
}
}
val observer = object : TimelineObserver {
private val channel = Channel<E>()
override val time = MutableStateFlow(this@MergedTimeline.time.value)
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(it.time)
}.collector()
}
private val mutex = Mutex()
override suspend fun collect(upTo: Instant) = mutex.withLock{
timelineObservers.forEach {
it.collect(upTo)
}
buffer.sortedBy { it.time }.forEach {
channel.send(it)
buffer.remove(it)
}
}
override fun close() {
collectJob.cancel()
observers.remove(this)
}
}
observers.add(observer)
return observer
}
}

View File

@ -4,10 +4,12 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
public abstract class AbstractTimeline<E : TimelineEvent>(
public abstract class ProducerTimeline<E : TimelineEvent>(
protected var startTime: Instant,
coroutineContext: CoroutineContext
) : Timeline<E>, AutoCloseable {
@ -53,8 +55,9 @@ public abstract class AbstractTimeline<E : TimelineEvent>(
}.collector()
}
private val mutex = Mutex()
override suspend fun collect(upTo: Instant) = coroutineScope {
override suspend fun collect(upTo: Instant) = mutex.withLock {
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
events().takeWhile {
it.time <= upTo

View File

@ -13,7 +13,7 @@ import kotlin.coroutines.EmptyCoroutineContext
public class SharedTimeline<E : TimelineEvent>(
startTime: Instant,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : AbstractTimeline<E>(startTime, coroutineContext) {
) : ProducerTimeline<E>(startTime, coroutineContext) {
private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED)