GeneratingTimeline fully functional

This commit is contained in:
Alexander Nozik 2024-12-08 10:37:47 +03:00
parent b4b534df1d
commit bb09a74710
9 changed files with 176 additions and 248 deletions

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

View File

@ -21,6 +21,10 @@ pluginManagement {
} }
} }
plugins {
id("org.gradle.toolchains.foojay-resolver-convention") version "0.8.0"
}
dependencyResolutionManagement { dependencyResolutionManagement {
val toolsVersion: String by extra val toolsVersion: String by extra

View File

@ -28,5 +28,5 @@ kscience {
readme{ readme{
maturity = Maturity.EXPERIMENTAL maturity = Maturity.PROTOTYPE
} }

View File

@ -0,0 +1,70 @@
package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.datetime.Instant
public abstract class AbstractTimeline<E : TimelineEvent>(
protected val timelineScope: CoroutineScope,
protected var startTime: Instant,
) : Timeline<E> {
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
private val feedbackChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val time: StateFlow<Instant> = feedbackChannel.consumeAsFlow().map {
maxOf(startTime,observers.maxOfOrNull { it.time.value } ?: startTime)
}.stateIn(timelineScope, SharingStarted.Lazily, startTime)
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
/**
* Flow unobserved events starting at [time]. The flow could be interrupted if timeline changes
*/
protected abstract fun events(): Flow<E>
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val context = currentCoroutineContext()
val observer = object : TimelineObserver {
// observed time
override val time = MutableStateFlow(startTime)
private val channel = Channel<E>()
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(it.time)
feedbackChannel.send(Unit)
}.collector()
}
override suspend fun collect(upTo: Instant) = coroutineScope {
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
events().takeWhile {
it.time <= upTo
}.collect {
channel.send(it)
}
}
override fun close() {
collectJob.cancel()
observers.remove(this)
}
}
observers.add(observer)
return observer
}
}

View File

@ -1,132 +1,65 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.time.Duration import kotlin.time.Duration
/**
* Suspend the collection of this [Flow] until event time is lower that threshold
*/
public fun <E : TimelineEvent> Flow<E>.withTimeThreshold(
threshold: Flow<Instant>
): Flow<E> = transform { event ->
threshold.first { it > event.time }
emit(event)
}
/** /**
* @param lookaheadInterval an interval for generated events ahead of the last observed event. * @param lookaheadInterval an interval for generated events ahead of the last observed event.
*/ */
public class GeneratingTimeline<E : TimelineEvent>( public class GeneratingTimeline<E : TimelineEvent>(
private val generationScope: CoroutineScope, private val generationScope: CoroutineScope,
private val initialEvent: E, private val origin: E,
private val lookaheadInterval: Duration, private val lookaheadInterval: Duration,
private val generatorChain: suspend (E) -> E private val generator: suspend FlowCollector<E>.(E) -> Unit
) : Timeline<E>, AutoCloseable { ) : AbstractTimeline<E>(generationScope, origin.time) {
// push to this channel to trigger event generation private val startEventFlow = MutableStateFlow(origin)
private val wakeupChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
private suspend fun kickGenerator() { private data class EventWithOrigin<E : TimelineEvent>(val origin: E, val event: E) : TimelineEvent {
wakeupChannel.send(Unit) override val time: Instant get() = event.time
}
private val mutex = Mutex()
private val history = ArrayDeque<E>()
private val lastEvent = MutableSharedFlow<E>(replay = Int.MAX_VALUE)
private val updateHistoryJob = generationScope.launch {
lastEvent.onEach {
mutex.withLock {
history.add(it)
//cleanup old events
val threshold = observedTime ?: return@withLock
while (history.isNotEmpty() && history.last().time > threshold) {
history.removeFirst()
} }
private val events: SharedFlow<E> = flow {
coroutineScope {
startEventFlow.collect { startEvent ->
emitAll(
flow { generator(startEvent) }.takeWhile { startEvent == startEventFlow.value }.map {
EventWithOrigin(startEvent, it)
} }
)
} }
} }
}.withTimeThreshold(
threshold = time.map { it + lookaheadInterval }
).buffer(Channel.UNLIMITED).mapNotNull {
//it.event
it.takeIf { it.origin == startEventFlow.value }?.event
}.shareIn(
scope = generationScope,
started = SharingStarted.Eagerly,
)
private val observers: MutableSet<TimelineObserver> = mutableSetOf() override fun events(): Flow<E> = events
override val time: Instant
get() = history.lastOrNull()?.time ?: initialEvent.time
override val observedTime: Instant?
get() = observers.minOfNotNullOrNull { it.time }
override fun flowUnobservedEvents(): Flow<E> = flow {
history.forEach { e ->
emit(e)
}
emitAll(lastEvent)
}
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
private var generatorJob: Job = launchGenerator(initialEvent)
private fun launchGenerator(event: E): Job = generationScope.launch {
kickGenerator()
var currentEvent = event
// for each wakeup generate all events in lookaheadInterval
for (u in wakeupChannel) {
while (currentEvent.time < (observedTime ?: event.time) + lookaheadInterval) {
val nextEvent = generatorChain(currentEvent)
lastEvent.emit(nextEvent)
currentEvent = nextEvent
}
}
}
public suspend fun interrupt(newStart: E) { public suspend fun interrupt(newStart: E) {
check(newStart.time > (observedTime ?: Instant.DISTANT_FUTURE)) { check(newStart.time >= time.value) {
"Can't interrupt generating timeline after observed event" "Can't interrupt generating timeline after observed event"
} }
mutex.withLock { startTime = newStart.time
while (history.isNotEmpty() && history.last().time > newStart.time) { startEventFlow.emit(newStart)
history.removeLast()
}
generatorJob.cancel()
generatorJob = launchGenerator(newStart)
}
kickGenerator()
}
override fun close() {
updateHistoryJob.cancel()
generatorJob.cancel()
}
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver {
override var time: Instant = this@GeneratingTimeline.time
override suspend fun collect(upTo: Instant) {
flowUnobservedEvents().takeWhile {
it.time <= upTo
}.onEach {
time = it.time
kickGenerator()
}.collector()
}
override fun close() {
observers.remove(this)
if(observers.isEmpty()){
this@GeneratingTimeline.close()
}
}
}
observers.add(observer)
return observer
} }
} }

View File

@ -1,51 +1,25 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.datetime.Instant
public class MergedTimeline<E : TimelineEvent>( public class MergedTimeline<E : TimelineEvent>(
timelineScope: CoroutineScope,
private val timelines: List<Timeline<E>> private val timelines: List<Timeline<E>>
) : Timeline<E> { ) : AbstractTimeline<E>(timelineScope, timelines.minOf { it.time.value }) {
override val time: Instant
get() = timelines.minOfNotNullOrNull { it.time } ?: Instant.DISTANT_PAST
override val observedTime: Instant? override fun events(): Flow<E> = flow {
get() = timelines.maxOfNotNullOrNull { it.observedTime } val buffer = TODO()
//
override fun flowUnobservedEvents(): Flow<E> = timelines.map { flowUnobservedEvents() }.merge() // timelines.forEach { timeline ->
// timeline.observe {
override suspend fun advance(toTime: Instant) { // collect{
timelines.forEach { it.advance(toTime) } // buffer.add(it)
} // }
// }
// override suspend fun interrupt(atTime: Instant) {
// timelines.forEach { it.interrupt(atTime) }
// } // }
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver {
override var time: Instant = this@MergedTimeline.time
override suspend fun collect(upTo: Instant) = timelines
.map { flowUnobservedEvents() }
.merge()
.takeWhile { it.time <= upTo }.onEach {
time = it.time
}.collector()
override fun close() {
observers.remove(this)
} }
} }
observers.add(observer)
return observer
}
}

View File

@ -1,88 +1,30 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
/** /**
* A manually mutable [Timeline] that could be modified via [emit] method by multiple * A manually mutable [Timeline] that could be modified via [emit] method by multiple
*/ */
public class SharedTimeline<E : TimelineEvent> : Timeline<E> { public class SharedTimeline<E : TimelineEvent>(
timelineScope: CoroutineScope,
startTime: Instant
) : AbstractTimeline<E>(timelineScope, startTime) {
private val mutex = Mutex() private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED)
private val events = ArrayDeque<E>() override fun events(): Flow<E> = events
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
override val time: Instant
get() = events.lastOrNull()?.time ?: Instant.DISTANT_PAST
override val observedTime: Instant?
get() = observers.minOfNotNullOrNull { it.time }
override fun flowUnobservedEvents(): Flow<E> = events.asFlow()
/** /**
* Emit new event to the timeline * Emit new event to the timeline
*/ */
public suspend fun emit(event: E): Boolean = mutex.withLock { public suspend fun emit(event: E) {
if (event.time < (observedTime ?: Instant.DISTANT_PAST)) { if (event.time < (events.replayCache.lastOrNull()?.time ?: time.value)) {
error("Can't emit event $event because there are observed events after $observedTime") error("Can't emit event $event because timeline monotony is broken")
} }
events.add(event) events.emit(event)
}
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
}
}
/**
* Discard all events before [observedTime]
*/
private suspend fun cleanup(): Unit = mutex.withLock {
val threshold = observedTime ?: return@withLock
while (events.isNotEmpty() && events.last().time > threshold) {
events.removeFirst()
}
}
// /**
// * Discard unconsumed events after [atTime].
// */
// override suspend fun interrupt(atTime: Instant): Unit = mutex.withLock {
// val threshold = observedTime
// if (atTime < threshold)
// error("Timeline interrupt at time $atTime is not possible because there are observed events before $threshold")
// while (events.isNotEmpty() && events.last().time > atTime) {
// events.removeLast()
// }
// }
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val observer = object : TimelineObserver {
val observerMutex = Mutex()
override var time: Instant = this@SharedTimeline.time
override suspend fun collect(upTo: Instant) = observerMutex.withLock {
flowUnobservedEvents().takeWhile { it.time <= upTo }.onEach {
time = it.time
}.collector()
cleanup()
}
override fun close() {
observers.remove(this)
}
}
observers.add(observer)
return observer
} }
} }

View File

@ -1,6 +1,7 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.time.Duration import kotlin.time.Duration
@ -21,9 +22,9 @@ public data class SimpleTimelineEvent<T>(override val time: Instant, val value:
public interface TimelineObserver : AutoCloseable { public interface TimelineObserver : AutoCloseable {
/** /**
* The subjective time of this observer * The subjective time of this observer (last observed time)
*/ */
public val time: Instant public val time: StateFlow<Instant>
/** /**
* Collect all uncollected events from [time] to [upTo]. * Collect all uncollected events from [time] to [upTo].
@ -33,6 +34,8 @@ public interface TimelineObserver : AutoCloseable {
public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE) public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE)
} }
public suspend fun TimelineObserver.collect(duration: Duration) = collect(time.value + duration)
/** /**
* A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of * A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of
* the next event is greater than the previous event time. * the next event is greater than the previous event time.
@ -43,29 +46,15 @@ public interface TimelineObserver : AutoCloseable {
*/ */
public interface Timeline<E : TimelineEvent> { public interface Timeline<E : TimelineEvent> {
/** /**
* A subjective time of this timeline. The time could advance without events being produced. * A subjective time of this timeline. The subjective time is the last observed time.
*/ */
public val time: Instant public val time: StateFlow<Instant>
/**
* The time of the last event that was observed by all observers
*/
public val observedTime: Instant?
/**
* Flow events from [observedTime] to [time].
*
* The resulting flow is finite and should not suspend.
*
* This method does not affect [observedTime].
*/
public fun flowUnobservedEvents(): Flow<E>
/** /**
* Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand. * Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand.
* *
* Each collection shifts [TimelineObserver.time] for this observer. * Each collection shifts [TimelineObserver.time] for this observer.
* The value of [observedTime] is the least of all observers [TimelineObserver.time].
*/ */
public suspend fun observe( public suspend fun observe(
collector: suspend Flow<E>.() -> Unit collector: suspend Flow<E>.() -> Unit

View File

@ -1,34 +1,50 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.isActive
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.test.Test import kotlin.test.Test
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
class TimelineTests { class TimelineTests {
@Test @Test
fun testGeneration() = runTest(timeout = 5.seconds) { fun testGeneration() = runTest{
val startTime = Instant.parse("2020-01-01T00:00:00.000Z") val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
val generation = GeneratingTimeline<SimpleTimelineEvent<DoubleArray>>( val generation = GeneratingTimeline<SimpleTimelineEvent<DoubleArray>>(
this, this,
initialEvent = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()), origin = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()),
lookaheadInterval = 1.seconds lookaheadInterval = 1.seconds
) { event -> ) { event ->
val time = event.time + 0.1.seconds var time = event.time
println("Emit: $time") while (isActive) {
SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray()) time += 0.1.seconds
println("Emit: ${time - startTime}")
emit(SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray()))
} }
}
val result = mutableListOf<Duration>()
val collector = generation.observe { val collector = generation.observe {
collect { collect {
println("Consume: ${it.time}") println("Consume: ${it.time - startTime}")
result.add(it.time - startTime)
} }
} }
collector.collect(startTime + 2.seconds) collector.collect(2.seconds)
println("First collection complete")
collector.collect(2.seconds)
println("Second collection complete")
println("Interrupt")
generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, List(10) { it.toDouble() }.toDoubleArray()))
println("Collecting second")
collector.collect(startTime + 6.seconds + 2.5.seconds)
println(result)
collector.close() collector.close()
} }
} }