From becde944032c84074ec0b940680149988a2ac09b Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Thu, 21 Nov 2024 09:42:53 +0300 Subject: [PATCH] GeneratingTimeLine prototype is working --- .../commonMain/kotlin/GeneratingTimeline.kt | 108 ++++++++++++------ .../src/commonMain/kotlin/MergedTimeline.kt | 20 ++-- .../src/commonMain/kotlin/SharedTimeline.kt | 42 +++---- .../src/commonMain/kotlin/Timeline.kt | 43 ++++--- .../src/commonMain/kotlin/notNullUtils.kt | 35 ++++++ .../src/commonTest/kotlin/TimelineTests.kt | 34 ++++++ 6 files changed, 201 insertions(+), 81 deletions(-) create mode 100644 simulation-kt/src/commonMain/kotlin/notNullUtils.kt create mode 100644 simulation-kt/src/commonTest/kotlin/TimelineTests.kt diff --git a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt index 4e9da50..f3d7d92 100644 --- a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt @@ -2,10 +2,9 @@ package space.kscience.simulation import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -20,21 +19,49 @@ public class GeneratingTimeline( private val initialEvent: E, private val lookaheadInterval: Duration, private val generatorChain: suspend (E) -> E -) : Timeline { +) : Timeline, AutoCloseable { + + // push to this channel to trigger event generation + private val wakeupChannel = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) + + private suspend fun kickGenerator() { + wakeupChannel.send(Unit) + } private val mutex = Mutex() - private val events = ArrayDeque() + private val history = ArrayDeque() + + private val lastEvent = MutableSharedFlow(replay = Int.MAX_VALUE) + + private val updateHistoryJob = generationScope.launch { + lastEvent.onEach { + mutex.withLock { + history.add(it) + //cleanup old events + val threshold = observedTime ?: return@withLock + while (history.isNotEmpty() && history.last().time > threshold) { + history.removeFirst() + } + + } + } + } private val observers: MutableSet = mutableSetOf() - override val lastEventTime: Instant? - get() = events.lastOrNull()?.time + override val time: Instant + get() = history.lastOrNull()?.time ?: initialEvent.time - override val observedTime: Instant - get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST + override val observedTime: Instant? + get() = observers.minOfNotNullOrNull { it.time } - override fun flowUnobservedEvents(): Flow = events.asFlow() + override fun flowUnobservedEvents(): Flow = flow { + history.forEach { e -> + emit(e) + } + emitAll(lastEvent) + } override suspend fun advance(toTime: Instant) { observers.forEach { @@ -42,51 +69,60 @@ public class GeneratingTimeline( } } - private var generatorJob: Job = launchGenerateJob(initialEvent) + private var generatorJob: Job = launchGenerator(initialEvent) - private fun launchGenerateJob(event: E): Job = generationScope.launch { + private fun launchGenerator(event: E): Job = generationScope.launch { + kickGenerator() var currentEvent = event - while(currentEvent.time < observedTime + lookaheadInterval) { - val nextEvent = generatorChain(currentEvent) - mutex.withLock { - events.add(nextEvent) + // for each wakeup generate all events in lookaheadInterval + for (u in wakeupChannel) { + while (currentEvent.time < (observedTime ?: event.time) + lookaheadInterval) { + val nextEvent = generatorChain(currentEvent) + lastEvent.emit(nextEvent) + currentEvent = nextEvent } - currentEvent = nextEvent } } - private fun regenerate(event: E) { - generatorJob.cancel() - generatorJob = launchGenerateJob(event) - } - /** - * Discard unconsumed events after [atTime]. - */ - override suspend fun interrupt(atTime: Instant): Unit { - if (atTime < observedTime) - error("Timeline interrupt at time $atTime is not possible because there are observed events before $observedTime") + public suspend fun interrupt(newStart: E) { + check(newStart.time > (observedTime ?: Instant.DISTANT_FUTURE)) { + "Can't interrupt generating timeline after observed event" + } mutex.withLock { - while (events.isNotEmpty() && events.last().time > atTime) { - events.removeLast() + while (history.isNotEmpty() && history.last().time > newStart.time) { + history.removeLast() } + generatorJob.cancel() + generatorJob = launchGenerator(newStart) + } + kickGenerator() + } + + override fun close() { + updateHistoryJob.cancel() + generatorJob.cancel() } override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { val observer = object : TimelineObserver { - val observerMutex = Mutex() - override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST + override var time: Instant = this@GeneratingTimeline.time - override suspend fun collect(upTo: Instant) = observerMutex.withLock { - flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { - lastCollectedEventTime = it.time + override suspend fun collect(upTo: Instant) { + flowUnobservedEvents().takeWhile { + it.time <= upTo + }.onEach { + time = it.time + kickGenerator() }.collector() - //cleanup() } override fun close() { observers.remove(this) + if(observers.isEmpty()){ + this@GeneratingTimeline.close() + } } } diff --git a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt index 991cdc4..c498bb8 100644 --- a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt @@ -6,13 +6,15 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile import kotlinx.datetime.Instant + public class MergedTimeline( private val timelines: List> ) : Timeline { - override val lastEventTime: Instant? - get() = timelines.minOfOrNull { it.lastEventTime ?: Instant.DISTANT_PAST } - override val observedTime: Instant - get() = timelines.maxOfOrNull { it.observedTime } ?: Instant.DISTANT_FUTURE + override val time: Instant + get() = timelines.minOfNotNullOrNull { it.time } ?: Instant.DISTANT_PAST + + override val observedTime: Instant? + get() = timelines.maxOfNotNullOrNull { it.observedTime } override fun flowUnobservedEvents(): Flow = timelines.map { flowUnobservedEvents() }.merge() @@ -20,21 +22,21 @@ public class MergedTimeline( timelines.forEach { it.advance(toTime) } } - override suspend fun interrupt(atTime: Instant) { - timelines.forEach { it.interrupt(atTime) } - } +// override suspend fun interrupt(atTime: Instant) { +// timelines.forEach { it.interrupt(atTime) } +// } private val observers: MutableSet = mutableSetOf() override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { val observer = object : TimelineObserver { - override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST + override var time: Instant = this@MergedTimeline.time override suspend fun collect(upTo: Instant) = timelines .map { flowUnobservedEvents() } .merge() .takeWhile { it.time <= upTo }.onEach { - lastCollectedEventTime = it.time + time = it.time }.collector() diff --git a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt index 067d4d2..b5e4845 100644 --- a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt @@ -19,11 +19,11 @@ public class SharedTimeline : Timeline { private val observers: MutableSet = mutableSetOf() - override val lastEventTime: Instant? - get() = events.lastOrNull()?.time + override val time: Instant + get() = events.lastOrNull()?.time ?: Instant.DISTANT_PAST - override val observedTime: Instant - get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST + override val observedTime: Instant? + get() = observers.minOfNotNullOrNull { it.time } override fun flowUnobservedEvents(): Flow = events.asFlow() @@ -31,7 +31,9 @@ public class SharedTimeline : Timeline { * Emit new event to the timeline */ public suspend fun emit(event: E): Boolean = mutex.withLock { - if (event.time < observedTime) error("Can't emit event $event because there are observed events after $observedTime") + if (event.time < (observedTime ?: Instant.DISTANT_PAST)) { + error("Can't emit event $event because there are observed events after $observedTime") + } events.add(event) } @@ -44,33 +46,33 @@ public class SharedTimeline : Timeline { /** * Discard all events before [observedTime] */ - private suspend fun cleanup(): Unit { - val threshold = observedTime + private suspend fun cleanup(): Unit = mutex.withLock { + val threshold = observedTime ?: return@withLock while (events.isNotEmpty() && events.last().time > threshold) { events.removeFirst() } } - /** - * Discard unconsumed events after [atTime]. - */ - override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock { - val threshold = observedTime - if (atTime < threshold) - error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold") - while (events.isNotEmpty() && events.last().time > atTime) { - events.removeLast() - } - } +// /** +// * Discard unconsumed events after [atTime]. +// */ +// override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock { +// val threshold = observedTime +// if (atTime < threshold) +// error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold") +// while (events.isNotEmpty() && events.last().time > atTime) { +// events.removeLast() +// } +// } override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { val observer = object : TimelineObserver { val observerMutex = Mutex() - override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST + override var time: Instant = this@SharedTimeline.time override suspend fun collect(upTo: Instant) = observerMutex.withLock { flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { - lastCollectedEventTime = it.time + time = it.time }.collector() cleanup() } diff --git a/simulation-kt/src/commonMain/kotlin/Timeline.kt b/simulation-kt/src/commonMain/kotlin/Timeline.kt index f9f4535..64974ff 100644 --- a/simulation-kt/src/commonMain/kotlin/Timeline.kt +++ b/simulation-kt/src/commonMain/kotlin/Timeline.kt @@ -2,20 +2,31 @@ package space.kscience.simulation import kotlinx.coroutines.flow.Flow import kotlinx.datetime.Instant +import kotlin.time.Duration public interface TimelineEvent { public val time: Instant } -public interface TimelineObserver: AutoCloseable { +public interface TimelineInterval : TimelineEvent { + public val startTime: Instant + public val duration: Duration + + override val time: Instant + get() = startTime + duration +} + +public data class SimpleTimelineEvent(override val time: Instant, val value: T) : TimelineEvent + +public interface TimelineObserver : AutoCloseable { /** - * The time of the last event collected by this collector + * The subjective time of this observer */ - public val lastCollectedEventTime: Instant + public val time: Instant /** - * Collect all uncollected events from [lastCollectedEventTime] to [upTo]. + * Collect all uncollected events from [time] to [upTo]. * * By default, collects all events. */ @@ -32,17 +43,17 @@ public interface TimelineObserver: AutoCloseable { */ public interface Timeline { /** - * The timestamp of the last event in a timeline + * A subjective time of this timeline. The time could advance without events being produced. */ - public val lastEventTime: Instant? + public val time: Instant /** * The time of the last event that was observed by all observers */ - public val observedTime: Instant + public val observedTime: Instant? /** - * Flow events from [observedTime] to [lastEventTime]. + * Flow events from [observedTime] to [time]. * * The resulting flow is finite and should not suspend. * @@ -53,8 +64,8 @@ public interface Timeline { /** * Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand. * - * Each collection shifts [TimelineObserver.lastCollectedEventTime] for this observer. - * The value of [observedTime] is the least of all observers [TimelineObserver.lastCollectedEventTime]. + * Each collection shifts [TimelineObserver.time] for this observer. + * The value of [observedTime] is the least of all observers [TimelineObserver.time]. */ public suspend fun observe( collector: suspend Flow.() -> Unit @@ -67,10 +78,10 @@ public interface Timeline { */ public suspend fun advance(toTime: Instant) - /** - * Interrupt generation of this timeline and discard unconsumed events after [atTime]. - * - * Throw exception if at least one observer advanced - */ - public suspend fun interrupt(atTime: Instant): Unit +// /** +// * Interrupt generation of this timeline and discard unconsumed events after [atTime]. +// * +// * Throw exception if at least one observer advanced +// */ +// public suspend fun interrupt(atTime: Instant): Unit } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/notNullUtils.kt b/simulation-kt/src/commonMain/kotlin/notNullUtils.kt new file mode 100644 index 0000000..5e7d64b --- /dev/null +++ b/simulation-kt/src/commonMain/kotlin/notNullUtils.kt @@ -0,0 +1,35 @@ +package space.kscience.simulation + +internal inline fun > Iterable.minOfNotNullOrNull(selector: (T) -> R?): R? { + val iterator = iterator() + if (!iterator.hasNext()) return null + var minValue = selector(iterator.next()) + while (iterator.hasNext()) { + val v = selector(iterator.next()) + when { + minValue == null -> minValue = v + v == null -> {/*do nothing*/} + minValue > v -> { + minValue = v + } + } + } + return minValue +} + +internal inline fun > Iterable.maxOfNotNullOrNull(selector: (T) -> R?): R? { + val iterator = iterator() + if (!iterator.hasNext()) return null + var maxValue = selector(iterator.next()) + while (iterator.hasNext()) { + val v = selector(iterator.next()) + when { + maxValue == null -> maxValue = v + v == null -> {/*do nothing*/} + maxValue < v -> { + maxValue = v + } + } + } + return maxValue +} \ No newline at end of file diff --git a/simulation-kt/src/commonTest/kotlin/TimelineTests.kt b/simulation-kt/src/commonTest/kotlin/TimelineTests.kt new file mode 100644 index 0000000..a1bf632 --- /dev/null +++ b/simulation-kt/src/commonTest/kotlin/TimelineTests.kt @@ -0,0 +1,34 @@ +package space.kscience.simulation + +import kotlinx.coroutines.test.runTest +import kotlinx.datetime.Instant +import kotlin.test.Test +import kotlin.time.Duration.Companion.seconds + +class TimelineTests { + + + @Test + fun testGeneration() = runTest(timeout = 5.seconds) { + val startTime = Instant.parse("2020-01-01T00:00:00.000Z") + + val generation = GeneratingTimeline>( + this, + initialEvent = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()), + lookaheadInterval = 1.seconds + ) { event -> + val time = event.time + 0.1.seconds + println("Emit: $time") + SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray()) + } + + val collector = generation.observe { + collect { + println("Consume: ${it.time}") + } + } + + collector.collect(startTime + 2.seconds) + collector.close() + } +} \ No newline at end of file