Simulation initial commit

This commit is contained in:
Alexander Nozik 2024-11-13 17:11:53 +03:00
parent 89d78c43bb
commit 284f9feb93
6 changed files with 340 additions and 0 deletions

View File

@ -52,6 +52,7 @@ dependencyResolutionManagement {
} }
include( include(
":simulation-kt",
":controls-core", ":controls-core",
":controls-ports-ktor", ":controls-ports-ktor",
":controls-serial", ":controls-serial",

View File

@ -0,0 +1,32 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
description = """
Core interfaces for building a device server
""".trimIndent()
kscience {
jvm()
js()
native()
wasm()
useCoroutines()
useContextReceivers()
commonMain {
api(spclibs.kotlinx.datetime)
}
jvmTest{
implementation(spclibs.logback.classic)
}
}
readme{
maturity = Maturity.EXPERIMENTAL
}

View File

@ -0,0 +1,96 @@
package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.time.Duration
/**
* @param lookaheadInterval an interval for generated events ahead of the last observed event.
*/
public class GeneratingTimeline<E : TimelineEvent>(
private val generationScope: CoroutineScope,
private val initialEvent: E,
private val lookaheadInterval: Duration,
private val generatorChain: suspend (E) -> E
) : Timeline<E> {
private val mutex = Mutex()
private val events = ArrayDeque<E>()
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override val lastEventTime: Instant?
get() = events.lastOrNull()?.time
override val observedTime: Instant
get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST
override fun flowUnobservedEvents(): Flow<E> = events.asFlow()
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
private var generatorJob: Job = launchGenerateJob(initialEvent)
private fun launchGenerateJob(event: E): Job = generationScope.launch {
var currentEvent = event
while(currentEvent.time < observedTime + lookaheadInterval) {
val nextEvent = generatorChain(currentEvent)
mutex.withLock {
events.add(nextEvent)
}
currentEvent = nextEvent
}
}
private fun regenerate(event: E) {
generatorJob.cancel()
generatorJob = launchGenerateJob(event)
}
/**
* Discard unconsumed events after [atTime].
*/
override suspend fun interrupt(atTime: Instant): Unit {
if (atTime < observedTime)
error("Timeline interrupt at time $atTime is not possible because there are observed events before $observedTime")
mutex.withLock {
while (events.isNotEmpty() && events.last().time > atTime) {
events.removeLast()
}
}
}
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver {
val observerMutex = Mutex()
override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST
override suspend fun collect(upTo: Instant) = observerMutex.withLock {
flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach {
lastCollectedEventTime = it.time
}.collector()
//cleanup()
}
override fun close() {
observers.remove(this)
}
}
observers.add(observer)
return observer
}
}

View File

@ -0,0 +1,49 @@
package space.kscience.simulation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.datetime.Instant
public class MergedTimeline<E : TimelineEvent>(
private val timelines: List<Timeline<E>>
) : Timeline<E> {
override val lastEventTime: Instant?
get() = timelines.minOfOrNull { it.lastEventTime ?: Instant.DISTANT_PAST }
override val observedTime: Instant
get() = timelines.maxOfOrNull { it.observedTime } ?: Instant.DISTANT_FUTURE
override fun flowUnobservedEvents(): Flow<E> = timelines.map { flowUnobservedEvents() }.merge()
override suspend fun advance(toTime: Instant) {
timelines.forEach { it.advance(toTime) }
}
override suspend fun interrupt(atTime: Instant) {
timelines.forEach { it.interrupt(atTime) }
}
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver {
override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST
override suspend fun collect(upTo: Instant) = timelines
.map { flowUnobservedEvents() }
.merge()
.takeWhile { it.time <= upTo }.onEach {
lastCollectedEventTime = it.time
}.collector()
override fun close() {
observers.remove(this)
}
}
observers.add(observer)
return observer
}
}

View File

@ -0,0 +1,86 @@
package space.kscience.simulation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
/**
* A manually mutable [Timeline] that could be modified via [emit] method by multiple
*/
public class SharedTimeline<E : TimelineEvent> : Timeline<E> {
private val mutex = Mutex()
private val events = ArrayDeque<E>()
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override val lastEventTime: Instant?
get() = events.lastOrNull()?.time
override val observedTime: Instant
get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST
override fun flowUnobservedEvents(): Flow<E> = events.asFlow()
/**
* Emit new event to the timeline
*/
public suspend fun emit(event: E): Boolean = mutex.withLock {
if (event.time < observedTime) error("Can't emit event $event because there are observed events after $observedTime")
events.add(event)
}
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
/**
* Discard all events before [observedTime]
*/
private suspend fun cleanup(): Unit {
val threshold = observedTime
while (events.isNotEmpty() && events.last().time > threshold) {
events.removeFirst()
}
}
/**
* Discard unconsumed events after [atTime].
*/
override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock {
val threshold = observedTime
if (atTime < threshold)
error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold")
while (events.isNotEmpty() && events.last().time > atTime) {
events.removeLast()
}
}
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver {
val observerMutex = Mutex()
override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST
override suspend fun collect(upTo: Instant) = observerMutex.withLock {
flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach {
lastCollectedEventTime = it.time
}.collector()
cleanup()
}
override fun close() {
observers.remove(this)
}
}
observers.add(observer)
return observer
}
}

View File

@ -0,0 +1,76 @@
package space.kscience.simulation
import kotlinx.coroutines.flow.Flow
import kotlinx.datetime.Instant
public interface TimelineEvent {
public val time: Instant
}
public interface TimelineObserver: AutoCloseable {
/**
* The time of the last event collected by this collector
*/
public val lastCollectedEventTime: Instant
/**
* Collect all uncollected events from [lastCollectedEventTime] to [upTo].
*
* By default, collects all events.
*/
public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE)
}
/**
* A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of
* the next event is greater than the previous event time.
*
* Timeline guarantees that all collectors could read all events when they need. Meaning that all unread events are cached.
*
* Timeline guarantees that already read events won't change, but unread events could change.
*/
public interface Timeline<E : TimelineEvent> {
/**
* The timestamp of the last event in a timeline
*/
public val lastEventTime: Instant?
/**
* The time of the last event that was observed by all observers
*/
public val observedTime: Instant
/**
* Flow events from [observedTime] to [lastEventTime].
*
* The resulting flow is finite and should not suspend.
*
* This method does not affect [observedTime].
*/
public fun flowUnobservedEvents(): Flow<E>
/**
* Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand.
*
* Each collection shifts [TimelineObserver.lastCollectedEventTime] for this observer.
* The value of [observedTime] is the least of all observers [TimelineObserver.lastCollectedEventTime].
*/
public suspend fun observe(
collector: suspend Flow<E>.() -> Unit
): TimelineObserver
/**
* Advance simulation time to [toTime]. This method forces all observers to collect all events in the given range.
*
* This method suspends until all advancement is done
*/
public suspend fun advance(toTime: Instant)
/**
* Interrupt generation of this timeline and discard unconsumed events after [atTime].
*
* Throw exception if at least one observer advanced
*/
public suspend fun interrupt(atTime: Instant): Unit
}