GeneratingTimeLine prototype is working

This commit is contained in:
Alexander Nozik 2024-11-21 09:42:53 +03:00
parent 284f9feb93
commit becde94403
6 changed files with 201 additions and 81 deletions

View File

@ -2,10 +2,9 @@ package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
@ -20,21 +19,49 @@ public class GeneratingTimeline<E : TimelineEvent>(
private val initialEvent: E, private val initialEvent: E,
private val lookaheadInterval: Duration, private val lookaheadInterval: Duration,
private val generatorChain: suspend (E) -> E private val generatorChain: suspend (E) -> E
) : Timeline<E> { ) : Timeline<E>, AutoCloseable {
// push to this channel to trigger event generation
private val wakeupChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
private suspend fun kickGenerator() {
wakeupChannel.send(Unit)
}
private val mutex = Mutex() private val mutex = Mutex()
private val events = ArrayDeque<E>() private val history = ArrayDeque<E>()
private val lastEvent = MutableSharedFlow<E>(replay = Int.MAX_VALUE)
private val updateHistoryJob = generationScope.launch {
lastEvent.onEach {
mutex.withLock {
history.add(it)
//cleanup old events
val threshold = observedTime ?: return@withLock
while (history.isNotEmpty() && history.last().time > threshold) {
history.removeFirst()
}
}
}
}
private val observers: MutableSet<TimelineObserver> = mutableSetOf() private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override val lastEventTime: Instant? override val time: Instant
get() = events.lastOrNull()?.time get() = history.lastOrNull()?.time ?: initialEvent.time
override val observedTime: Instant override val observedTime: Instant?
get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST get() = observers.minOfNotNullOrNull { it.time }
override fun flowUnobservedEvents(): Flow<E> = events.asFlow() override fun flowUnobservedEvents(): Flow<E> = flow {
history.forEach { e ->
emit(e)
}
emitAll(lastEvent)
}
override suspend fun advance(toTime: Instant) { override suspend fun advance(toTime: Instant) {
observers.forEach { observers.forEach {
@ -42,51 +69,60 @@ public class GeneratingTimeline<E : TimelineEvent>(
} }
} }
private var generatorJob: Job = launchGenerateJob(initialEvent) private var generatorJob: Job = launchGenerator(initialEvent)
private fun launchGenerateJob(event: E): Job = generationScope.launch { private fun launchGenerator(event: E): Job = generationScope.launch {
kickGenerator()
var currentEvent = event var currentEvent = event
while(currentEvent.time < observedTime + lookaheadInterval) { // for each wakeup generate all events in lookaheadInterval
for (u in wakeupChannel) {
while (currentEvent.time < (observedTime ?: event.time) + lookaheadInterval) {
val nextEvent = generatorChain(currentEvent) val nextEvent = generatorChain(currentEvent)
mutex.withLock { lastEvent.emit(nextEvent)
events.add(nextEvent)
}
currentEvent = nextEvent currentEvent = nextEvent
} }
} }
private fun regenerate(event: E) {
generatorJob.cancel()
generatorJob = launchGenerateJob(event)
} }
/**
* Discard unconsumed events after [atTime]. public suspend fun interrupt(newStart: E) {
*/ check(newStart.time > (observedTime ?: Instant.DISTANT_FUTURE)) {
override suspend fun interrupt(atTime: Instant): Unit { "Can't interrupt generating timeline after observed event"
if (atTime < observedTime) }
error("Timeline interrupt at time $atTime is not possible because there are observed events before $observedTime")
mutex.withLock { mutex.withLock {
while (events.isNotEmpty() && events.last().time > atTime) { while (history.isNotEmpty() && history.last().time > newStart.time) {
events.removeLast() history.removeLast()
} }
generatorJob.cancel()
generatorJob = launchGenerator(newStart)
} }
kickGenerator()
}
override fun close() {
updateHistoryJob.cancel()
generatorJob.cancel()
} }
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver { override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver { val observer = object : TimelineObserver {
val observerMutex = Mutex() override var time: Instant = this@GeneratingTimeline.time
override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST
override suspend fun collect(upTo: Instant) = observerMutex.withLock { override suspend fun collect(upTo: Instant) {
flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { flowUnobservedEvents().takeWhile {
lastCollectedEventTime = it.time it.time <= upTo
}.onEach {
time = it.time
kickGenerator()
}.collector() }.collector()
//cleanup()
} }
override fun close() { override fun close() {
observers.remove(this) observers.remove(this)
if(observers.isEmpty()){
this@GeneratingTimeline.close()
}
} }
} }

View File

@ -6,13 +6,15 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.takeWhile
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
public class MergedTimeline<E : TimelineEvent>( public class MergedTimeline<E : TimelineEvent>(
private val timelines: List<Timeline<E>> private val timelines: List<Timeline<E>>
) : Timeline<E> { ) : Timeline<E> {
override val lastEventTime: Instant? override val time: Instant
get() = timelines.minOfOrNull { it.lastEventTime ?: Instant.DISTANT_PAST } get() = timelines.minOfNotNullOrNull { it.time } ?: Instant.DISTANT_PAST
override val observedTime: Instant
get() = timelines.maxOfOrNull { it.observedTime } ?: Instant.DISTANT_FUTURE override val observedTime: Instant?
get() = timelines.maxOfNotNullOrNull { it.observedTime }
override fun flowUnobservedEvents(): Flow<E> = timelines.map { flowUnobservedEvents() }.merge() override fun flowUnobservedEvents(): Flow<E> = timelines.map { flowUnobservedEvents() }.merge()
@ -20,21 +22,21 @@ public class MergedTimeline<E : TimelineEvent>(
timelines.forEach { it.advance(toTime) } timelines.forEach { it.advance(toTime) }
} }
override suspend fun interrupt(atTime: Instant) { // override suspend fun interrupt(atTime: Instant) {
timelines.forEach { it.interrupt(atTime) } // timelines.forEach { it.interrupt(atTime) }
} // }
private val observers: MutableSet<TimelineObserver> = mutableSetOf() private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver { override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver { val observer = object : TimelineObserver {
override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST override var time: Instant = this@MergedTimeline.time
override suspend fun collect(upTo: Instant) = timelines override suspend fun collect(upTo: Instant) = timelines
.map { flowUnobservedEvents() } .map { flowUnobservedEvents() }
.merge() .merge()
.takeWhile { it.time <= upTo }.onEach { .takeWhile { it.time <= upTo }.onEach {
lastCollectedEventTime = it.time time = it.time
}.collector() }.collector()

View File

@ -19,11 +19,11 @@ public class SharedTimeline<E : TimelineEvent> : Timeline<E> {
private val observers: MutableSet<TimelineObserver> = mutableSetOf() private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override val lastEventTime: Instant? override val time: Instant
get() = events.lastOrNull()?.time get() = events.lastOrNull()?.time ?: Instant.DISTANT_PAST
override val observedTime: Instant override val observedTime: Instant?
get() = observers.minOfOrNull { it.lastCollectedEventTime } ?: Instant.DISTANT_PAST get() = observers.minOfNotNullOrNull { it.time }
override fun flowUnobservedEvents(): Flow<E> = events.asFlow() override fun flowUnobservedEvents(): Flow<E> = events.asFlow()
@ -31,7 +31,9 @@ public class SharedTimeline<E : TimelineEvent> : Timeline<E> {
* Emit new event to the timeline * Emit new event to the timeline
*/ */
public suspend fun emit(event: E): Boolean = mutex.withLock { public suspend fun emit(event: E): Boolean = mutex.withLock {
if (event.time < observedTime) error("Can't emit event $event because there are observed events after $observedTime") if (event.time < (observedTime ?: Instant.DISTANT_PAST)) {
error("Can't emit event $event because there are observed events after $observedTime")
}
events.add(event) events.add(event)
} }
@ -44,33 +46,33 @@ public class SharedTimeline<E : TimelineEvent> : Timeline<E> {
/** /**
* Discard all events before [observedTime] * Discard all events before [observedTime]
*/ */
private suspend fun cleanup(): Unit { private suspend fun cleanup(): Unit = mutex.withLock {
val threshold = observedTime val threshold = observedTime ?: return@withLock
while (events.isNotEmpty() && events.last().time > threshold) { while (events.isNotEmpty() && events.last().time > threshold) {
events.removeFirst() events.removeFirst()
} }
} }
/** // /**
* Discard unconsumed events after [atTime]. // * Discard unconsumed events after [atTime].
*/ // */
override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock { // override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock {
val threshold = observedTime // val threshold = observedTime
if (atTime < threshold) // if (atTime < threshold)
error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold") // error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold")
while (events.isNotEmpty() && events.last().time > atTime) { // while (events.isNotEmpty() && events.last().time > atTime) {
events.removeLast() // events.removeLast()
} // }
} // }
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver { override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver { val observer = object : TimelineObserver {
val observerMutex = Mutex() val observerMutex = Mutex()
override var lastCollectedEventTime: Instant = Instant.DISTANT_PAST override var time: Instant = this@SharedTimeline.time
override suspend fun collect(upTo: Instant) = observerMutex.withLock { override suspend fun collect(upTo: Instant) = observerMutex.withLock {
flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach { flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach {
lastCollectedEventTime = it.time time = it.time
}.collector() }.collector()
cleanup() cleanup()
} }

View File

@ -2,20 +2,31 @@ package space.kscience.simulation
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.time.Duration
public interface TimelineEvent { public interface TimelineEvent {
public val time: Instant public val time: Instant
} }
public interface TimelineObserver: AutoCloseable { public interface TimelineInterval : TimelineEvent {
public val startTime: Instant
public val duration: Duration
override val time: Instant
get() = startTime + duration
}
public data class SimpleTimelineEvent<T>(override val time: Instant, val value: T) : TimelineEvent
public interface TimelineObserver : AutoCloseable {
/** /**
* The time of the last event collected by this collector * The subjective time of this observer
*/ */
public val lastCollectedEventTime: Instant public val time: Instant
/** /**
* Collect all uncollected events from [lastCollectedEventTime] to [upTo]. * Collect all uncollected events from [time] to [upTo].
* *
* By default, collects all events. * By default, collects all events.
*/ */
@ -32,17 +43,17 @@ public interface TimelineObserver: AutoCloseable {
*/ */
public interface Timeline<E : TimelineEvent> { public interface Timeline<E : TimelineEvent> {
/** /**
* The timestamp of the last event in a timeline * A subjective time of this timeline. The time could advance without events being produced.
*/ */
public val lastEventTime: Instant? public val time: Instant
/** /**
* The time of the last event that was observed by all observers * The time of the last event that was observed by all observers
*/ */
public val observedTime: Instant public val observedTime: Instant?
/** /**
* Flow events from [observedTime] to [lastEventTime]. * Flow events from [observedTime] to [time].
* *
* The resulting flow is finite and should not suspend. * The resulting flow is finite and should not suspend.
* *
@ -53,8 +64,8 @@ public interface Timeline<E : TimelineEvent> {
/** /**
* Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand. * Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand.
* *
* Each collection shifts [TimelineObserver.lastCollectedEventTime] for this observer. * Each collection shifts [TimelineObserver.time] for this observer.
* The value of [observedTime] is the least of all observers [TimelineObserver.lastCollectedEventTime]. * The value of [observedTime] is the least of all observers [TimelineObserver.time].
*/ */
public suspend fun observe( public suspend fun observe(
collector: suspend Flow<E>.() -> Unit collector: suspend Flow<E>.() -> Unit
@ -67,10 +78,10 @@ public interface Timeline<E : TimelineEvent> {
*/ */
public suspend fun advance(toTime: Instant) public suspend fun advance(toTime: Instant)
/** // /**
* Interrupt generation of this timeline and discard unconsumed events after [atTime]. // * Interrupt generation of this timeline and discard unconsumed events after [atTime].
* // *
* Throw exception if at least one observer advanced // * Throw exception if at least one observer advanced
*/ // */
public suspend fun interrupt(atTime: Instant): Unit // public suspend fun interrupt(atTime: Instant): Unit
} }

View File

@ -0,0 +1,35 @@
package space.kscience.simulation
internal inline fun <T, R : Comparable<R>> Iterable<T>.minOfNotNullOrNull(selector: (T) -> R?): R? {
val iterator = iterator()
if (!iterator.hasNext()) return null
var minValue = selector(iterator.next())
while (iterator.hasNext()) {
val v = selector(iterator.next())
when {
minValue == null -> minValue = v
v == null -> {/*do nothing*/}
minValue > v -> {
minValue = v
}
}
}
return minValue
}
internal inline fun <T, R : Comparable<R>> Iterable<T>.maxOfNotNullOrNull(selector: (T) -> R?): R? {
val iterator = iterator()
if (!iterator.hasNext()) return null
var maxValue = selector(iterator.next())
while (iterator.hasNext()) {
val v = selector(iterator.next())
when {
maxValue == null -> maxValue = v
v == null -> {/*do nothing*/}
maxValue < v -> {
maxValue = v
}
}
}
return maxValue
}

View File

@ -0,0 +1,34 @@
package space.kscience.simulation
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds
class TimelineTests {
@Test
fun testGeneration() = runTest(timeout = 5.seconds) {
val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
val generation = GeneratingTimeline<SimpleTimelineEvent<DoubleArray>>(
this,
initialEvent = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()),
lookaheadInterval = 1.seconds
) { event ->
val time = event.time + 0.1.seconds
println("Emit: $time")
SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray())
}
val collector = generation.observe {
collect {
println("Consume: ${it.time}")
}
}
collector.collect(startTime + 2.seconds)
collector.close()
}
}