diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 48c0a02..81aa1c0 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/settings.gradle.kts b/settings.gradle.kts index 702ce9b..1c6b8b2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -21,6 +21,10 @@ pluginManagement { } } +plugins { + id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0" +} + dependencyResolutionManagement { val toolsVersion: String by extra diff --git a/simulation-kt/build.gradle.kts b/simulation-kt/build.gradle.kts index c30480a..b365d98 100644 --- a/simulation-kt/build.gradle.kts +++ b/simulation-kt/build.gradle.kts @@ -28,5 +28,5 @@ kscience { readme{ - maturity = Maturity.EXPERIMENTAL + maturity = Maturity.PROTOTYPE } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt b/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt new file mode 100644 index 0000000..9876083 --- /dev/null +++ b/simulation-kt/src/commonMain/kotlin/AbstractTimeline.kt @@ -0,0 +1,70 @@ +package space.kscience.simulation + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import kotlinx.datetime.Instant + +public abstract class AbstractTimeline( + protected val timelineScope: CoroutineScope, + protected var startTime: Instant, +) : Timeline { + + private val observers: MutableSet = mutableSetOf() + + private val feedbackChannel = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) + + override val time: StateFlow = feedbackChannel.consumeAsFlow().map { + maxOf(startTime,observers.maxOfOrNull { it.time.value } ?: startTime) + }.stateIn(timelineScope, SharingStarted.Lazily, startTime) + + override suspend fun advance(toTime: Instant) { + observers.forEach { + it.collect(toTime) + } + } + + /** + * Flow unobserved events starting at [time]. The flow could be interrupted if timeline changes + */ + protected abstract fun events(): Flow + + override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { + val context = currentCoroutineContext() + val observer = object : TimelineObserver { + // observed time + override val time = MutableStateFlow(startTime) + + private val channel = Channel() + + private val collectJob = timelineScope.launch(context) { + channel.consumeAsFlow().onEach { + time.emit(it.time) + feedbackChannel.send(Unit) + }.collector() + } + + + override suspend fun collect(upTo: Instant) = coroutineScope { + require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" } + events().takeWhile { + it.time <= upTo + }.collect { + channel.send(it) + } + } + + override fun close() { + collectJob.cancel() + observers.remove(this) + } + + } + observers.add(observer) + return observer + } +} \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt index f3d7d92..6f29266 100644 --- a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt @@ -1,132 +1,65 @@ package space.kscience.simulation import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.* -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Instant import kotlin.time.Duration +/** + * Suspend the collection of this [Flow] until event time is lower that threshold + */ +public fun Flow.withTimeThreshold( + threshold: Flow +): Flow = transform { event -> + threshold.first { it > event.time } + emit(event) +} + /** * @param lookaheadInterval an interval for generated events ahead of the last observed event. */ public class GeneratingTimeline( private val generationScope: CoroutineScope, - private val initialEvent: E, + private val origin: E, private val lookaheadInterval: Duration, - private val generatorChain: suspend (E) -> E -) : Timeline, AutoCloseable { + private val generator: suspend FlowCollector.(E) -> Unit +) : AbstractTimeline(generationScope, origin.time) { - // push to this channel to trigger event generation - private val wakeupChannel = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val startEventFlow = MutableStateFlow(origin) - private suspend fun kickGenerator() { - wakeupChannel.send(Unit) + private data class EventWithOrigin(val origin: E, val event: E) : TimelineEvent { + override val time: Instant get() = event.time } - private val mutex = Mutex() - - private val history = ArrayDeque() - - private val lastEvent = MutableSharedFlow(replay = Int.MAX_VALUE) - - private val updateHistoryJob = generationScope.launch { - lastEvent.onEach { - mutex.withLock { - history.add(it) - //cleanup old events - val threshold = observedTime ?: return@withLock - while (history.isNotEmpty() && history.last().time > threshold) { - history.removeFirst() - } - + private val events: SharedFlow = flow { + coroutineScope { + startEventFlow.collect { startEvent -> + emitAll( + flow { generator(startEvent) }.takeWhile { startEvent == startEventFlow.value }.map { + EventWithOrigin(startEvent, it) + } + ) } } - } - - private val observers: MutableSet = mutableSetOf() - - override val time: Instant - get() = history.lastOrNull()?.time ?: initialEvent.time - - override val observedTime: Instant? - get() = observers.minOfNotNullOrNull { it.time } - - override fun flowUnobservedEvents(): Flow = flow { - history.forEach { e -> - emit(e) - } - emitAll(lastEvent) - } - - override suspend fun advance(toTime: Instant) { - observers.forEach { - it.collect(toTime) - } - } - - private var generatorJob: Job = launchGenerator(initialEvent) - - private fun launchGenerator(event: E): Job = generationScope.launch { - kickGenerator() - var currentEvent = event - // for each wakeup generate all events in lookaheadInterval - for (u in wakeupChannel) { - while (currentEvent.time < (observedTime ?: event.time) + lookaheadInterval) { - val nextEvent = generatorChain(currentEvent) - lastEvent.emit(nextEvent) - currentEvent = nextEvent - } - } - } + }.withTimeThreshold( + threshold = time.map { it + lookaheadInterval } + ).buffer(Channel.UNLIMITED).mapNotNull { + //it.event + it.takeIf { it.origin == startEventFlow.value }?.event + }.shareIn( + scope = generationScope, + started = SharingStarted.Eagerly, + ) + override fun events(): Flow = events public suspend fun interrupt(newStart: E) { - check(newStart.time > (observedTime ?: Instant.DISTANT_FUTURE)) { + check(newStart.time >= time.value) { "Can't interrupt generating timeline after observed event" } - mutex.withLock { - while (history.isNotEmpty() && history.last().time > newStart.time) { - history.removeLast() - } - generatorJob.cancel() - generatorJob = launchGenerator(newStart) - - } - kickGenerator() - } - - override fun close() { - updateHistoryJob.cancel() - generatorJob.cancel() - } - - override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { - val observer = object : TimelineObserver { - override var time: Instant = this@GeneratingTimeline.time - - override suspend fun collect(upTo: Instant) { - flowUnobservedEvents().takeWhile { - it.time <= upTo - }.onEach { - time = it.time - kickGenerator() - }.collector() - } - - override fun close() { - observers.remove(this) - if(observers.isEmpty()){ - this@GeneratingTimeline.close() - } - } - - } - observers.add(observer) - return observer + startTime = newStart.time + startEventFlow.emit(newStart) } } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt index c498bb8..334e8c7 100644 --- a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt @@ -1,51 +1,25 @@ package space.kscience.simulation +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.merge -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.datetime.Instant +import kotlinx.coroutines.flow.flow public class MergedTimeline( + timelineScope: CoroutineScope, private val timelines: List> -) : Timeline { - override val time: Instant - get() = timelines.minOfNotNullOrNull { it.time } ?: Instant.DISTANT_PAST +) : AbstractTimeline(timelineScope, timelines.minOf { it.time.value }) { - override val observedTime: Instant? - get() = timelines.maxOfNotNullOrNull { it.observedTime } - - override fun flowUnobservedEvents(): Flow = timelines.map { flowUnobservedEvents() }.merge() - - override suspend fun advance(toTime: Instant) { - timelines.forEach { it.advance(toTime) } + override fun events(): Flow = flow { + val buffer = TODO() +// +// timelines.forEach { timeline -> +// timeline.observe { +// collect{ +// buffer.add(it) +// } +// } +// } } -// override suspend fun interrupt(atTime: Instant) { -// timelines.forEach { it.interrupt(atTime) } -// } - - private val observers: MutableSet = mutableSetOf() - - override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { - val observer = object : TimelineObserver { - override var time: Instant = this@MergedTimeline.time - - override suspend fun collect(upTo: Instant) = timelines - .map { flowUnobservedEvents() } - .merge() - .takeWhile { it.time <= upTo }.onEach { - time = it.time - }.collector() - - - override fun close() { - observers.remove(this) - } - - } - observers.add(observer) - return observer - } } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt index b5e4845..d9962d8 100644 --- a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt +++ b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt @@ -1,88 +1,30 @@ package space.kscience.simulation +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.datetime.Instant /** * A manually mutable [Timeline] that could be modified via [emit] method by multiple */ -public class SharedTimeline : Timeline { +public class SharedTimeline( + timelineScope: CoroutineScope, + startTime: Instant +) : AbstractTimeline(timelineScope, startTime) { - private val mutex = Mutex() + private val events = MutableSharedFlow(replay = Channel.UNLIMITED) - private val events = ArrayDeque() - - private val observers: MutableSet = mutableSetOf() - - override val time: Instant - get() = events.lastOrNull()?.time ?: Instant.DISTANT_PAST - - override val observedTime: Instant? - get() = observers.minOfNotNullOrNull { it.time } - - override fun flowUnobservedEvents(): Flow = events.asFlow() + override fun events(): Flow = events /** * Emit new event to the timeline */ - public suspend fun emit(event: E): Boolean = mutex.withLock { - if (event.time < (observedTime ?: Instant.DISTANT_PAST)) { - error("Can't emit event $event because there are observed events after $observedTime") + public suspend fun emit(event: E) { + if (event.time < (events.replayCache.lastOrNull()?.time ?: time.value)) { + error("Can't emit event $event because timeline monotony is broken") } - events.add(event) - } - - override suspend fun advance(toTime: Instant) { - observers.forEach { - it.collect(toTime) - } - } - - /** - * Discard all events before [observedTime] - */ - private suspend fun cleanup(): Unit = mutex.withLock { - val threshold = observedTime ?: return@withLock - while (events.isNotEmpty() && events.last().time > threshold) { - events.removeFirst() - } - } - -// /** -// * Discard unconsumed events after [atTime]. -// */ -// override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock { -// val threshold = observedTime -// if (atTime < threshold) -// error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold") -// while (events.isNotEmpty() && events.last().time > atTime) { -// events.removeLast() -// } -// } - - override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { - val observer = object : TimelineObserver { - val observerMutex = Mutex() - override var time: Instant = this@SharedTimeline.time - - override suspend fun collect(upTo: Instant) = observerMutex.withLock { - flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { - time = it.time - }.collector() - cleanup() - } - - override fun close() { - observers.remove(this) - } - - } - observers.add(observer) - return observer + events.emit(event) } } \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/Timeline.kt b/simulation-kt/src/commonMain/kotlin/Timeline.kt index 64974ff..27219cd 100644 --- a/simulation-kt/src/commonMain/kotlin/Timeline.kt +++ b/simulation-kt/src/commonMain/kotlin/Timeline.kt @@ -1,6 +1,7 @@ package space.kscience.simulation import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow import kotlinx.datetime.Instant import kotlin.time.Duration @@ -21,9 +22,9 @@ public data class SimpleTimelineEvent(override val time: Instant, val value: public interface TimelineObserver : AutoCloseable { /** - * The subjective time of this observer + * The subjective time of this observer (last observed time) */ - public val time: Instant + public val time: StateFlow /** * Collect all uncollected events from [time] to [upTo]. @@ -33,6 +34,8 @@ public interface TimelineObserver : AutoCloseable { public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE) } +public suspend fun TimelineObserver.collect(duration: Duration) = collect(time.value + duration) + /** * A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of * the next event is greater than the previous event time. @@ -43,29 +46,15 @@ public interface TimelineObserver : AutoCloseable { */ public interface Timeline { /** - * A subjective time of this timeline. The time could advance without events being produced. + * A subjective time of this timeline. The subjective time is the last observed time. */ - public val time: Instant + public val time: StateFlow - /** - * The time of the last event that was observed by all observers - */ - public val observedTime: Instant? - - /** - * Flow events from [observedTime] to [time]. - * - * The resulting flow is finite and should not suspend. - * - * This method does not affect [observedTime]. - */ - public fun flowUnobservedEvents(): Flow /** * Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand. * * Each collection shifts [TimelineObserver.time] for this observer. - * The value of [observedTime] is the least of all observers [TimelineObserver.time]. */ public suspend fun observe( collector: suspend Flow.() -> Unit @@ -84,4 +73,4 @@ public interface Timeline { // * Throw exception if at least one observer advanced // */ // public suspend fun interrupt(atTime: Instant): Unit -} \ No newline at end of file +} diff --git a/simulation-kt/src/commonTest/kotlin/TimelineTests.kt b/simulation-kt/src/commonTest/kotlin/TimelineTests.kt index a1bf632..a052d64 100644 --- a/simulation-kt/src/commonTest/kotlin/TimelineTests.kt +++ b/simulation-kt/src/commonTest/kotlin/TimelineTests.kt @@ -1,34 +1,50 @@ package space.kscience.simulation +import kotlinx.coroutines.isActive import kotlinx.coroutines.test.runTest import kotlinx.datetime.Instant import kotlin.test.Test +import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds class TimelineTests { @Test - fun testGeneration() = runTest(timeout = 5.seconds) { + fun testGeneration() = runTest{ val startTime = Instant.parse("2020-01-01T00:00:00.000Z") val generation = GeneratingTimeline>( this, - initialEvent = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()), + origin = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()), lookaheadInterval = 1.seconds ) { event -> - val time = event.time + 0.1.seconds - println("Emit: $time") - SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray()) - } - - val collector = generation.observe { - collect { - println("Consume: ${it.time}") + var time = event.time + while (isActive) { + time += 0.1.seconds + println("Emit: ${time - startTime}") + emit(SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray())) } } - collector.collect(startTime + 2.seconds) + val result = mutableListOf() + + val collector = generation.observe { + collect { + println("Consume: ${it.time - startTime}") + result.add(it.time - startTime) + } + } + + collector.collect(2.seconds) + println("First collection complete") + collector.collect(2.seconds) + println("Second collection complete") + println("Interrupt") + generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, List(10) { it.toDouble() }.toDoubleArray())) + println("Collecting second") + collector.collect(startTime + 6.seconds + 2.5.seconds) + println(result) collector.close() } } \ No newline at end of file