diff --git a/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt b/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt index 9876083..b4d243f 100644 --- a/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt @@ -1,18 +1,23 @@ package space.kscience.simulation -import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.flow.* -import kotlinx.coroutines.launch import kotlinx.datetime.Instant +import kotlin.coroutines.CoroutineContext public abstract class AbstractTimeline( - protected val timelineScope: CoroutineScope, protected var startTime: Instant, -) : Timeline { + coroutineContext: CoroutineContext +) : Timeline, AutoCloseable { + + protected val timelineScope: CoroutineScope = CoroutineScope( + coroutineContext + + SupervisorJob(coroutineContext[Job]) + + CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } + + CoroutineName("Timeline") + ) private val observers: MutableSet = mutableSetOf() @@ -67,4 +72,9 @@ public abstract class AbstractTimeline( observers.add(observer) return observer } + + override fun close() { + observers.forEach { it.close() } + timelineScope.cancel() + } } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt index 6f29266..5690d7c 100644 --- a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt @@ -1,10 +1,11 @@ package space.kscience.simulation -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.* import kotlinx.datetime.Instant +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.time.Duration /** @@ -21,11 +22,11 @@ public fun Flow.withTimeThreshold( * @param lookaheadInterval an interval for generated events ahead of the last observed event. */ public class GeneratingTimeline( - private val generationScope: CoroutineScope, private val origin: E, private val lookaheadInterval: Duration, + coroutineContext: CoroutineContext = EmptyCoroutineContext, private val generator: suspend FlowCollector.(E) -> Unit -) : AbstractTimeline(generationScope, origin.time) { +) : AbstractTimeline(origin.time, coroutineContext) { private val startEventFlow = MutableStateFlow(origin) @@ -46,11 +47,11 @@ public class GeneratingTimeline( }.withTimeThreshold( threshold = time.map { it + lookaheadInterval } ).buffer(Channel.UNLIMITED).mapNotNull { - //it.event + //a barrier to avoid leaking stale events after interruption from buffer it.takeIf { it.origin == startEventFlow.value }?.event }.shareIn( - scope = generationScope, - started = SharingStarted.Eagerly, + scope = timelineScope, + started = SharingStarted.Lazily, ) override fun events(): Flow = events diff --git a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt index 334e8c7..0da6227 100644 --- a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt @@ -1,14 +1,15 @@ package space.kscience.simulation -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext public class MergedTimeline( - timelineScope: CoroutineScope, - private val timelines: List> -) : AbstractTimeline(timelineScope, timelines.minOf { it.time.value }) { + private val timelines: List>, + coroutineContext: CoroutineContext = EmptyCoroutineContext +) : AbstractTimeline(timelines.minOf { it.time.value }, coroutineContext) { override fun events(): Flow = flow { val buffer = TODO() diff --git a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt index d9962d8..1b45002 100644 --- a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt @@ -1,18 +1,19 @@ package space.kscience.simulation -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.datetime.Instant +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext /** * A manually mutable [Timeline] that could be modified via [emit] method by multiple */ public class SharedTimeline( - timelineScope: CoroutineScope, - startTime: Instant -) : AbstractTimeline(timelineScope, startTime) { + startTime: Instant, + coroutineContext: CoroutineContext = EmptyCoroutineContext +) : AbstractTimeline(startTime, coroutineContext) { private val events = MutableSharedFlow(replay = Channel.UNLIMITED) diff --git a/simulation-kt/src/commonMain/kotlin/Timeline.kt b/simulation-kt/src/commonMain/kotlin/Timeline.kt index 27219cd..20a0746 100644 --- a/simulation-kt/src/commonMain/kotlin/Timeline.kt +++ b/simulation-kt/src/commonMain/kotlin/Timeline.kt @@ -34,7 +34,10 @@ public interface TimelineObserver : AutoCloseable { public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE) } -public suspend fun TimelineObserver.collect(duration: Duration) = collect(time.value + duration) +/** + * Collect events for a fixed [duration] since last observed time + */ +public suspend fun TimelineObserver.collect(duration: Duration): Unit = collect(time.value + duration) /** * A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of @@ -66,11 +69,13 @@ public interface Timeline { * This method suspends until all advancement is done */ public suspend fun advance(toTime: Instant) - -// /** -// * Interrupt generation of this timeline and discard unconsumed events after [atTime]. -// * -// * Throw exception if at least one observer advanced -// */ -// public suspend fun interrupt(atTime: Instant): Unit } + +/** + * Perform [collector] action on each event + */ +public suspend fun Timeline.observeEach( + collector: suspend (E) -> Unit +): TimelineObserver = observe { + collect(collector) +} \ No newline at end of file diff --git a/simulation-kt/src/commonTest/kotlin/TimelineTests.kt b/simulation-kt/src/commonTest/kotlin/TimelineTests.kt index a052d64..b9f53dd 100644 --- a/simulation-kt/src/commonTest/kotlin/TimelineTests.kt +++ b/simulation-kt/src/commonTest/kotlin/TimelineTests.kt @@ -11,40 +11,38 @@ class TimelineTests { @Test - fun testGeneration() = runTest{ + fun testGeneration() = runTest(timeout = 1.seconds) { val startTime = Instant.parse("2020-01-01T00:00:00.000Z") - val generation = GeneratingTimeline>( - this, - origin = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()), + val generation = GeneratingTimeline( + origin = SimpleTimelineEvent(startTime, Unit), lookaheadInterval = 1.seconds ) { event -> var time = event.time while (isActive) { time += 0.1.seconds println("Emit: ${time - startTime}") - emit(SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray())) + emit(SimpleTimelineEvent(time, Unit)) } } val result = mutableListOf() - val collector = generation.observe { - collect { - println("Consume: ${it.time - startTime}") - result.add(it.time - startTime) - } + val observer = generation.observeEach { + println("Consume: ${it.time - startTime}") + result.add(it.time - startTime) } - collector.collect(2.seconds) + observer.collect(2.seconds) println("First collection complete") - collector.collect(2.seconds) + observer.collect(2.seconds) println("Second collection complete") println("Interrupt") - generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, List(10) { it.toDouble() }.toDoubleArray())) - println("Collecting second") - collector.collect(startTime + 6.seconds + 2.5.seconds) + generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, Unit)) + println("Collecting after interruption") + observer.collect(startTime + 6.seconds + 2.5.seconds) println(result) - collector.close() + generation.close() + } } \ No newline at end of file