Introduce virtual time manager

This commit is contained in:
Alexander Nozik 2025-03-21 20:22:06 +03:00
parent ed3e1c13e0
commit 8e55d1e22e
26 changed files with 277 additions and 187 deletions
controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor
controls-core/src/commonMain/kotlin/space/kscience/controls
controls-jupyter
controls-vision
build.gradle.kts
src
commonMain/kotlin
jsMain/kotlin
jvmMain/kotlin
controls-visualisation-compose
build.gradle.kts
src/commonMain/kotlin
demo
all-things
build.gradle.kts
src/main/kotlin/space/kscience/controls/demo
constructor
build.gradle.kts
src/jvmMain/kotlin
many-devices/src/main/kotlin/space/kscience/controls/demo
motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster
gradle.properties
gradle
simulation-kt/src
commonMain/kotlin
commonTest/kotlin

@ -3,14 +3,14 @@ package space.kscience.controls.constructor
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.newCoroutineContext import kotlinx.coroutines.newCoroutineContext
import space.kscience.controls.time.AsyncTimeProvider import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
public abstract class ModelConstructor( public abstract class ModelConstructor(
final override val context: Context, final override val context: Context,
vararg dependencies: DeviceState<*>, vararg dependencies: DeviceState<*>,
) : StateContainer, CoroutineScope, AsyncTimeProvider{ ) : StateContainer, CoroutineScope{
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext = context.newCoroutineContext(SupervisorJob()) override val coroutineContext: CoroutineContext = context.newCoroutineContext(SupervisorJob())
@ -31,4 +31,6 @@ public abstract class ModelConstructor(
override fun unregisterElement(constructorElement: ConstructorElement) { override fun unregisterElement(constructorElement: ConstructorElement) {
_constructorElements.remove(constructorElement) _constructorElements.remove(constructorElement)
} }
} }
public val ModelConstructor.clock get() = context.clock

@ -25,7 +25,7 @@ public class TimerState(
private val clock = MutableStateFlow(initialValue) private val clock = MutableStateFlow(initialValue)
private val updateJob = clockManager.context.launch(clockManager.asDispatcher()) { private val updateJob = clockManager.context.launch(clockManager.dispatcher) {
while (isActive) { while (isActive) {
clock.value = clockManager.clock.now() clock.value = clockManager.clock.now()
delay(tick) delay(tick)

@ -5,9 +5,6 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import space.kscience.controls.api.Device.Companion.DEVICE_TARGET import space.kscience.controls.api.Device.Companion.DEVICE_TARGET
import space.kscience.controls.time.AsyncClock
import space.kscience.controls.time.AsyncTimeProvider
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.context.info import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger import space.kscience.dataforge.context.logger
@ -23,7 +20,7 @@ import space.kscience.dataforge.names.parseAsName
* When canceled, cancels all running processes. * When canceled, cancels all running processes.
*/ */
@DfType(DEVICE_TARGET) @DfType(DEVICE_TARGET)
public interface Device : ContextAware, WithLifeCycle, CoroutineScope, AsyncTimeProvider { public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
/** /**
* Initial configuration meta for the device * Initial configuration meta for the device
@ -71,8 +68,6 @@ public interface Device : ContextAware, WithLifeCycle, CoroutineScope, AsyncTime
*/ */
override suspend fun start(): Unit = Unit override suspend fun start(): Unit = Unit
override val clock: AsyncClock get() = context.clock
/** /**
* Close and terminate the device. This function does not wait for the device to be closed. * Close and terminate the device. This function does not wait for the device to be closed.
*/ */

@ -51,7 +51,7 @@ public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = Insta
public fun Instant.toMeta(): Meta = Meta(toString()) public fun Instant.toMeta(): Meta = Meta(toString())
public val Meta.instant: Instant? get() = value?.string?.let { Instant.parse(it) } public val Meta?.instant: Instant? get() = this?.value?.string?.let { Instant.parse(it) }
/** /**
* An [IOFormat] for [Instant] * An [IOFormat] for [Instant]

@ -4,7 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.controls.time.getCoroutineDispatcher import space.kscience.controls.time.coroutineDispatcher
import kotlin.time.Duration import kotlin.time.Duration
/** /**
@ -16,7 +16,7 @@ public fun <D : Device> D.doRecurring(
task: suspend D.() -> Unit, task: suspend D.() -> Unit,
): Job { ): Job {
val taskName = debugTaskName ?: "task[${task.hashCode().toString(16)}]" val taskName = debugTaskName ?: "task[${task.hashCode().toString(16)}]"
val dispatcher = getCoroutineDispatcher() val dispatcher = coroutineDispatcher
return launch(CoroutineName(taskName) + dispatcher) { return launch(CoroutineName(taskName) + dispatcher) {
while (isActive) { while (isActive) {
delay(interval) delay(interval)

@ -4,20 +4,24 @@ import kotlinx.coroutines.*
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.controls.instant
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong import kotlin.math.roundToLong
import kotlin.time.Duration import kotlin.time.Duration
@OptIn(InternalCoroutinesApi::class) @OptIn(InternalCoroutinesApi::class)
private class CompressedTimeDispatcher( private class CompressedTimeDispatcher(
val clockManager: ClockManager, val coroutineContext: CoroutineContext,
val dispatcher: CoroutineDispatcher,
val compression: Double, val compression: Double,
) : CoroutineDispatcher(), Delay { ) : CoroutineDispatcher(), Delay {
val dispatcher = coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
@InternalCoroutinesApi @InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) { override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatcher.dispatchYield(context, block) dispatcher.dispatchYield(context, block)
@ -25,14 +29,6 @@ private class CompressedTimeDispatcher(
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context) override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
// @Deprecated(
// "Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
// replaceWith = ReplaceWith("limitedParallelism(parallelism, null)"),
// level = DeprecationLevel.HIDDEN
// )
// @ExperimentalCoroutinesApi
// override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = dispatcher.limitedParallelism(parallelism)
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher = override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
dispatcher.limitedParallelism(parallelism, name) dispatcher.limitedParallelism(parallelism, name)
@ -40,22 +36,21 @@ private class CompressedTimeDispatcher(
dispatcher.dispatch(context, block) dispatcher.dispatch(context, block)
} }
private val delay = ((dispatcher as? Delay) ?: (Dispatchers.Default as Delay)) private val parentDelay = ((dispatcher as? Delay) ?: (Dispatchers.Default as Delay))
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
delay.scheduleResumeAfterDelay((timeMillis / compression).roundToLong(), continuation) parentDelay.scheduleResumeAfterDelay((timeMillis / compression).roundToLong(), continuation)
} }
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
return delay.invokeOnTimeout((timeMillis / compression).roundToLong(), block, context) parentDelay.invokeOnTimeout((timeMillis / compression).roundToLong(), block, context)
}
} }
private class CompressedClock( private class CompressedClock(
val start: Instant,
val compression: Double,
val baseClock: Clock = Clock.System, val baseClock: Clock = Clock.System,
val compression: Double,
val start: Instant = baseClock.now(),
) : Clock { ) : Clock {
override fun now(): Instant { override fun now(): Instant {
val elapsed = (baseClock.now() - start) val elapsed = (baseClock.now() - start)
@ -63,38 +58,44 @@ private class CompressedClock(
} }
} }
public class ClockManager : AbstractPlugin(), AsyncTimeProvider { public sealed interface ClockMode {
public data object System : ClockMode
public data class Compressed(val compression: Double) : ClockMode
public data class Virtual(val manager: VirtualTimeManager) : ClockMode
}
public class ClockManager : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag override val tag: PluginTag get() = Companion.tag
public val timeCompression: Double by meta.double(1.0) public val clockMode: ClockMode = when (meta["clock.mode"].string) {
null, "system" -> ClockMode.System
override val clock: AsyncClock by lazy { "virtual" -> ClockMode.Virtual(VirtualTimeManager(meta["clock.start"]?.instant ?: Clock.System.now()))
if (timeCompression == 1.0) { else -> ClockMode.Compressed(meta["clock.compression"].double ?: 1.0)
AsyncClock.real(Clock.System)
} else {
AsyncClock.real(CompressedClock(Clock.System.now(), timeCompression))
}
} }
public val clock: Clock = when (clockMode) {
is ClockMode.Compressed -> CompressedClock(Clock.System, clockMode.compression)
ClockMode.System -> Clock.System
is ClockMode.Virtual -> clockMode.manager
}
/** /**
* Provide a [CoroutineDispatcher] with compressed time based on given [dispatcher] * Provide a [CoroutineDispatcher] with compressed time based on context dispatcher
*/ */
public fun asDispatcher( public val dispatcher: CoroutineDispatcher = when (clockMode) {
dispatcher: CoroutineDispatcher = Dispatchers.Default, ClockMode.System -> context.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
): CoroutineDispatcher = if (timeCompression == 1.0) { is ClockMode.Compressed -> CompressedTimeDispatcher(context.coroutineContext, clockMode.compression)
dispatcher is ClockMode.Virtual -> VirtualTimeDispatcher(context.coroutineContext, clockMode.manager)
} else {
CompressedTimeDispatcher(this, dispatcher, timeCompression)
} }
public fun scheduleWithFixedDelay(tick: Duration, block: suspend () -> Unit): Job = context.launch(asDispatcher()) { public fun scheduleWithFixedDelay(tick: Duration, block: suspend () -> Unit): Job = context.launch(dispatcher) {
while (isActive) { while (isActive) {
delay(tick) delay(tick)
block() block()
} }
} }
public companion object : PluginFactory<ClockManager> { public companion object : PluginFactory<ClockManager> {
override val tag: PluginTag = PluginTag("clock", group = PluginTag.DATAFORGE_GROUP) override val tag: PluginTag = PluginTag("clock", group = PluginTag.DATAFORGE_GROUP)
@ -102,10 +103,14 @@ public class ClockManager : AbstractPlugin(), AsyncTimeProvider {
} }
} }
public val Context.clock: AsyncClock get() = plugins[ClockManager]?.clock ?: AsyncClock.real(Clock.System) public val Context.clock: Clock get() = plugins[ClockManager]?.clock ?: Clock.System
public fun Device.getCoroutineDispatcher(dispatcher: CoroutineDispatcher = Dispatchers.Default): CoroutineDispatcher = public val Device.clock: Clock get() = context.clock
context.plugins[ClockManager]?.asDispatcher(dispatcher) ?: dispatcher
public val Device.coroutineDispatcher: CoroutineDispatcher
get() = context.plugins[ClockManager]?.dispatcher
?: context.coroutineContext[CoroutineDispatcher]
?: Dispatchers.Default
public fun ContextBuilder.withTimeCompression(compression: Double) { public fun ContextBuilder.withTimeCompression(compression: Double) {
require(compression > 0.0) { "Time compression must be greater than zero." } require(compression > 0.0) { "Time compression must be greater than zero." }

@ -0,0 +1,100 @@
package space.kscience.controls.time
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.milliseconds
public class VirtualTimeManager(
startTime: Instant,
) : Clock {
private val _time = MutableStateFlow(startTime)
public val time: StateFlow<Instant> get() = _time
override fun now(): Instant = _time.value
private val markerTimes = mutableMapOf<Any, Instant>()
/**
* Set target of [handle] timeline to [to] and wait for it to happen
*/
public suspend fun advanceTime(handle: Any, to: Instant) {
val currentMarkerTime = markerTimes[handle] ?: now()
require(to > currentMarkerTime) { "The advanced time for marker `$handle` $to is less that current marker time $currentMarkerTime" }
markerTimes[handle] = to
// advance time if necessary
_time.emit(markerTimes.values.min())
// wait for time to exceed marker time
time.first { it >= to }
}
}
@OptIn(InternalCoroutinesApi::class)
public class VirtualTimeDispatcher internal constructor(
private val coroutineContext: CoroutineContext,
private val virtualTimeManager: VirtualTimeManager
) : CoroutineDispatcher(), Delay {
private val scope = CoroutineScope(coroutineContext)
public val dispatcher: CoroutineDispatcher =
coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = dispatcher.dispatch(context, block)
override fun limitedParallelism(
parallelism: Int,
name: String?
): CoroutineDispatcher = VirtualTimeDispatcher(
coroutineContext = coroutineContext + dispatcher.limitedParallelism(parallelism, name),
virtualTimeManager = virtualTimeManager
)
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatcher.dispatchYield(context, block)
}
override fun toString(): String = dispatcher.toString()
override fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? = dispatcher[key]
override fun scheduleResumeAfterDelay(
timeMillis: Long,
continuation: CancellableContinuation<Unit>
) {
val handle = continuation.context[Job] ?: error("Can't use VirtualTimeDispatcher without Job")
val scheduledJob = scope.launch {
virtualTimeManager.advanceTime(handle, virtualTimeManager.time.value + timeMillis.milliseconds)
dispatcher.dispatch(
continuation.context,
Runnable {
@OptIn(ExperimentalCoroutinesApi::class)
with(dispatcher) { with(continuation) { resumeUndispatched(Unit) } }
}
)
}
continuation.disposeOnCancellation {
scheduledJob.cancel()
}
}
}
public fun CoroutineContext.withVirtualTime(
virtualTimeManager: VirtualTimeManager
): CoroutineContext = if (this[Job] != null) {
this
} else {
//add job if it is not present
plus(Job(null))
}.plus(VirtualTimeDispatcher(this, virtualTimeManager))

@ -5,7 +5,6 @@ plugins {
kscience { kscience {
fullStack("js/controls-jupyter.js") fullStack("js/controls-jupyter.js")
useKtor()
useContextReceivers() useContextReceivers()
jupyterLibrary("space.kscience.controls.jupyter.ControlsJupyter") jupyterLibrary("space.kscience.controls.jupyter.ControlsJupyter")
dependencies { dependencies {
@ -17,6 +16,7 @@ kscience {
//FIXME remove after VisionForge 0.5 //FIXME remove after VisionForge 0.5
api("org.jetbrains.kotlin-wrappers:kotlin-extensions:1.0.1-pre.823") api("org.jetbrains.kotlin-wrappers:kotlin-extensions:1.0.1-pre.823")
} }
jvmMain { jvmMain {
implementation(spclibs.logback.classic) implementation(spclibs.logback.classic)
} }

@ -1,7 +1,7 @@
import space.kscience.plotly.PlotlyPlugin
import space.kscience.visionforge.html.runVisionClient import space.kscience.visionforge.html.runVisionClient
import space.kscience.visionforge.jupyter.VFNotebookClient import space.kscience.visionforge.jupyter.VFNotebookClient
import space.kscience.visionforge.markup.MarkupPlugin import space.kscience.visionforge.markup.MarkupPlugin
import space.kscience.visionforge.plotly.PlotlyPlugin
public fun main(): Unit = runVisionClient { public fun main(): Unit = runVisionClient {
// plugin(DeviceManager) // plugin(DeviceManager)

@ -9,13 +9,12 @@ description = """
kscience { kscience {
fullStack("js/controls-vision.js") fullStack("js/controls-vision.js")
useKtor()
useSerialization() useSerialization()
useContextReceivers() useContextReceivers()
commonMain { commonMain {
api(projects.controlsCore) api(projects.controlsCore)
api(projects.controlsConstructor) api(projects.controlsConstructor)
api(libs.visionforge.plotly) api(libs.plotlykt.core)
api(libs.visionforge.markdown) api(libs.visionforge.markdown)
// api("space.kscience:tables-kt:0.2.1") // api("space.kscience:tables-kt:0.2.1")
// api("space.kscience:visionforge-tables:$visionforgeVersion") // api("space.kscience:visionforge-tables:$visionforgeVersion")

@ -19,12 +19,7 @@ import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.plotly.Plot import space.kscience.plotly.Plot
import space.kscience.plotly.bar import space.kscience.plotly.models.*
import space.kscience.plotly.models.Bar
import space.kscience.plotly.models.Scatter
import space.kscience.plotly.models.Trace
import space.kscience.plotly.models.TraceValues
import space.kscience.plotly.scatter
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds

@ -1,8 +1,8 @@
package space.kscience.controls.vision package space.kscience.controls.vision
import space.kscience.plotly.PlotlyPlugin
import space.kscience.visionforge.html.runVisionClient import space.kscience.visionforge.html.runVisionClient
import space.kscience.visionforge.markup.MarkupPlugin import space.kscience.visionforge.markup.MarkupPlugin
import space.kscience.visionforge.plotly.PlotlyPlugin
public fun main(): Unit = runVisionClient { public fun main(): Unit = runVisionClient {
plugin(PlotlyPlugin) plugin(PlotlyPlugin)

@ -6,22 +6,17 @@ import io.ktor.server.engine.embeddedServer
import io.ktor.server.http.content.staticResources import io.ktor.server.http.content.staticResources
import io.ktor.server.routing.Routing import io.ktor.server.routing.Routing
import io.ktor.server.routing.routing import io.ktor.server.routing.routing
import kotlinx.html.TagConsumer
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.plotly.Plot import space.kscience.plotly.PlotlyPlugin
import space.kscience.plotly.PlotlyConfig
import space.kscience.visionforge.html.HtmlVisionFragment import space.kscience.visionforge.html.HtmlVisionFragment
import space.kscience.visionforge.html.VisionPage import space.kscience.visionforge.html.VisionPage
import space.kscience.visionforge.html.VisionTagConsumer
import space.kscience.visionforge.markup.MarkupPlugin import space.kscience.visionforge.markup.MarkupPlugin
import space.kscience.visionforge.plotly.PlotlyPlugin
import space.kscience.visionforge.plotly.plotly
import space.kscience.visionforge.server.VisionRoute import space.kscience.visionforge.server.VisionRoute
import space.kscience.visionforge.server.openInBrowser import space.kscience.visionforge.server.openInBrowser
import space.kscience.visionforge.server.visionPage import space.kscience.visionforge.server.visionPage
import space.kscience.visionforge.visionManager import space.kscience.visionforge.visionManager
public fun Context.showDashboard( public suspend fun Context.showDashboard(
port: Int = 7777, port: Int = 7777,
routes: Routing.() -> Unit = {}, routes: Routing.() -> Unit = {},
configurationBuilder: VisionRoute.() -> Unit = {}, configurationBuilder: VisionRoute.() -> Unit = {},
@ -43,7 +38,7 @@ public fun Context.showDashboard(
visionPage( visionPage(
visualisationContext.visionManager, visualisationContext.visionManager,
VisionPage.scriptHeader("js/controls-vision.js"), VisionPage.scriptHeader("js/controls-vision.js"),
configurationBuilder = configurationBuilder, routeConfiguration = configurationBuilder,
visionFragment = visionFragment visionFragment = visionFragment
) )
}.also { }.also {
@ -60,12 +55,12 @@ public fun Context.showDashboard(
} }
} }
context(VisionTagConsumer<*>) //context(consumer: VisionTagConsumer<*>)
public fun TagConsumer<*>.plot( //public fun TagConsumer<*>.plot(
config: PlotlyConfig = PlotlyConfig(), // config: PlotlyConfig = PlotlyConfig(),
block: Plot.() -> Unit, // block: Plot.() -> Unit,
) { //) {
vision { // vision {
plotly(config, block) // plotly(config, block)
} // }
} //}

@ -13,7 +13,6 @@ description = """
kscience { kscience {
jvm() jvm()
useKtor()
useSerialization() useSerialization()
useContextReceivers() useContextReceivers()
commonMain { commonMain {

@ -10,6 +10,7 @@ import io.github.koalaplot.core.xygraph.DefaultPoint
import io.github.koalaplot.core.xygraph.XYGraphScope import io.github.koalaplot.core.xygraph.XYGraphScope
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.controls.api.PropertyChangedMessage import space.kscience.controls.api.PropertyChangedMessage
@ -19,7 +20,6 @@ import space.kscience.controls.constructor.units.NumericalValue
import space.kscience.controls.constructor.values import space.kscience.controls.constructor.values
import space.kscience.controls.spec.DevicePropertySpec import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name import space.kscience.controls.spec.name
import space.kscience.controls.time.AsyncClock
import space.kscience.controls.time.ValueWithTime import space.kscience.controls.time.ValueWithTime
import space.kscience.controls.time.clock import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
@ -41,7 +41,7 @@ internal fun <T> Flow<ValueWithTime<T>>.collectAndTrim(
maxAge: Duration = defaultMaxAge, maxAge: Duration = defaultMaxAge,
maxPoints: Int = defaultMaxPoints, maxPoints: Int = defaultMaxPoints,
minPoints: Int = defaultMinPoints, minPoints: Int = defaultMinPoints,
clock: AsyncClock = Global.clock, clock: Clock = Global.clock,
): Flow<List<ValueWithTime<T>>> { ): Flow<List<ValueWithTime<T>>> {
require(maxPoints > 2) require(maxPoints > 2)
require(minPoints > 0) require(minPoints > 0)
@ -222,7 +222,7 @@ public fun XYGraphScope<Instant, Double>.PlotAveragedDeviceProperty(
var points by remember { mutableStateOf<List<ValueWithTime<Double>>>(emptyList()) } var points by remember { mutableStateOf<List<ValueWithTime<Double>>>(emptyList()) }
LaunchedEffect(device, propertyName, startValue, maxAge, maxPoints, minPoints, averagingInterval) { LaunchedEffect(device, propertyName, startValue, maxAge, maxPoints, minPoints, averagingInterval) {
val clock: AsyncClock = device.clock val clock: Clock = device.clock
var lastValue = startValue var lastValue = startValue
device.propertyMessageFlow(propertyName) device.propertyMessageFlow(propertyName)
.chunkedByPeriod(averagingInterval) .chunkedByPeriod(averagingInterval)

@ -36,7 +36,7 @@ dependencies {
kotlin{ kotlin{
jvmToolchain(17) jvmToolchain(17)
compilerOptions { compilerOptions {
freeCompilerArgs.addAll("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn") freeCompilerArgs.addAll("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn", "-Xcontext-parameters")
} }
} }

@ -9,7 +9,8 @@ import androidx.compose.ui.unit.dp
import androidx.compose.ui.window.Window import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application import androidx.compose.ui.window.application
import androidx.compose.ui.window.rememberWindowState import androidx.compose.ui.window.rememberWindowState
import io.ktor.server.engine.ApplicationEngine import io.ktor.server.application.port
import io.ktor.server.engine.EmbeddedServer
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -46,8 +47,8 @@ private val json = Json { prettyPrint = true }
class DemoController : ContextAware { class DemoController : ContextAware {
var device: DemoDevice? = null var device: DemoDevice? = null
var magixServer: ApplicationEngine? = null var magixServer: EmbeddedServer<*,*>? = null
var visualizer: ApplicationEngine? = null var visualizer: EmbeddedServer<*,*>? = null
val opcUaServer: OpcUaServer = OpcUaServer { val opcUaServer: OpcUaServer = OpcUaServer {
setApplicationName(LocalizedText.english("space.kscience.controls.opcua")) setApplicationName(LocalizedText.english("space.kscience.controls.opcua"))
@ -174,7 +175,7 @@ fun DemoControls(controller: DemoController) {
onClick = { onClick = {
controller.visualizer?.run { controller.visualizer?.run {
val host = "localhost"//environment.connectors.first().host val host = "localhost"//environment.connectors.first().host
val port = environment.connectors.first().port val port = environment.config.port
val uri = URI("http", null, host, port, "/", null, null) val uri = URI("http", null, host, port, "/", null, null)
Desktop.getDesktop().browse(uri) Desktop.getDesktop().browse(uri)
} }

@ -1,6 +1,6 @@
package space.kscience.controls.demo package space.kscience.controls.demo
import io.ktor.server.engine.ApplicationEngine import io.ktor.server.engine.EmbeddedServer
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -18,9 +18,8 @@ import space.kscience.plotly.Plotly
import space.kscience.plotly.layout import space.kscience.plotly.layout
import space.kscience.plotly.models.Trace import space.kscience.plotly.models.Trace
import space.kscience.plotly.plot import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.trace import space.kscience.plotly.trace
import space.kscience.visionforge.plotly.serveSinglePage
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
/** /**
@ -53,7 +52,7 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
} }
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine { fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): EmbeddedServer<*, *> {
//share subscription to a parse message only once //share subscription to a parse message only once
val subscription = magixEndpoint.subscribe(DeviceManager.magixFormat).shareIn(this, SharingStarted.Lazily) val subscription = magixEndpoint.subscribe(DeviceManager.magixFormat).shareIn(this, SharingStarted.Lazily)
@ -69,70 +68,68 @@ fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): Applicat
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.coordinates.name } (payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.coordinates.name }
}.map { it.value } }.map { it.value }
return Plotly.serve(port = 9091, scope = this) { return Plotly.serveSinglePage(port = 9091, routeConfiguration = {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 100 updateInterval = 100
page { container -> }) {
link { link {
rel = "stylesheet" rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css" href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk" attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
attributes["crossorigin"] = "anonymous" attributes["crossorigin"] = "anonymous"
}
div("row") {
div("col-6") {
plot(renderer = container) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
div("col-6") {
plot(renderer = container) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
}
div("row") {
div("col-12") {
plot(renderer = container) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
it["x"].double!! to it["y"].double!!
}.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
} }
div("row") {
div("col-6") {
plot{
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
div("col-6") {
plot{
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
}
div("row") {
div("col-12") {
plot{
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
it["x"].double!! to it["y"].double!!
}.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
} }
} }

@ -9,7 +9,6 @@ plugins {
kscience { kscience {
jvm() jvm()
useKtor()
useSerialization() useSerialization()
useContextReceivers() useContextReceivers()
commonMain { commonMain {

@ -271,12 +271,12 @@ fun main() = application {
second(400.dp) { second(400.dp) {
ChartLayout { ChartLayout {
XYGraph<Instant, Double>( XYGraph<Instant, Double>(
xAxisModel = remember { TimeAxisModel.recent(maxAge, clock) }, xAxisModel = remember { TimeAxisModel.recent(maxAge, context.clock) },
yAxisModel = rememberDoubleLinearAxisModel((range.start - 1.0)..(range.endInclusive + 1.0)), yAxisModel = rememberDoubleLinearAxisModel((range.start - 1.0)..(range.endInclusive + 1.0)),
xAxisTitle = { Text("Time in seconds relative to current") }, xAxisTitle = { Text("Time in seconds relative to current") },
xAxisLabels = { it: Instant -> xAxisLabels = { it: Instant ->
Text( Text(
(clock.now() - it).toDouble( (context.clock.now() - it).toDouble(
DurationUnit.SECONDS DurationUnit.SECONDS
).toString(2) ).toString(2)
) )

@ -29,10 +29,10 @@ import space.kscience.plotly.Plotly
import space.kscience.plotly.PlotlyConfig import space.kscience.plotly.PlotlyConfig
import space.kscience.plotly.layout import space.kscience.plotly.layout
import space.kscience.plotly.models.Bar import space.kscience.plotly.models.Bar
import space.kscience.plotly.models.invoke
import space.kscience.plotly.plot import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode import space.kscience.visionforge.plotly.serveSinglePage
import space.kscience.plotly.server.serve import space.kscience.visionforge.server.openInBrowser
import space.kscience.plotly.server.show
import space.kscince.magix.zmq.ZmqMagixFlowPlugin import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import space.kscince.magix.zmq.zmq import space.kscince.magix.zmq.zmq
import kotlin.random.Random import kotlin.random.Random
@ -117,24 +117,22 @@ suspend fun main() {
} }
} }
val application = Plotly.serve(port = 9091) { val application = Plotly.serveSinglePage(port = 9091, routeConfiguration = {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 1000 updateInterval = 1000
}) {
page { container -> plot(config = PlotlyConfig { saveAsSvg() }) {
plot(renderer = container, config = PlotlyConfig { saveAsSvg() }) { layout {
layout {
// title = "Latest event" // title = "Latest event"
xaxis.title = "Device number" xaxis.title = "Device number"
yaxis.title = "Maximum latency in ms" yaxis.title = "Maximum latency in ms"
}
traces(trace)
} }
traces(trace)
} }
} }
application.show()
application.openInBrowser()
while (readlnOrNull().isNullOrBlank()) { while (readlnOrNull().isNullOrBlank()) {

@ -4,9 +4,9 @@ import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.InternalAPI
import io.ktor.util.moveToByteArray import io.ktor.util.moveToByteArray
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.read
import io.ktor.utils.io.writeByteArray
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -17,7 +17,6 @@ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
throwable.printStackTrace() throwable.printStackTrace()
} }
@OptIn(InternalAPI::class)
fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exceptionHandler) { fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exceptionHandler) {
val virtualDevice = PiMotionMasterVirtualDevice(this@launchPiDebugServer, axes) val virtualDevice = PiMotionMasterVirtualDevice(this@launchPiDebugServer, axes)
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind("localhost", port).use { server -> aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind("localhost", port).use { server ->
@ -32,7 +31,7 @@ fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exc
val sendJob = virtualDevice.subscribe().onEach { val sendJob = virtualDevice.subscribe().onEach {
//println("Sending: ${it.decodeToString()}") //println("Sending: ${it.decodeToString()}")
output.writeAvailable(it) output.writeByteArray(it)
output.flush() output.flush()
}.launchIn(this) }.launchIn(this)

@ -9,4 +9,4 @@ org.gradle.jvmargs=-Xmx4096m
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
toolsVersion=0.16.1-kotlin-2.1.0 toolsVersion=0.17.1-kotlin-2.1.20

@ -1,6 +1,6 @@
[versions] [versions]
dataforge = "0.10.0" dataforge = "0.10.1"
rsocket = "0.16.0" rsocket = "0.16.0"
xodus = "2.0.1" xodus = "2.0.1"
@ -10,8 +10,6 @@ fazecast = "2.10.3"
tornadofx = "1.7.20" tornadofx = "1.7.20"
plotlykt = "0.7.2"
logback = "1.2.11" logback = "1.2.11"
hivemq = "1.3.1" hivemq = "1.3.1"
@ -29,7 +27,7 @@ pi4j-ktx = "2.4.0"
plc4j = "0.12.0" plc4j = "0.12.0"
visionforge = "0.4.2" visionforge = "0.5.0"
[libraries] [libraries]
@ -50,7 +48,9 @@ jSerialComm = { module = "com.fazecast:jSerialComm", version.ref = "fazecast" }
tornadofx = { module = "no.tornado:tornadofx", version.ref = "tornadofx" } tornadofx = { module = "no.tornado:tornadofx", version.ref = "tornadofx" }
plotlykt-server = { module = "space.kscience:plotlykt-server", version.ref = "plotlykt" } plotlykt-core = { module = "space.kscience:plotly-kt-core", version.ref = "visionforge" }
plotlykt-server = { module = "space.kscience:plotly-kt-server", version.ref = "visionforge" }
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback" } logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
@ -74,7 +74,6 @@ pi4j-plugin-pigpio = { module = "com.pi4j:pi4j-plugin-pigpio", version.ref = "pi
plc4j-spi = { module = "org.apache.plc4x:plc4j-spi", version.ref = "plc4j" } plc4j-spi = { module = "org.apache.plc4x:plc4j-spi", version.ref = "plc4j" }
visionforge-jupiter = { module = "space.kscience:visionforge-jupyter", version.ref = "visionforge" } visionforge-jupiter = { module = "space.kscience:visionforge-jupyter", version.ref = "visionforge" }
visionforge-plotly = { module = "space.kscience:visionforge-plotly", version.ref = "visionforge" }
visionforge-markdown = { module = "space.kscience:visionforge-markdown", version.ref = "visionforge" } visionforge-markdown = { module = "space.kscience:visionforge-markdown", version.ref = "visionforge" }
visionforge-server = { module = "space.kscience:visionforge-server", version.ref = "visionforge" } visionforge-server = { module = "space.kscience:visionforge-server", version.ref = "visionforge" }
visionforge-compose-html = { module = "space.kscience:visionforge-compose-html", version.ref = "visionforge" } visionforge-compose-html = { module = "space.kscience:visionforge-compose-html", version.ref = "visionforge" }

@ -9,6 +9,9 @@ import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
/**
* A timeline that could be forked. The events from the fork appear in parent timeline events, but not vise versa.
*/
public interface ForkingTimeline<E : Any> : CollectingTimeline<E> { public interface ForkingTimeline<E : Any> : CollectingTimeline<E> {
public suspend fun fork(): ForkingTimeline<E> public suspend fun fork(): ForkingTimeline<E>
} }
@ -84,11 +87,12 @@ public class TreeTimeline<E : Any>(
override suspend fun collect(upTo: Instant) = mutex.withLock { override suspend fun collect(upTo: Instant) = mutex.withLock {
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" } require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
events().takeWhile { TODO("Not yet implemented")
timeOf(it) <= upTo // events().takeWhile {
}.collect { // timeOf(it) <= upTo
channel.send(it) // }.collect {
} // channel.send(it)
// }
} }
override fun close() { override fun close() {

@ -1,6 +1,7 @@
package space.kscience.simulation package space.kscience.simulation
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlin.test.Test import kotlin.test.Test
@ -14,6 +15,8 @@ class TimelineTests {
fun testGeneration() = runTest(timeout = 1.seconds) { fun testGeneration() = runTest(timeout = 1.seconds) {
val startTime = Instant.parse("2020-01-01T00:00:00.000Z") val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
StandardTestDispatcher()
val generation = GeneratingTimeline( val generation = GeneratingTimeline(
origin = TimelineEvent(startTime, Unit), origin = TimelineEvent(startTime, Unit),
lookaheadInterval = 1.seconds, lookaheadInterval = 1.seconds,