GeneratingTimeline fully functional

This commit is contained in:
Alexander Nozik 2024-12-08 11:12:08 +03:00
parent bb09a74710
commit 203a8c1570
6 changed files with 60 additions and 44 deletions

View File

@ -1,18 +1,23 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
public abstract class AbstractTimeline<E : TimelineEvent>( public abstract class AbstractTimeline<E : TimelineEvent>(
protected val timelineScope: CoroutineScope,
protected var startTime: Instant, protected var startTime: Instant,
) : Timeline<E> { coroutineContext: CoroutineContext
) : Timeline<E>, AutoCloseable {
protected val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
CoroutineName("Timeline")
)
private val observers: MutableSet<TimelineObserver> = mutableSetOf() private val observers: MutableSet<TimelineObserver> = mutableSetOf()
@ -67,4 +72,9 @@ public abstract class AbstractTimeline<E : TimelineEvent>(
observers.add(observer) observers.add(observer)
return observer return observer
} }
override fun close() {
observers.forEach { it.close() }
timelineScope.cancel()
}
} }

View File

@ -1,10 +1,11 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.time.Duration import kotlin.time.Duration
/** /**
@ -21,11 +22,11 @@ public fun <E : TimelineEvent> Flow<E>.withTimeThreshold(
* @param lookaheadInterval an interval for generated events ahead of the last observed event. * @param lookaheadInterval an interval for generated events ahead of the last observed event.
*/ */
public class GeneratingTimeline<E : TimelineEvent>( public class GeneratingTimeline<E : TimelineEvent>(
private val generationScope: CoroutineScope,
private val origin: E, private val origin: E,
private val lookaheadInterval: Duration, private val lookaheadInterval: Duration,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val generator: suspend FlowCollector<E>.(E) -> Unit private val generator: suspend FlowCollector<E>.(E) -> Unit
) : AbstractTimeline<E>(generationScope, origin.time) { ) : AbstractTimeline<E>(origin.time, coroutineContext) {
private val startEventFlow = MutableStateFlow(origin) private val startEventFlow = MutableStateFlow(origin)
@ -46,11 +47,11 @@ public class GeneratingTimeline<E : TimelineEvent>(
}.withTimeThreshold( }.withTimeThreshold(
threshold = time.map { it + lookaheadInterval } threshold = time.map { it + lookaheadInterval }
).buffer(Channel.UNLIMITED).mapNotNull { ).buffer(Channel.UNLIMITED).mapNotNull {
//it.event //a barrier to avoid leaking stale events after interruption from buffer
it.takeIf { it.origin == startEventFlow.value }?.event it.takeIf { it.origin == startEventFlow.value }?.event
}.shareIn( }.shareIn(
scope = generationScope, scope = timelineScope,
started = SharingStarted.Eagerly, started = SharingStarted.Lazily,
) )
override fun events(): Flow<E> = events override fun events(): Flow<E> = events

View File

@ -1,14 +1,15 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
public class MergedTimeline<E : TimelineEvent>( public class MergedTimeline<E : TimelineEvent>(
timelineScope: CoroutineScope, private val timelines: List<Timeline<E>>,
private val timelines: List<Timeline<E>> coroutineContext: CoroutineContext = EmptyCoroutineContext
) : AbstractTimeline<E>(timelineScope, timelines.minOf { it.time.value }) { ) : AbstractTimeline<E>(timelines.minOf { it.time.value }, coroutineContext) {
override fun events(): Flow<E> = flow { override fun events(): Flow<E> = flow {
val buffer = TODO() val buffer = TODO()

View File

@ -1,18 +1,19 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/** /**
* A manually mutable [Timeline] that could be modified via [emit] method by multiple * A manually mutable [Timeline] that could be modified via [emit] method by multiple
*/ */
public class SharedTimeline<E : TimelineEvent>( public class SharedTimeline<E : TimelineEvent>(
timelineScope: CoroutineScope, startTime: Instant,
startTime: Instant coroutineContext: CoroutineContext = EmptyCoroutineContext
) : AbstractTimeline<E>(timelineScope, startTime) { ) : AbstractTimeline<E>(startTime, coroutineContext) {
private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED) private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED)

View File

@ -34,7 +34,10 @@ public interface TimelineObserver : AutoCloseable {
public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE) public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE)
} }
public suspend fun TimelineObserver.collect(duration: Duration) = collect(time.value + duration) /**
* Collect events for a fixed [duration] since last observed time
*/
public suspend fun TimelineObserver.collect(duration: Duration): Unit = collect(time.value + duration)
/** /**
* A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of * A time-ordered sequence of events of type [E]. There time of events is strictly monotonic, meaning that the time of
@ -66,11 +69,13 @@ public interface Timeline<E : TimelineEvent> {
* This method suspends until all advancement is done * This method suspends until all advancement is done
*/ */
public suspend fun advance(toTime: Instant) public suspend fun advance(toTime: Instant)
}
// /**
// * Interrupt generation of this timeline and discard unconsumed events after [atTime]. /**
// * * Perform [collector] action on each event
// * Throw exception if at least one observer advanced */
// */ public suspend fun <E : TimelineEvent> Timeline<E>.observeEach(
// public suspend fun interrupt(atTime: Instant): Unit collector: suspend (E) -> Unit
): TimelineObserver = observe {
collect(collector)
} }

View File

@ -11,40 +11,38 @@ class TimelineTests {
@Test @Test
fun testGeneration() = runTest{ fun testGeneration() = runTest(timeout = 1.seconds) {
val startTime = Instant.parse("2020-01-01T00:00:00.000Z") val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
val generation = GeneratingTimeline<SimpleTimelineEvent<DoubleArray>>( val generation = GeneratingTimeline(
this, origin = SimpleTimelineEvent(startTime, Unit),
origin = SimpleTimelineEvent(startTime, List(10) { it.toDouble() }.toDoubleArray()),
lookaheadInterval = 1.seconds lookaheadInterval = 1.seconds
) { event -> ) { event ->
var time = event.time var time = event.time
while (isActive) { while (isActive) {
time += 0.1.seconds time += 0.1.seconds
println("Emit: ${time - startTime}") println("Emit: ${time - startTime}")
emit(SimpleTimelineEvent(time, event.value.map { it + 1.0 }.toDoubleArray())) emit(SimpleTimelineEvent(time, Unit))
} }
} }
val result = mutableListOf<Duration>() val result = mutableListOf<Duration>()
val collector = generation.observe { val observer = generation.observeEach {
collect {
println("Consume: ${it.time - startTime}") println("Consume: ${it.time - startTime}")
result.add(it.time - startTime) result.add(it.time - startTime)
} }
}
collector.collect(2.seconds) observer.collect(2.seconds)
println("First collection complete") println("First collection complete")
collector.collect(2.seconds) observer.collect(2.seconds)
println("Second collection complete") println("Second collection complete")
println("Interrupt") println("Interrupt")
generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, List(10) { it.toDouble() }.toDoubleArray())) generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, Unit))
println("Collecting second") println("Collecting after interruption")
collector.collect(startTime + 6.seconds + 2.5.seconds) observer.collect(startTime + 6.seconds + 2.5.seconds)
println(result) println(result)
collector.close() generation.close()
} }
} }