diff --git a/settings.gradle.kts b/settings.gradle.kts index 7e84c71..702ce9b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -52,6 +52,7 @@ dependencyResolutionManagement { } include( + ":simulation-kt", ":controls-core", ":controls-ports-ktor", ":controls-serial", diff --git a/simulation-kt/build.gradle.kts b/simulation-kt/build.gradle.kts new file mode 100644 index 0000000..c30480a --- /dev/null +++ b/simulation-kt/build.gradle.kts @@ -0,0 +1,32 @@ +import space.kscience.gradle.Maturity + +plugins { + id("space.kscience.gradle.mpp") + `maven-publish` +} + +description = """ + Core interfaces for building a device server +""".trimIndent() + +kscience { + jvm() + js() + native() + wasm() + useCoroutines() + useContextReceivers() + + commonMain { + api(spclibs.kotlinx.datetime) + } + + jvmTest{ + implementation(spclibs.logback.classic) + } +} + + +readme{ + maturity = Maturity.EXPERIMENTAL +} \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt new file mode 100644 index 0000000..4e9da50 --- /dev/null +++ b/simulation-kt/src/commonMain/kotlin/GeneratingTimeline.kt @@ -0,0 +1,96 @@ +package space.kscience.simulation + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.datetime.Instant +import kotlin.time.Duration + +/** + * @param lookaheadInterval an interval for generated events ahead of the last observed event. + */ +public class GeneratingTimeline( + private val generationScope: CoroutineScope, + private val initialEvent: E, + private val lookaheadInterval: Duration, + private val generatorChain: suspend (E) -> E +) : Timeline { + + private val mutex = Mutex() + + private val events = ArrayDeque() + + private val observers: MutableSet = mutableSetOf() + + override val lastEventTime: Instant? + get() = events.lastOrNull()?.time + + override val observedTime: Instant + get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST + + override fun flowUnobservedEvents(): Flow = events.asFlow() + + override suspend fun advance(toTime: Instant) { + observers.forEach { + it.collect(toTime) + } + } + + private var generatorJob: Job = launchGenerateJob(initialEvent) + + private fun launchGenerateJob(event: E): Job = generationScope.launch { + var currentEvent = event + while(currentEvent.time < observedTime + lookaheadInterval) { + val nextEvent = generatorChain(currentEvent) + mutex.withLock { + events.add(nextEvent) + } + currentEvent = nextEvent + } + } + + private fun regenerate(event: E) { + generatorJob.cancel() + generatorJob = launchGenerateJob(event) + } + + /** + * Discard unconsumed events after [atTime]. + */ + override suspend fun interrupt(atTime: Instant): Unit { + if (atTime < observedTime) + error("Timeline interrupt at time $atTime is not possible because there are observed events before $observedTime") + mutex.withLock { + while (events.isNotEmpty() && events.last().time > atTime) { + events.removeLast() + } + } + } + + override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { + val observer = object : TimelineObserver { + val observerMutex = Mutex() + override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST + + override suspend fun collect(upTo: Instant) = observerMutex.withLock { + flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { + lastCollectedEventTime = it.time + }.collector() + //cleanup() + } + + override fun close() { + observers.remove(this) + } + + } + observers.add(observer) + return observer + } +} \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt new file mode 100644 index 0000000..991cdc4 --- /dev/null +++ b/simulation-kt/src/commonMain/kotlin/MergedTimeline.kt @@ -0,0 +1,49 @@ +package space.kscience.simulation + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile +import kotlinx.datetime.Instant + +public class MergedTimeline( + private val timelines: List> +) : Timeline { + override val lastEventTime: Instant? + get() = timelines.minOfOrNull { it.lastEventTime ?: Instant.DISTANT_PAST } + override val observedTime: Instant + get() = timelines.maxOfOrNull { it.observedTime } ?: Instant.DISTANT_FUTURE + + override fun flowUnobservedEvents(): Flow = timelines.map { flowUnobservedEvents() }.merge() + + override suspend fun advance(toTime: Instant) { + timelines.forEach { it.advance(toTime) } + } + + override suspend fun interrupt(atTime: Instant) { + timelines.forEach { it.interrupt(atTime) } + } + + private val observers: MutableSet = mutableSetOf() + + override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { + val observer = object : TimelineObserver { + override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST + + override suspend fun collect(upTo: Instant) = timelines + .map { flowUnobservedEvents() } + .merge() + .takeWhile { it.time <= upTo }.onEach { + lastCollectedEventTime = it.time + }.collector() + + + override fun close() { + observers.remove(this) + } + + } + observers.add(observer) + return observer + } +} \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt new file mode 100644 index 0000000..067d4d2 --- /dev/null +++ b/simulation-kt/src/commonMain/kotlin/SharedTimeline.kt @@ -0,0 +1,86 @@ +package space.kscience.simulation + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.datetime.Instant + +/** + * A manually mutable [Timeline] that could be modified via [emit] method by multiple + */ +public class SharedTimeline : Timeline { + + private val mutex = Mutex() + + private val events = ArrayDeque() + + private val observers: MutableSet = mutableSetOf() + + override val lastEventTime: Instant? + get() = events.lastOrNull()?.time + + override val observedTime: Instant + get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST + + override fun flowUnobservedEvents(): Flow = events.asFlow() + + /** + * Emit new event to the timeline + */ + public suspend fun emit(event: E): Boolean = mutex.withLock { + if (event.time < observedTime) error("Can't emit event $event because there are observed events after $observedTime") + events.add(event) + } + + override suspend fun advance(toTime: Instant) { + observers.forEach { + it.collect(toTime) + } + } + + /** + * Discard all events before [observedTime] + */ + private suspend fun cleanup(): Unit { + val threshold = observedTime + while (events.isNotEmpty() && events.last().time > threshold) { + events.removeFirst() + } + } + + /** + * Discard unconsumed events after [atTime]. + */ + override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock { + val threshold = observedTime + if (atTime < threshold) + error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold") + while (events.isNotEmpty() && events.last().time > atTime) { + events.removeLast() + } + } + + override suspend fun observe(collector: suspend Flow.() -> Unit): TimelineObserver { + val observer = object : TimelineObserver { + val observerMutex = Mutex() + override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST + + override suspend fun collect(upTo: Instant) = observerMutex.withLock { + flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { + lastCollectedEventTime = it.time + }.collector() + cleanup() + } + + override fun close() { + observers.remove(this) + } + + } + observers.add(observer) + return observer + } +} \ No newline at end of file diff --git a/simulation-kt/src/commonMain/kotlin/Timeline.kt b/simulation-kt/src/commonMain/kotlin/Timeline.kt new file mode 100644 index 0000000..f9f4535 --- /dev/null +++ b/simulation-kt/src/commonMain/kotlin/Timeline.kt @@ -0,0 +1,76 @@ +package space.kscience.simulation + +import kotlinx.coroutines.flow.Flow +import kotlinx.datetime.Instant + + +public interface TimelineEvent { + public val time: Instant +} + +public interface TimelineObserver: AutoCloseable { + /** + * The time of the last event collected by this collector + */ + public val lastCollectedEventTime: Instant + + /** + * Collect all uncollected events from [lastCollectedEventTime] to [upTo]. + * + * By default, collects all events. + */ + public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE) +} + +/** + * A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of + * the next event is greater than the previous event time. + * + * Timeline guarantees that all collectors could read all events when they need. Meaning that all unread events are cached. + * + * Timeline guarantees that already read events won't change, but unread events could change. + */ +public interface Timeline { + /** + * The timestamp of the last event in a timeline + */ + public val lastEventTime: Instant? + + /** + * The time of the last event that was observed by all observers + */ + public val observedTime: Instant + + /** + * Flow events from [observedTime] to [lastEventTime]. + * + * The resulting flow is finite and should not suspend. + * + * This method does not affect [observedTime]. + */ + public fun flowUnobservedEvents(): Flow + + /** + * Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand. + * + * Each collection shifts [TimelineObserver.lastCollectedEventTime] for this observer. + * The value of [observedTime] is the least of all observers [TimelineObserver.lastCollectedEventTime]. + */ + public suspend fun observe( + collector: suspend Flow.() -> Unit + ): TimelineObserver + + /** + * Advance simulation time to [toTime]. This method forces all observers to collect all events in the given range. + * + * This method suspends until all advancement is done + */ + public suspend fun advance(toTime: Instant) + + /** + * Interrupt generation of this timeline and discard unconsumed events after [atTime]. + * + * Throw exception if at least one observer advanced + */ + public suspend fun interrupt(atTime: Instant): Unit +} \ No newline at end of file