Compare commits

...

7 Commits

80 changed files with 1132 additions and 550 deletions
CHANGELOG.md
controls-constructor
build.gradle.kts
src
commonMain/kotlin/space/kscience/controls/constructor
commonTest/kotlin/space/kscience/controls/constructor
controls-core/src
commonMain/kotlin/space/kscience/controls
commonTest/kotlin/space/kscience/controls/api
jsMain/kotlin/space/kscience/controls/time
jvmMain/kotlin/space/kscience/controls/time
jvmTest/kotlin/space/kscience/controls/time
nativeMain/kotlin/space/kscience/controls/time
wasmJsMain/kotlin/space/kscience/controls
controls-jupyter
controls-magix
build.gradle.kts
src
commonMain/kotlin/space/kscience/controls/client
extra
controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client
controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports
controls-server/src/jvmMain/kotlin/space/kscience/controls/server
controls-storage
controls-xodus/src/jvmTest/kotlin
src/commonMain/kotlin/space/kscience/controls/storage
controls-vision
controls-visualisation-compose
build.gradle.kts
src/commonMain/kotlin
demo
all-things
build.gradle.kts
src/main/kotlin/space/kscience/controls/demo
car/src/jvmMain/kotlin/space/kscience/controls/demo/car
constructor
device-collective/src/jvmMain/kotlin
many-devices
build.gradle.kts
src/main/kotlin/space/kscience/controls/demo
motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster
gradle.properties
gradle
magix
magix-api
magix-java-endpoint
magix-rsocket
build.gradle.kts
src
commonMain/kotlin/space/kscience/magix/rsocket
jvmMain/kotlin/space/kscience/magix/rsocket
linuxX64Main/kotlin/rsocket
magix-server
build.gradle.kts
src/jvmMain/kotlin/space/kscience/magix/server
simulation-kt/src

@ -18,6 +18,7 @@
- `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices.
- Add some utility methods to ports. Synchronous port response could be now consumed as `Source`.
- `DeviceLifecycleState` is replaced by `LifecycleState`.
- Time is now mandatory first field of all device messages
### Deprecated

@ -14,8 +14,10 @@ kscience{
wasm()
useCoroutines()
useSerialization()
commonMain {
api(projects.controlsCore)
api(projects.simulationKt)
}
commonTest{

@ -5,7 +5,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Instant
import space.kscience.controls.api.Device
import space.kscience.controls.manager.ClockManager
import space.kscience.controls.time.ClockManager
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.context.request
import kotlin.time.Duration

@ -2,11 +2,13 @@ package space.kscience.controls.constructor
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import space.kscience.controls.api.*
import space.kscience.controls.api.LifecycleState.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Laminate
import space.kscience.dataforge.meta.Meta
@ -16,7 +18,6 @@ import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.get
import space.kscience.dataforge.names.parseAsName
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
@ -66,6 +67,7 @@ public open class DeviceGroup(
context.launch {
sharedMessageFlow.emit(
DeviceErrorMessage(
time = clock.now(),
errorMessage = throwable.message,
errorType = throwable::class.simpleName,
errorStackTrace = throwable.stackTraceToString()
@ -109,8 +111,9 @@ public open class DeviceGroup(
state.valueFlow.map(converter::convert).onEach {
sharedMessageFlow.emit(
PropertyChangedMessage(
descriptor.name,
it
time = clock.now(),
property = descriptor.name,
value = it
)
)
}.launchIn(this)
@ -172,7 +175,7 @@ public open class DeviceGroup(
private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
this.lifecycleState = lifecycleState
sharedMessageFlow.emit(
DeviceLifeCycleMessage(lifecycleState)
DeviceLifeCycleMessage(clock.now(), lifecycleState)
)
}
@ -194,6 +197,8 @@ public open class DeviceGroup(
super.stop()
}
override val clock: Clock = context.clock
public companion object {
}

@ -3,13 +3,14 @@ package space.kscience.controls.constructor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.newCoroutineContext
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import kotlin.coroutines.CoroutineContext
public abstract class ModelConstructor(
final override val context: Context,
vararg dependencies: DeviceState<*>,
) : StateContainer, CoroutineScope {
) : StateContainer, CoroutineScope{
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext = context.newCoroutineContext(SupervisorJob())
@ -30,4 +31,6 @@ public abstract class ModelConstructor(
override fun unregisterElement(constructorElement: ConstructorElement) {
_constructorElements.remove(constructorElement)
}
}
}
public val ModelConstructor.clock get() = context.clock

@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.datetime.Instant
import space.kscience.controls.manager.ClockManager
import space.kscience.controls.time.ClockManager
import kotlin.time.Duration
/**
@ -20,14 +20,15 @@ import kotlin.time.Duration
public class TimerState(
public val clockManager: ClockManager,
public val tick: Duration,
initialValue: Instant = Instant.DISTANT_PAST,
) : DeviceState<Instant> {
private val clock = MutableStateFlow(clockManager.clock.now())
private val clock = MutableStateFlow(initialValue)
private val updateJob = clockManager.context.launch(clockManager.asDispatcher()) {
private val updateJob = clockManager.context.launch(clockManager.dispatcher) {
while (isActive) {
delay(tick)
clock.value = clockManager.clock.now()
delay(tick)
}
}

@ -7,7 +7,6 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.constructor.*
import space.kscience.controls.constructor.units.*
import space.kscience.controls.manager.clock
import space.kscience.dataforge.context.Context
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
@ -48,8 +47,6 @@ public class PidRegulator<P : UnitsOfMeasurement, O : UnitsOfMeasurement>(
val mutex = Mutex()
val clock = context.clock
var lastTime = clock.now()
while (isActive) {

@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.test.runTest
import space.kscience.controls.manager.ClockManager
import space.kscience.controls.time.ClockManager
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.request
import kotlin.test.Test

@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import space.kscience.controls.api.Device.Companion.DEVICE_TARGET
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.context.info
@ -68,6 +69,11 @@ public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
*/
override suspend fun start(): Unit = Unit
/**
* Clock associated with this device
*/
public val clock: Clock
/**
* Close and terminate the device. This function does not wait for the device to be closed.
*/

@ -2,9 +2,7 @@
package space.kscience.controls.api
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.serialization.EncodeDefault
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
@ -31,10 +29,12 @@ public sealed class DeviceMessage {
public companion object {
public fun error(
time: Instant,
cause: Throwable,
sourceDevice: Name,
targetDevice: Name? = null,
): DeviceErrorMessage = DeviceErrorMessage(
time = time,
errorMessage = cause.message,
errorType = cause::class.simpleName,
errorStackTrace = cause.stackTraceToString(),
@ -54,12 +54,12 @@ public sealed class DeviceMessage {
@Serializable
@SerialName("property.changed")
public data class PropertyChangedMessage(
override val time: Instant,
public val property: String,
public val value: Meta,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -70,12 +70,12 @@ public data class PropertyChangedMessage(
@Serializable
@SerialName("property.set")
public data class PropertySetMessage(
override val time: Instant,
public val property: String,
public val value: Meta,
override val sourceDevice: Name? = null,
override val targetDevice: Name?,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -87,11 +87,11 @@ public data class PropertySetMessage(
@Serializable
@SerialName("property.get")
public data class PropertyGetMessage(
override val time: Instant,
public val property: String,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -102,10 +102,10 @@ public data class PropertyGetMessage(
@Serializable
@SerialName("description.get")
public data class GetDescriptionMessage(
override val time: Instant,
override val sourceDevice: Name? = null,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -116,13 +116,13 @@ public data class GetDescriptionMessage(
@Serializable
@SerialName("description")
public data class DescriptionMessage(
override val time: Instant,
val description: Meta,
val properties: Collection<PropertyDescriptor>,
val actions: Collection<ActionDescriptor>,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -135,13 +135,13 @@ public data class DescriptionMessage(
@Serializable
@SerialName("action.execute")
public data class ActionExecuteMessage(
override val time: Instant,
public val action: String,
public val argument: Meta?,
public val requestId: String,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -154,13 +154,13 @@ public data class ActionExecuteMessage(
@Serializable
@SerialName("action.result")
public data class ActionResultMessage(
override val time: Instant,
public val action: String,
public val result: Meta?,
public val requestId: String,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -176,12 +176,12 @@ public data class ActionResultMessage(
@Serializable
@SerialName("binary.notification")
public data class BinaryNotificationMessage(
override val time: Instant,
val contentId: String,
val contentMeta: Meta,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -193,10 +193,10 @@ public data class BinaryNotificationMessage(
@Serializable
@SerialName("empty")
public data class EmptyDeviceMessage(
override val time: Instant,
override val sourceDevice: Name? = null,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
@ -207,12 +207,12 @@ public data class EmptyDeviceMessage(
@Serializable
@SerialName("log")
public data class DeviceLogMessage(
override val time: Instant,
val message: String,
val data: Meta? = null,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -223,13 +223,13 @@ public data class DeviceLogMessage(
@Serializable
@SerialName("error")
public data class DeviceErrorMessage(
override val time: Instant,
public val errorMessage: String?,
public val errorType: String? = null,
public val errorStackTrace: String? = null,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@ -240,11 +240,11 @@ public data class DeviceErrorMessage(
@Serializable
@SerialName("lifecycle")
public data class DeviceLifeCycleMessage(
override val time: Instant,
val state: LifecycleState,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}

@ -1,7 +1,16 @@
package space.kscience.controls.misc
package space.kscience.controls
import kotlinx.datetime.Instant
import kotlinx.io.Sink
import kotlinx.io.Source
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.IOFormatFactory
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration
@ -40,6 +49,33 @@ private object InstantConverter : MetaConverter<Instant> {
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
public fun Instant.toMeta(): Meta = Meta(toString())
public val Meta?.instant: Instant? get() = this?.value?.string?.let { Instant.parse(it) }
/**
* An [IOFormat] for [Instant]
*/
public object InstantIOFormat : IOFormat<Instant>, IOFormatFactory<Instant> {
override fun build(context: Context, meta: Meta): IOFormat<Instant> = this
override val name: Name = "instant".asName()
override val type: KType get() = typeOf<Instant>()
override fun writeTo(sink: Sink, obj: Instant) {
sink.writeLong(obj.epochSeconds)
sink.writeInt(obj.nanosecondsOfSecond)
}
override fun readFrom(source: Source): Instant {
val seconds = source.readLong()
val nanoseconds = source.readInt()
return Instant.fromEpochSeconds(seconds, nanoseconds)
}
}
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? =
source.value?.doubleArray?.let { (start, end) ->
@ -52,11 +88,3 @@ private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Dou
}
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
private object StringListConverter : MetaConverter<List<String>> {
override fun convert(obj: List<String>): Meta = Meta(obj.map { it.asValue() }.asValue())
override fun readOrNull(source: Meta): List<String>? = source.stringList ?: source["@jsonArray"]?.stringList
}
public val MetaConverter.Companion.stringList: MetaConverter<List<String>> get() = StringListConverter

@ -1,109 +0,0 @@
package space.kscience.controls.manager
import kotlinx.coroutines.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.api.Device
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong
import kotlin.time.Duration
@OptIn(InternalCoroutinesApi::class)
private class CompressedTimeDispatcher(
val clockManager: ClockManager,
val dispatcher: CoroutineDispatcher,
val compression: Double,
) : CoroutineDispatcher(), Delay {
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatcher.dispatchYield(context, block)
}
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = dispatcher.limitedParallelism(parallelism)
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatcher.dispatch(context, block)
}
private val delay = ((dispatcher as? Delay) ?: (Dispatchers.Default as Delay))
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
delay.scheduleResumeAfterDelay((timeMillis / compression).roundToLong(), continuation)
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
return delay.invokeOnTimeout((timeMillis / compression).roundToLong(), block, context)
}
}
private class CompressedClock(
val start: Instant,
val compression: Double,
val baseClock: Clock = Clock.System,
) : Clock {
override fun now(): Instant {
val elapsed = (baseClock.now() - start)
return start + elapsed / compression
}
}
public class ClockManager : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
public val timeCompression: Double by meta.double(1.0)
public val clock: Clock by lazy {
if (timeCompression == 1.0) {
Clock.System
} else {
CompressedClock(Clock.System.now(), timeCompression)
}
}
/**
* Provide a [CoroutineDispatcher] with compressed time based on given [dispatcher]
*/
public fun asDispatcher(
dispatcher: CoroutineDispatcher = Dispatchers.Default,
): CoroutineDispatcher = if (timeCompression == 1.0) {
dispatcher
} else {
CompressedTimeDispatcher(this, dispatcher, timeCompression)
}
public fun scheduleWithFixedDelay(tick: Duration, block: suspend () -> Unit): Job = context.launch(asDispatcher()) {
while (isActive) {
delay(tick)
block()
}
}
public companion object : PluginFactory<ClockManager> {
override val tag: PluginTag = PluginTag("clock", group = PluginTag.DATAFORGE_GROUP)
override fun build(context: Context, meta: Meta): ClockManager = ClockManager()
}
}
public val Context.clock: Clock get() = plugins[ClockManager]?.clock ?: Clock.System
public val Device.clock: Clock get() = context.clock
public fun Device.getCoroutineDispatcher(dispatcher: CoroutineDispatcher = Dispatchers.Default): CoroutineDispatcher =
context.plugins[ClockManager]?.asDispatcher(dispatcher) ?: dispatcher
public fun ContextBuilder.withTimeCompression(compression: Double) {
require(compression > 0.0) { "Time compression must be greater than zero." }
plugin(ClockManager) {
"timeCompression" put compression
}
}

@ -15,25 +15,28 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
when (request) {
is PropertyGetMessage -> {
PropertyChangedMessage(
time = clock.now(),
property = request.property,
value = getOrReadProperty(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
targetDevice = request.sourceDevice,
)
}
is PropertySetMessage -> {
writeProperty(request.property, request.value)
PropertyChangedMessage(
time = clock.now(),
property = request.property,
value = getOrReadProperty(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
targetDevice = request.sourceDevice,
)
}
is ActionExecuteMessage -> {
ActionResultMessage(
time = clock.now(),
action = request.action,
result = execute(request.action, request.argument),
requestId = request.requestId,
@ -44,6 +47,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
is GetDescriptionMessage -> {
DescriptionMessage(
time = clock.now(),
description = meta,
properties = propertyDescriptors,
actions = actionDescriptors,
@ -60,10 +64,15 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
is EmptyDeviceMessage,
is DeviceLogMessage,
is DeviceLifeCycleMessage,
-> null
-> null
}
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice)
DeviceMessage.error(
time = clock.now(),
cause = ex,
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
/**
@ -82,7 +91,14 @@ public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<Dev
listOfNotNull(device.respondMessage(targetName, request))
}
} catch (ex: Exception) {
listOf(DeviceMessage.error(ex, sourceDevice = Name.EMPTY, targetDevice = request.sourceDevice))
listOf(
DeviceMessage.error(
time = request.time, //FIXME add actual time
cause = ex,
sourceDevice = Name.EMPTY,
targetDevice = request.sourceDevice
)
)
}
}

@ -1,42 +0,0 @@
package space.kscience.controls.misc
import kotlinx.datetime.Instant
import kotlinx.io.Sink
import kotlinx.io.Source
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.IOFormatFactory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* An [IOFormat] for [Instant]
*/
public object InstantIOFormat : IOFormat<Instant>, IOFormatFactory<Instant> {
override fun build(context: Context, meta: Meta): IOFormat<Instant> = this
override val name: Name = "instant".asName()
override val type: KType get() = typeOf<Instant>()
override fun writeTo(sink: Sink, obj: Instant) {
sink.writeLong(obj.epochSeconds)
sink.writeInt(obj.nanosecondsOfSecond)
}
override fun readFrom(source: Source): Instant {
val seconds = source.readLong()
val nanoseconds = source.readInt()
return Instant.fromEpochSeconds(seconds, nanoseconds)
}
}
public fun Instant.toMeta(): Meta = Meta(toString())
public val Meta.instant: Instant? get() = value?.string?.let { Instant.parse(it) }

@ -6,7 +6,9 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import space.kscience.controls.api.*
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.error
@ -72,6 +74,7 @@ public abstract class DeviceBase<D : Device>(
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
@OptIn(ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext = context.newCoroutineContext(
SupervisorJob(context.coroutineContext[Job]) +
CoroutineName("Device $id") +
@ -79,6 +82,7 @@ public abstract class DeviceBase<D : Device>(
launch {
sharedMessageFlow.emit(
DeviceErrorMessage(
time = clock.now(),
errorMessage = throwable.message,
errorType = throwable::class.simpleName,
errorStackTrace = throwable.stackTraceToString()
@ -112,7 +116,7 @@ public abstract class DeviceBase<D : Device>(
logicalState[propertyName] = value
}
if (value != null) {
sharedMessageFlow.emit(PropertyChangedMessage(propertyName, value))
sharedMessageFlow.emit(PropertyChangedMessage(clock.now(), propertyName, value))
}
}
}
@ -194,7 +198,7 @@ public abstract class DeviceBase<D : Device>(
private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
this.lifecycleState = lifecycleState
sharedMessageFlow.emit(
DeviceLifeCycleMessage(lifecycleState)
DeviceLifeCycleMessage(clock.now(), lifecycleState)
)
}
@ -223,6 +227,7 @@ public abstract class DeviceBase<D : Device>(
super.stop()
}
override val clock: Clock = context.clock
abstract override fun toString(): String

@ -4,7 +4,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import space.kscience.controls.api.Device
import space.kscience.controls.manager.getCoroutineDispatcher
import space.kscience.controls.time.coroutineDispatcher
import kotlin.time.Duration
/**
@ -16,7 +16,7 @@ public fun <D : Device> D.doRecurring(
task: suspend D.() -> Unit,
): Job {
val taskName = debugTaskName ?: "task[${task.hashCode().toString(16)}]"
val dispatcher = getCoroutineDispatcher()
val dispatcher = coroutineDispatcher
return launch(CoroutineName(taskName) + dispatcher) {
while (isActive) {
delay(interval)

@ -0,0 +1,127 @@
package space.kscience.controls.time
import kotlinx.coroutines.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.api.Device
import space.kscience.controls.instant
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToLong
import kotlin.time.Duration
@OptIn(InternalCoroutinesApi::class)
private class CompressedTimeDispatcher(
val coroutineContext: CoroutineContext,
val compression: Double,
) : CoroutineDispatcher(), Delay {
val dispatcher = coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatcher.dispatchYield(context, block)
}
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
dispatcher.limitedParallelism(parallelism, name)
override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatcher.dispatch(context, block)
}
private val parentDelay = ((dispatcher as? Delay) ?: (Dispatchers.Default as Delay))
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
parentDelay.scheduleResumeAfterDelay((timeMillis / compression).roundToLong(), continuation)
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
parentDelay.invokeOnTimeout((timeMillis / compression).roundToLong(), block, context)
}
private class CompressedClock(
val baseClock: Clock = Clock.System,
val compression: Double,
val start: Instant = baseClock.now(),
) : Clock {
override fun now(): Instant {
val elapsed = (baseClock.now() - start)
return start + elapsed / compression
}
}
internal expect fun resolveClock(meta: Meta): Clock?
public sealed interface ClockMode {
public object System : ClockMode
public class Custom(public val clock: Clock) : ClockMode
public class Compressed(public val compression: Double) : ClockMode
public class Virtual(public val manager: VirtualTimeManager) : ClockMode
}
public class ClockManager : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
public val clockMode: ClockMode = when (meta["clock.mode"].string) {
null, "system" -> ClockMode.System
"virtual" -> ClockMode.Virtual(VirtualTimeManager(meta["clock.start"]?.instant ?: Clock.System.now()))
"compressed" -> ClockMode.Compressed(meta["clock.compression"].double ?: 1.0)
else -> ClockMode.Custom(resolveClock(meta) ?: error("Can't resolve clock for $meta"))
}
public val clock: Clock = when (clockMode) {
ClockMode.System -> Clock.System
is ClockMode.Custom -> clockMode.clock
is ClockMode.Compressed -> CompressedClock(Clock.System, clockMode.compression)
is ClockMode.Virtual -> clockMode.manager
}
/**
* Provide a [CoroutineDispatcher] with compressed time based on context dispatcher
*/
public val dispatcher: CoroutineDispatcher by lazy {
when (clockMode) {
is ClockMode.System, is ClockMode.Custom -> context.coroutineContext[CoroutineDispatcher]
?: Dispatchers.Default
is ClockMode.Compressed -> CompressedTimeDispatcher(context.coroutineContext, clockMode.compression)
is ClockMode.Virtual -> VirtualTimeDispatcher(context.coroutineContext, clockMode.manager)
}
}
public fun scheduleWithFixedDelay(tick: Duration, block: suspend () -> Unit): Job = context.launch(dispatcher) {
while (isActive) {
delay(tick)
block()
}
}
public companion object : PluginFactory<ClockManager> {
override val tag: PluginTag = PluginTag("clock", group = PluginTag.DATAFORGE_GROUP)
override fun build(context: Context, meta: Meta): ClockManager = ClockManager()
}
}
public val Context.clock: Clock get() = plugins[ClockManager]?.clock ?: Clock.System
public val Device.coroutineDispatcher: CoroutineDispatcher
get() = context.plugins[ClockManager]?.dispatcher
?: context.coroutineContext[CoroutineDispatcher]
?: Dispatchers.Default
public fun ContextBuilder.withTimeCompression(compression: Double) {
require(compression > 0.0) { "Time compression must be greater than zero." }
plugin(ClockManager) {
"timeCompression" put compression
}
}

@ -1,4 +1,4 @@
package space.kscience.controls.misc
package space.kscience.controls.time
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*

@ -1,8 +1,11 @@
package space.kscience.controls.misc
package space.kscience.controls.time
import kotlinx.datetime.Instant
import kotlinx.io.Sink
import kotlinx.io.Source
import space.kscience.controls.InstantIOFormat
import space.kscience.controls.instant
import space.kscience.controls.toMeta
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaConverter

@ -0,0 +1,159 @@
package space.kscience.controls.time
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
public class VirtualTimeManager(
startTime: Instant,
) : Clock {
private val _time = MutableStateFlow(startTime)
private fun advanceTime() {
markerTimes.values.minOrNull()?.let {
_time.value = it
}
}
public val time: StateFlow<Instant> get() = _time
override fun now(): Instant = _time.value
private val markerTimes = mutableMapOf<Any, Instant>()
private val mutex = Mutex()
/**
* Read current time for the given [handle]. Handle time is always lower or equals to global manager time
*/
public suspend fun readTime(handle: Any): Instant = markerTimes[handle] ?: time.value
/**
* Set target of [handle] timeline to [to] and wait for it to happen
*/
public suspend fun advanceTimeTo(handle: Any, to: Instant) {
val currentMarkerTime = readTime(handle)
//if it is already the last instant - bypass
if (currentMarkerTime == to) return
// require that time is in the future
require(to > currentMarkerTime) { "The advanced time for marker `$handle` $to is less that current marker time $currentMarkerTime" }
// println("$handle locked at $currentMarkerTime")
mutex.withLock {
if(handle is Job && handle !in markerTimes.keys) {
//clear job marker on completion
handle.invokeOnCompletion {
markerTimes.remove(handle)
advanceTime()
}
}
markerTimes[handle] = to
// advance time if necessary
advanceTime()
}
// wait for time to exceed marker time
if (time.value < to) {
time.takeWhile {
it < to
}.collect()
}
// println("$handle unlocked at $currentMarkerTime")
}
/**
* Mark given [handle] as idle so its time could advance to the time after all other handles. Then wait for the time to advance.
*/
public suspend fun pass(handle: Any) {
advanceTimeTo(handle, markerTimes.values.max())
mutex.withLock {
markerTimes.remove(handle)
}
}
/**
* Mark the whole manager as idle and advance time to the maximum of all handles. Don't wait for time to advance
*/
public suspend fun pass(){
_time.value = markerTimes.values.max()
mutex.withLock {
markerTimes.clear()
}
}
}
public suspend fun VirtualTimeManager.advanceTimeBy(handle: Any, duration: Duration) {
advanceTimeTo(handle, readTime(handle) + duration)
}
@OptIn(InternalCoroutinesApi::class)
public class VirtualTimeDispatcher internal constructor(
private val coroutineContext: CoroutineContext,
private val virtualTimeManager: VirtualTimeManager
) : CoroutineDispatcher(), Delay {
private val scope = CoroutineScope(coroutineContext)
private val dispatcher: CoroutineDispatcher = coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = dispatcher.dispatch(context, block)
override fun limitedParallelism(
parallelism: Int,
name: String?
): CoroutineDispatcher = VirtualTimeDispatcher(
coroutineContext = dispatcher.limitedParallelism(parallelism, name),
virtualTimeManager = virtualTimeManager
)
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatcher.dispatchYield(context, block)
}
override fun toString(): String = "VirtualTimeDispatcher($virtualTimeManager)"
override fun scheduleResumeAfterDelay(
timeMillis: Long,
continuation: CancellableContinuation<Unit>
) {
val handle = continuation.context[Job] ?: error("Can't use VirtualTimeDispatcher without Job")
val scheduledJob = scope.launch {
virtualTimeManager.advanceTimeBy(handle, timeMillis.milliseconds)
dispatcher.dispatch(
continuation.context,
Runnable {
@OptIn(ExperimentalCoroutinesApi::class)
with(dispatcher) { with(continuation) { resumeUndispatched(Unit) } }
}
)
}
continuation.disposeOnCancellation {
scheduledJob.cancel()
}
}
}
public fun CoroutineContext.withVirtualTime(
virtualTimeManager: VirtualTimeManager
): CoroutineContext = if (this[Job] != null) {
this
} else {
//add job if it is not present
plus(Job(null))
}.plus(VirtualTimeDispatcher(this, virtualTimeManager))

@ -1,15 +1,15 @@
package space.kscience.controls.api
import kotlinx.serialization.encodeToString
import kotlinx.datetime.Clock
import kotlinx.serialization.json.Json
import space.kscience.controls.misc.asMeta
import space.kscience.controls.asMeta
import kotlin.test.Test
import kotlin.test.assertEquals
class MessageTest {
@Test
fun messageSerialization() {
val changedMessage = PropertyChangedMessage("test", 22.0.asMeta())
val changedMessage = PropertyChangedMessage(Clock.System.now(),"test", 22.0.asMeta())
val json = Json.encodeToString(changedMessage)
val reconstructed: PropertyChangedMessage = Json.decodeFromString(json)
assertEquals(changedMessage.time, reconstructed.time)

@ -0,0 +1,6 @@
package space.kscience.controls.time
import kotlinx.datetime.Clock
import space.kscience.dataforge.meta.Meta
internal actual fun resolveClock(meta: Meta): Clock? = null

@ -0,0 +1,17 @@
package space.kscience.controls.time
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.datetime.toKotlinInstant
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
internal actual fun resolveClock(meta: Meta): Clock? = when (meta["clock.mode"].string) {
"jvm" -> NanoClock
else -> null
}
public object NanoClock: Clock {
override fun now(): Instant = java.time.Instant.now().toKotlinInstant()
}

@ -0,0 +1,56 @@
package space.kscience.controls.time
import kotlinx.coroutines.*
import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.milliseconds
private data class TimedResult(val time: Instant, val marker: String)
class VirtualTimeTest {
@Test
fun manualAdvance(): Unit {
val timeManager = VirtualTimeManager(Instant.fromEpochMilliseconds(0L))
val collector = mutableListOf<TimedResult>()
runBlocking(Dispatchers.Default) {
withTimeout(500) {
repeat(3) { series ->
launch {
timeManager.advanceTimeBy(series, 100.milliseconds * (series + 1))
repeat(10) { number ->
collector.add(TimedResult(timeManager.now(),"$series.$number"))
timeManager.advanceTimeBy(series, 2000.milliseconds)
}
timeManager.pass(series)
}
}
}
}
println(collector.joinToString("\n"))
assertTrue { collector.sortedBy { it.time } == collector }
}
@Test
fun contextAdvance(): Unit {
val timeManager = VirtualTimeManager(Instant.fromEpochMilliseconds(0L))
val collector = mutableListOf<TimedResult>()
runBlocking(Dispatchers.Default.withVirtualTime(timeManager)) {
withTimeout(500) {
repeat(3) { series ->
launch {
delay(100.milliseconds * (series + 1))
repeat((series + 1) * 10) { number ->
collector.add(TimedResult(timeManager.now(),"$series.$number"))
println(collector.last())
delay(2000.milliseconds)
}
//timeManager.pass(this)
}
}
}
}
println(collector.joinToString("\n"))
assertTrue { collector.sortedBy { it.time } == collector }
}
}

@ -0,0 +1,6 @@
package space.kscience.controls.time
import kotlinx.datetime.Clock
import space.kscience.dataforge.meta.Meta
internal actual fun resolveClock(meta: Meta): Clock? = null

@ -0,0 +1,6 @@
package space.kscience.controls.time
import kotlinx.datetime.Clock
import space.kscience.dataforge.meta.Meta
internal actual fun resolveClock(meta: Meta): Clock? = null

@ -5,13 +5,18 @@ plugins {
kscience {
fullStack("js/controls-jupyter.js")
useKtor()
useContextReceivers()
jupyterLibrary("space.kscience.controls.jupyter.ControlsJupyter")
dependencies {
implementation(projects.controlsVision)
implementation(libs.visionforge.jupiter)
}
jsMain{
//FIXME remove after VisionForge 0.5
api("org.jetbrains.kotlin-wrappers:kotlin-extensions:1.0.1-pre.823")
}
jvmMain {
implementation(spclibs.logback.classic)
}

@ -1,7 +1,7 @@
import space.kscience.plotly.PlotlyPlugin
import space.kscience.visionforge.html.runVisionClient
import space.kscience.visionforge.jupyter.VFNotebookClient
import space.kscience.visionforge.markup.MarkupPlugin
import space.kscience.visionforge.plotly.PlotlyPlugin
public fun main(): Unit = runVisionClient {
// plugin(DeviceManager)

@ -2,8 +2,8 @@ package space.kscience.controls.jupyter
import org.jetbrains.kotlinx.jupyter.api.declare
import org.jetbrains.kotlinx.jupyter.api.libraries.resources
import space.kscience.controls.manager.ClockManager
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.time.ClockManager
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.plotly.Plot

@ -13,7 +13,7 @@ kscience {
jvm()
js()
native()
wasm()
// wasm()
useCoroutines()
useSerialization {
json()

@ -8,10 +8,12 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import space.kscience.controls.api.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
@ -67,7 +69,7 @@ public class DeviceClient internal constructor(
override suspend fun readProperty(propertyName: String): Meta {
send(
PropertyGetMessage(propertyName, targetDevice = deviceName)
PropertyGetMessage(clock.now(), propertyName, targetDevice = deviceName)
)
return messageFlow.filterIsInstance<PropertyChangedMessage>().first {
it.property == propertyName
@ -84,14 +86,25 @@ public class DeviceClient internal constructor(
override suspend fun writeProperty(propertyName: String, value: Meta) {
send(
PropertySetMessage(propertyName, value, targetDevice = deviceName)
PropertySetMessage(
time = clock.now(),
property = propertyName,
value = value,
targetDevice = deviceName
)
)
}
override suspend fun execute(actionName: String, argument: Meta?): Meta? {
val id = stringUID()
send(
ActionExecuteMessage(actionName, argument, id, targetDevice = deviceName)
ActionExecuteMessage(
time = clock.now(),
action = actionName,
argument = argument,
requestId = id,
targetDevice = deviceName
)
)
return messageFlow.filterIsInstance<ActionResultMessage>().first {
it.action == actionName && it.requestId == id
@ -103,6 +116,8 @@ public class DeviceClient internal constructor(
@DFExperimental
override val lifecycleState: LifecycleState get() = lifecycleStateFlow.value
override val clock: Clock = context.clock
}
/**
@ -135,7 +150,7 @@ public suspend fun MagixEndpoint.remoteDevice(
send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(targetDevice = deviceName),
payload = GetDescriptionMessage(Clock.System.now(), targetDevice = deviceName),
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
@ -196,7 +211,7 @@ public suspend fun MagixEndpoint.remoteDeviceHub(
send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(targetDevice = null),
payload = GetDescriptionMessage(Clock.System.now(), targetDevice = null),
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
@ -214,7 +229,7 @@ public suspend fun MagixEndpoint.requestDeviceUpdate(
) {
send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(),
payload = GetDescriptionMessage(Clock.System.now()),
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
@ -247,6 +262,7 @@ public suspend fun <T> MagixEndpoint.sendControlsPropertyChange(
value: T,
) {
val message = PropertySetMessage(
Clock.System.now(),
property = propertySpec.name,
value = propertySpec.converter.convert(value),
targetDevice = deviceName

@ -13,8 +13,8 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.*
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.*
import org.opcfoundation.opcua.binaryschema.EnumeratedType
import org.opcfoundation.opcua.binaryschema.StructuredType
import space.kscience.controls.misc.instant
import space.kscience.controls.misc.toMeta
import space.kscience.controls.instant
import space.kscience.controls.toMeta
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName

@ -5,8 +5,8 @@ import io.ktor.network.sockets.SocketOptions
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.writeAvailable
import io.ktor.utils.io.read
import io.ktor.utils.io.writeByteArray
import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context
@ -15,7 +15,6 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
public class KtorTcpPort internal constructor(
@ -42,21 +41,22 @@ public class KtorTcpPort internal constructor(
override fun onOpen() {
listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer: ByteBuffer, last ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
!last && isActive
while (!input.isClosedForRead && isActive) {
input.read { arraySource, begin, endExclusive ->
val array = arraySource.copyOfRange(begin, endExclusive)
receive(array)
array.size
}
}
}
}
override suspend fun write(data: ByteArray) {
writeChannel.await().writeAvailable(data)
writeChannel.await().writeByteArray(data)
}
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
get() = if (listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override suspend fun stop() {
listenerJob?.cancel()

@ -1,11 +1,13 @@
package space.kscience.controls.ports
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.*
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.writeAvailable
import io.ktor.network.sockets.Datagram
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.SocketOptions
import io.ktor.network.sockets.aSocket
import io.ktor.utils.io.core.ByteReadPacket
import kotlinx.coroutines.*
import kotlinx.io.readByteArray
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
@ -36,30 +38,28 @@ public class KtorUdpPort internal constructor(
)
}
private val writeChannel: Deferred<ByteWriteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
futureSocket.await().openWriteChannel(true)
}
// private val writeChannel= scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
// futureSocket.await().outgoing
// }
private var listenerJob: Job? = null
override fun onOpen() {
listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, last ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
!last && isActive
val input = futureSocket.await().incoming
for (datagram in input) {
receive(datagram.packet.readByteArray())
}
}
}
override suspend fun write(data: ByteArray) {
writeChannel.await().writeAvailable(data)
val socket = futureSocket.await()
socket.send(Datagram(ByteReadPacket(data), socket.remoteAddress))
}
override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
get() = if (listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override suspend fun stop() {
listenerJob?.cancel()

@ -2,9 +2,12 @@ package space.kscience.controls.server
import io.ktor.http.HttpStatusCode
import io.ktor.server.application.*
import io.ktor.server.application.Application
import io.ktor.server.application.ApplicationStarted
import io.ktor.server.application.install
import io.ktor.server.application.pluginOrNull
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.EmbeddedServer
import io.ktor.server.engine.embeddedServer
import io.ktor.server.html.respondHtml
import io.ktor.server.plugins.statuspages.StatusPages
@ -21,7 +24,6 @@ import io.ktor.server.websocket.WebSockets
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.html.*
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.encodeToJsonElement
@ -63,10 +65,10 @@ public fun CoroutineScope.startDeviceServer(
manager: DeviceManager,
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
host: String = "localhost",
): ApplicationEngine = embeddedServer(CIO, port, host, module = { deviceServerModule(manager) }).start()
): EmbeddedServer<*, *> = embeddedServer(CIO, port, host, module = { deviceServerModule(manager) }).start()
public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
environment.monitor.subscribe(ApplicationStarted, callback)
public fun EmbeddedServer<*, *>.whenStarted(callback: Application.() -> Unit) {
monitor.subscribe(ApplicationStarted, callback)
}
@ -171,6 +173,7 @@ public fun Application.deviceManagerModule(
val target: String by call.parameters
val property: String by call.parameters
val request = PropertyGetMessage(
time = kotlinx.datetime.Clock.System.now(),
sourceDevice = WEB_SERVER_TARGET,
targetDevice = Name.parse(target),
property = property,
@ -190,6 +193,7 @@ public fun Application.deviceManagerModule(
val json = Json.parseToJsonElement(body)
val request = PropertySetMessage(
time = kotlinx.datetime.Clock.System.now(),
sourceDevice = WEB_SERVER_TARGET,
targetDevice = Name.parse(target),
property = property,

@ -23,21 +23,21 @@ internal class PropertyHistoryTest {
private val propertyChangedMessages = listOf(
PropertyChangedMessage(
time = Instant.fromEpochMilliseconds(1000),
"speed",
Meta.EMPTY,
time = Instant.fromEpochMilliseconds(1000),
sourceDevice = Name.of("virtual-car")
),
PropertyChangedMessage(
time = Instant.fromEpochMilliseconds(1500),
"acceleration",
Meta.EMPTY,
time = Instant.fromEpochMilliseconds(1500),
sourceDevice = Name.of("virtual-car")
),
PropertyChangedMessage(
time = Instant.fromEpochMilliseconds(2000),
"speed",
Meta.EMPTY,
time = Instant.fromEpochMilliseconds(2000),
sourceDevice = Name.of("magix-virtual-car")
)
)

@ -5,8 +5,8 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.datetime.Instant
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.misc.PropertyHistory
import space.kscience.controls.misc.ValueWithTime
import space.kscience.controls.time.PropertyHistory
import space.kscience.controls.time.ValueWithTime
import space.kscience.dataforge.meta.MetaConverter
public fun <T> DeviceMessageStorage.propertyHistory(

@ -9,18 +9,22 @@ description = """
kscience {
fullStack("js/controls-vision.js")
useKtor()
useSerialization()
useContextReceivers()
commonMain {
api(projects.controlsCore)
api(projects.controlsConstructor)
api(libs.visionforge.plotly)
api(libs.plotlykt.core)
api(libs.visionforge.markdown)
// api("space.kscience:tables-kt:0.2.1")
// api("space.kscience:visionforge-tables:$visionforgeVersion")
}
jsMain{
//FIXME remove after VisionForge 0.5
api("org.jetbrains.kotlin-wrappers:kotlin-extensions:1.0.1-pre.823")
}
jvmMain{
api(libs.visionforge.server)
api(spclibs.ktor.server.cio)

@ -2,7 +2,7 @@ package space.kscience.controls.vision
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import space.kscience.controls.misc.doubleRange
import space.kscience.controls.doubleRange
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.meta.convertable
import space.kscience.dataforge.meta.double

@ -11,20 +11,15 @@ import kotlinx.datetime.Instant
import space.kscience.controls.api.Device
import space.kscience.controls.api.propertyMessageFlow
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.manager.clock
import space.kscience.controls.misc.ValueWithTime
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.controls.time.ValueWithTime
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.plotly.Plot
import space.kscience.plotly.bar
import space.kscience.plotly.models.Bar
import space.kscience.plotly.models.Scatter
import space.kscience.plotly.models.Trace
import space.kscience.plotly.models.TraceValues
import space.kscience.plotly.scatter
import space.kscience.plotly.models.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

@ -1,8 +1,8 @@
package space.kscience.controls.vision
import space.kscience.plotly.PlotlyPlugin
import space.kscience.visionforge.html.runVisionClient
import space.kscience.visionforge.markup.MarkupPlugin
import space.kscience.visionforge.plotly.PlotlyPlugin
public fun main(): Unit = runVisionClient {
plugin(PlotlyPlugin)

@ -1,33 +1,27 @@
package space.kscience.controls.vision
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.EmbeddedServer
import io.ktor.server.engine.embeddedServer
import io.ktor.server.http.content.staticResources
import io.ktor.server.routing.Routing
import io.ktor.server.routing.routing
import kotlinx.html.TagConsumer
import space.kscience.dataforge.context.Context
import space.kscience.plotly.Plot
import space.kscience.plotly.PlotlyConfig
import space.kscience.plotly.PlotlyPlugin
import space.kscience.visionforge.html.HtmlVisionFragment
import space.kscience.visionforge.html.VisionPage
import space.kscience.visionforge.html.VisionTagConsumer
import space.kscience.visionforge.markup.MarkupPlugin
import space.kscience.visionforge.plotly.PlotlyPlugin
import space.kscience.visionforge.plotly.plotly
import space.kscience.visionforge.server.VisionRoute
import space.kscience.visionforge.server.close
import space.kscience.visionforge.server.openInBrowser
import space.kscience.visionforge.server.visionPage
import space.kscience.visionforge.visionManager
public fun Context.showDashboard(
public suspend fun Context.showDashboard(
port: Int = 7777,
routes: Routing.() -> Unit = {},
configurationBuilder: VisionRoute.() -> Unit = {},
visionFragment: HtmlVisionFragment,
): ApplicationEngine {
): EmbeddedServer<*, *> {
//create a sub-context for visualization
val visualisationContext = buildContext {
plugin(PlotlyPlugin)
@ -44,7 +38,7 @@ public fun Context.showDashboard(
visionPage(
visualisationContext.visionManager,
VisionPage.scriptHeader("js/controls-vision.js"),
configurationBuilder = configurationBuilder,
routeConfiguration = configurationBuilder,
visionFragment = visionFragment
)
}.also {
@ -57,16 +51,16 @@ public fun Context.showDashboard(
//
}
it.close()
it.stop()
}
}
context(VisionTagConsumer<*>)
public fun TagConsumer<*>.plot(
config: PlotlyConfig = PlotlyConfig(),
block: Plot.() -> Unit,
) {
vision {
plotly(config, block)
}
}
//context(consumer: VisionTagConsumer<*>)
//public fun TagConsumer<*>.plot(
// config: PlotlyConfig = PlotlyConfig(),
// block: Plot.() -> Unit,
//) {
// vision {
// plotly(config, block)
// }
//}

@ -13,7 +13,6 @@ description = """
kscience {
jvm()
useKtor()
useSerialization()
useContextReceivers()
commonMain {

@ -18,11 +18,12 @@ import space.kscience.controls.api.propertyMessageFlow
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.units.NumericalValue
import space.kscience.controls.constructor.values
import space.kscience.controls.manager.clock
import space.kscience.controls.misc.ValueWithTime
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.controls.time.ValueWithTime
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double
import kotlin.time.Duration
@ -40,7 +41,7 @@ internal fun <T> Flow<ValueWithTime<T>>.collectAndTrim(
maxAge: Duration = defaultMaxAge,
maxPoints: Int = defaultMaxPoints,
minPoints: Int = defaultMinPoints,
clock: Clock = Clock.System,
clock: Clock = Global.clock,
): Flow<List<ValueWithTime<T>>> {
require(maxPoints > 2)
require(minPoints > 0)
@ -221,7 +222,7 @@ public fun XYGraphScope<Instant, Double>.PlotAveragedDeviceProperty(
var points by remember { mutableStateOf<List<ValueWithTime<Double>>>(emptyList()) }
LaunchedEffect(device, propertyName, startValue, maxAge, maxPoints, minPoints, averagingInterval) {
val clock = device.clock
val clock: Clock = device.clock
var lastValue = startValue
device.propertyMessageFlow(propertyName)
.chunkedByPeriod(averagingInterval)

@ -36,7 +36,7 @@ dependencies {
kotlin{
jvmToolchain(17)
compilerOptions {
freeCompilerArgs.addAll("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
freeCompilerArgs.addAll("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn", "-Xcontext-parameters")
}
}

@ -9,11 +9,13 @@ import androidx.compose.ui.unit.dp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
import androidx.compose.ui.window.rememberWindowState
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.application.port
import io.ktor.server.engine.EmbeddedServer
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.serialization.json.Json
import org.eclipse.milo.opcua.sdk.server.OpcUaServer
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText
@ -46,8 +48,8 @@ private val json = Json { prettyPrint = true }
class DemoController : ContextAware {
var device: DemoDevice? = null
var magixServer: ApplicationEngine? = null
var visualizer: ApplicationEngine? = null
var magixServer: EmbeddedServer<*, *>? = null
var visualizer: EmbeddedServer<*, *>? = null
val opcUaServer: OpcUaServer = OpcUaServer {
setApplicationName(LocalizedText.english("space.kscience.controls.opcua"))
@ -96,7 +98,7 @@ class DemoController : ContextAware {
// send description request
listenerEndpoint.send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(),
payload = GetDescriptionMessage(Clock.System.now()),
source = "listener",
// target = "demoDevice"
)
@ -174,7 +176,7 @@ fun DemoControls(controller: DemoController) {
onClick = {
controller.visualizer?.run {
val host = "localhost"//environment.connectors.first().host
val port = environment.connectors.first().port
val port = environment.config.port
val uri = URI("http", null, host, port, "/", null, null)
Desktop.getDesktop().browse(uri)
}

@ -1,6 +1,6 @@
package space.kscience.controls.demo
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.EmbeddedServer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
@ -18,9 +18,8 @@ import space.kscience.plotly.Plotly
import space.kscience.plotly.layout
import space.kscience.plotly.models.Trace
import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.trace
import space.kscience.visionforge.plotly.serveSinglePage
import java.util.concurrent.ConcurrentLinkedQueue
/**
@ -53,7 +52,7 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
}
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine {
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): EmbeddedServer<*, *> {
//share subscription to a parse message only once
val subscription = magixEndpoint.subscribe(DeviceManager.magixFormat).shareIn(this, SharingStarted.Lazily)
@ -69,70 +68,68 @@ fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): Applicat
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.coordinates.name }
}.map { it.value }
return Plotly.serve(port = 9091, scope = this) {
updateMode = PlotlyUpdateMode.PUSH
return Plotly.serveSinglePage(port = 9091, routeConfiguration = {
updateInterval = 100
page { container ->
link {
rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
attributes["crossorigin"] = "anonymous"
}
div("row") {
div("col-6") {
plot(renderer = container) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
div("col-6") {
plot(renderer = container) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
}
div("row") {
div("col-12") {
plot(renderer = container) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
it["x"].double!! to it["y"].double!!
}.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
}) {
link {
rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
attributes["crossorigin"] = "anonymous"
}
div("row") {
div("col-6") {
plot{
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
div("col-6") {
plot{
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
}
div("row") {
div("col-12") {
plot{
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
it["x"].double!! to it["y"].double!!
}.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
}
}

@ -37,8 +37,10 @@ data class Vector2D(var x: Double = 0.0, var y: Double = 0.0) : MetaRepr {
}
}
open class VirtualCar(context: Context, meta: Meta) : DeviceBySpec<VirtualCar>(IVirtualCar, context, meta),
IVirtualCar {
open class VirtualCar(
context: Context,
meta: Meta
) : DeviceBySpec<VirtualCar>(IVirtualCar, context, meta), IVirtualCar {
private val clock = context.clock
private val timeScale = 1e-3

@ -9,7 +9,6 @@ plugins {
kscience {
jvm()
useKtor()
useSerialization()
useContextReceivers()
commonMain {

@ -42,7 +42,11 @@ import space.kscience.controls.constructor.onTimer
import space.kscience.controls.constructor.units.Kilograms
import space.kscience.controls.constructor.units.Meters
import space.kscience.controls.constructor.units.NumericalValue
import space.kscience.controls.manager.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.hubMessageFlow
import space.kscience.controls.manager.install
import space.kscience.controls.time.ClockManager
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import java.awt.Dimension
@ -182,8 +186,6 @@ fun main() = application {
}.collect()
}
val clock = remember { context.clock }
Window(title = "Pid regulator simulator", onCloseRequest = ::exitApplication) {
window.minimumSize = Dimension(800, 400)
MaterialTheme {
@ -269,12 +271,12 @@ fun main() = application {
second(400.dp) {
ChartLayout {
XYGraph<Instant, Double>(
xAxisModel = remember { TimeAxisModel.recent(maxAge, clock) },
xAxisModel = remember { TimeAxisModel.recent(maxAge, context.clock) },
yAxisModel = rememberDoubleLinearAxisModel((range.start - 1.0)..(range.endInclusive + 1.0)),
xAxisTitle = { Text("Time in seconds relative to current") },
xAxisLabels = { it: Instant ->
Text(
(clock.now() - it).toDouble(
(context.clock.now() - it).toDouble(
DurationUnit.SECONDS
).toString(2)
)

@ -31,8 +31,8 @@ import space.kscience.controls.constructor.devices.angle
import space.kscience.controls.constructor.models.Leadscrew
import space.kscience.controls.constructor.models.coerceIn
import space.kscience.controls.constructor.units.*
import space.kscience.controls.manager.ClockManager
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.time.ClockManager
import space.kscience.dataforge.context.Context
import java.awt.Dimension
import kotlin.random.Random

@ -4,7 +4,6 @@ package space.kscience.controls.demo.collective
import space.kscience.controls.api.Device
import space.kscience.controls.constructor.*
import space.kscience.controls.misc.stringList
import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.DeviceSpec
import space.kscience.dataforge.context.Context

@ -8,10 +8,7 @@ import space.kscience.controls.api.PropertySetMessage
import space.kscience.controls.client.DeviceClient
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.client.write
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.ModelConstructor
import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.onTimer
import space.kscience.controls.constructor.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.manager.respondMessage
@ -37,7 +34,6 @@ import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
private val deviceVelocity = 0.1.kilometers
private val center = Gmc.ofDegrees(55.925, 37.514)
@ -156,6 +152,7 @@ internal class DeviceCollectiveModel(
json.encodeToString(
DeviceMessage.serializer(),
PropertySetMessage(
time = clock.now(),
property = CollectiveDevice.velocity.name,
value = gmcVelocityMetaConverter.convert(state.velocity.value),
targetDevice = null

@ -21,7 +21,7 @@ dependencies {
}
kotlin{
jvmToolchain(11)
jvmToolchain(17)
}

@ -8,12 +8,14 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.client.magixFormat
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.spec.*
import space.kscience.controls.time.ClockManager
import space.kscience.controls.time.clock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.request
@ -22,17 +24,17 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketStreamWithWebSockets
import space.kscience.magix.rsocket.rSocketStreamWithTcp
import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.startMagixServer
import space.kscience.plotly.Plotly
import space.kscience.plotly.PlotlyConfig
import space.kscience.plotly.layout
import space.kscience.plotly.models.Bar
import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.server.show
import space.kscience.plotly.models.invoke
import space.kscience.plotly.plotly
import space.kscience.visionforge.plotly.serveSinglePage
import space.kscience.visionforge.server.openInBrowser
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import space.kscince.magix.zmq.zmq
import kotlin.random.Random
@ -46,6 +48,10 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
private val randomValue get() = rng.nextDouble()
private var counter: Long = 1
private val incrementValue: Double get() = (counter++).toDouble()
companion object : DeviceSpec<MassDevice>(), Factory<MassDevice> {
override fun build(context: Context, meta: Meta): MassDevice = MassDevice(context, meta)
@ -61,7 +67,13 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
}
suspend fun main() {
val context = Context("Mass")
val context = Context("Mass"){
plugin(ClockManager){
"clock.mode" put "jvm"
}
}
val clock = context.clock
context.startMagixServer(
RSocketMagixFlowPlugin(),
@ -79,10 +91,10 @@ suspend fun main() {
val deviceManager = deviceContext.request(DeviceManager)
deviceManager.install("device$it", MassDevice)
deviceManager.install("device$it", MassDevice, Meta { "delay" put 5 })
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
val deviceEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId)
}
@ -95,34 +107,39 @@ suspend fun main() {
val latest = HashMap<String, Duration>()
val max = HashMap<String, Duration>()
// val counters = hashMapOf<String, Double>()
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
mutex.withLock {
val delay = Clock.System.now() - payload.time
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
max[magixMessage.sourceEndpoint] =
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
if(payload is PropertyChangedMessage) {
val delay = clock.now() - payload.time
mutex.withLock {
// val deviceName = payload.sourceDevice.toString()
// counters[deviceName] = counters[deviceName]?.inc() ?: 1.0
// println("${deviceName}:${counters[deviceName]!! - payload.value.double!!}")
latest[magixMessage.sourceEndpoint] = delay
max[magixMessage.sourceEndpoint] = maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
}
}
}.launchIn(this)
while (isActive) {
delay(200)
delay(1000)
mutex.withLock {
val sorted = max.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
latest.clear()
max.clear()
x.numbers = sorted.keys
y.numbers = sorted.values.map { it.inWholeMicroseconds / 1000.0 + 0.0001 }
y.numbers = sorted.values.map { it.inWholeMicroseconds.toDouble() / 1000.0 }
}
}
}
}
val application = Plotly.serve(port = 9091) {
updateMode = PlotlyUpdateMode.PUSH
val application = Plotly.serveSinglePage(port = 9091, routeConfiguration = {
updateInterval = 1000
page { container ->
plot(renderer = container, config = PlotlyConfig { saveAsSvg() }) {
}) {
vision {
plotly(config = PlotlyConfig { saveAsSvg() }) {
layout {
// title = "Latest event"
@ -134,7 +151,8 @@ suspend fun main() {
}
}
application.show()
application.openInBrowser()
while (readlnOrNull().isNullOrBlank()) {

@ -12,8 +12,8 @@ import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import space.kscience.controls.api.DeviceHub
import space.kscience.controls.api.PropertyDescriptor
import space.kscience.controls.misc.asMeta
import space.kscience.controls.misc.duration
import space.kscience.controls.asMeta
import space.kscience.controls.duration
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.controls.ports.KtorTcpPort
import space.kscience.controls.ports.send

@ -4,9 +4,9 @@ import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.InternalAPI
import io.ktor.util.moveToByteArray
import io.ktor.utils.io.writeAvailable
import io.ktor.utils.io.read
import io.ktor.utils.io.writeByteArray
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
@ -17,7 +17,6 @@ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
throwable.printStackTrace()
}
@OptIn(InternalAPI::class)
fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exceptionHandler) {
val virtualDevice = PiMotionMasterVirtualDevice(this@launchPiDebugServer, axes)
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind("localhost", port).use { server ->
@ -32,7 +31,7 @@ fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exc
val sendJob = virtualDevice.subscribe().onEach {
//println("Sending: ${it.decodeToString()}")
output.writeAvailable(it)
output.writeByteArray(it)
output.flush()
}.launchIn(this)

@ -7,4 +7,6 @@ org.gradle.parallel=true
org.gradle.configureondemand=true
org.gradle.jvmargs=-Xmx4096m
toolsVersion=0.15.4-kotlin-2.0.0
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
toolsVersion=0.17.1-kotlin-2.1.20

@ -1,7 +1,7 @@
[versions]
dataforge = "0.9.0"
rsocket = "0.15.4"
dataforge = "0.10.1"
rsocket = "0.20.0"
xodus = "2.0.1"
uuid = "0.8.0"
@ -10,8 +10,6 @@ fazecast = "2.10.3"
tornadofx = "1.7.20"
plotlykt = "0.7.2"
logback = "1.2.11"
hivemq = "1.3.1"
@ -29,7 +27,7 @@ pi4j-ktx = "2.4.0"
plc4j = "0.12.0"
visionforge = "0.4.2"
visionforge = "0.5.0"
[libraries]
@ -42,15 +40,17 @@ xodus-entity-store = { module = "org.jetbrains.xodus:xodus-entity-store", versio
xodus-environment = { module = "org.jetbrains.xodus:xodus-environment", version.ref = "xodus" }
xodus-vfs = { module = "org.jetbrains.xodus:xodus-vfs", version.ref = "xodus" }
rsocket-ktor-client = { module = "io.rsocket.kotlin:rsocket-ktor-client", version.ref = "rsocket" }
rsocket-ktor-server = { module = "io.rsocket.kotlin:rsocket-ktor-server", version.ref = "rsocket" }
rsocket-ktor-client = { module = "io.rsocket.kotlin:ktor-client-rsocket", version.ref = "rsocket" }
rsocket-ktor-server = { module = "io.rsocket.kotlin:ktor-server-rsocket", version.ref = "rsocket" }
rsocket-transport-ktor-tcp = { module = "io.rsocket.kotlin:rsocket-transport-ktor-tcp", version.ref = "rsocket" }
jSerialComm = { module = "com.fazecast:jSerialComm", version.ref = "fazecast" }
tornadofx = { module = "no.tornado:tornadofx", version.ref = "tornadofx" }
plotlykt-server = { module = "space.kscience:plotlykt-server", version.ref = "plotlykt" }
plotlykt-core = { module = "space.kscience:plotly-kt-core", version.ref = "visionforge" }
plotlykt-server = { module = "space.kscience:plotly-kt-server", version.ref = "visionforge" }
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
@ -74,7 +74,6 @@ pi4j-plugin-pigpio = { module = "com.pi4j:pi4j-plugin-pigpio", version.ref = "pi
plc4j-spi = { module = "org.apache.plc4x:plc4j-spi", version.ref = "plc4j" }
visionforge-jupiter = { module = "space.kscience:visionforge-jupyter", version.ref = "visionforge" }
visionforge-plotly = { module = "space.kscience:visionforge-plotly", version.ref = "visionforge" }
visionforge-markdown = { module = "space.kscience:visionforge-markdown", version.ref = "visionforge" }
visionforge-server = { module = "space.kscience:visionforge-server", version.ref = "visionforge" }
visionforge-compose-html = { module = "space.kscience:visionforge-compose-html", version.ref = "visionforge" }

@ -15,15 +15,15 @@ kscience {
native()
wasm()
useCoroutines()
useSerialization{
useSerialization {
json()
}
commonMain{
commonMain {
implementation(spclibs.atomicfu)
}
}
readme{
readme {
maturity = Maturity.EXPERIMENTAL
}

@ -11,7 +11,7 @@ description = """
""".trimIndent()
dependencies {
implementation(project(":magix:magix-rsocket"))
implementation(projects.magix.magixRsocket)
implementation(spclibs.kotlinx.coroutines.jdk9)
}

@ -20,11 +20,12 @@ kscience {
}
dependencies {
api(projects.magix.magixApi)
implementation(spclibs.ktor.client.core)
implementation(libs.rsocket.ktor.client)
api(spclibs.kotlinx.io.core)
api(spclibs.ktor.client.core)
api(libs.rsocket.ktor.client)
}
dependencies(jvmMain) {
implementation(libs.rsocket.transport.ktor.tcp)
api(libs.rsocket.transport.ktor.tcp)
}
}

@ -10,6 +10,7 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport
import io.rsocket.kotlin.ktor.client.rSocket
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
@ -17,6 +18,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.io.readString
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
@ -32,7 +34,10 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint,
}
val flow = rSocket.requestStream(payload)
return flow.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
it.data.readString()
)
}.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
}
@ -65,12 +70,12 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets(
host: String,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
path: String = "/rsocket",
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
connector = buildConnector(rSocketConfig)
connector(rSocketConfig)
}
}

@ -16,6 +16,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import kotlinx.io.readString
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
@ -51,7 +52,7 @@ public class RSocketStreamMagixEndpoint(
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> = input.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readString())
}.filter(filter)
override suspend fun broadcast(message: MagixMessage): Unit {
@ -72,12 +73,12 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets(
port: Int = DEFAULT_MAGIX_HTTP_PORT,
path: String = "/rsocket",
filter: MagixMessageFilter = MagixMessageFilter.ALL,
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.() -> Unit = {},
): RSocketStreamMagixEndpoint {
val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
connector = buildConnector(rSocketConfig)
connector(rSocketConfig)
}
}

@ -1,8 +1,8 @@
package space.kscience.magix.rsocket
import io.ktor.network.sockets.SocketOptions
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransport
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransportBuilder
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessageFilter
import kotlin.coroutines.coroutineContext
@ -14,15 +14,14 @@ import kotlin.coroutines.coroutineContext
public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
host: String,
port: Int = DEFAULT_MAGIX_RAW_PORT,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
tcpConfig: KtorTcpClientTransportBuilder.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
val transport = TcpClientTransport(
hostname = host,
port = port,
val transport = KtorTcpClientTransport(
context = coroutineContext,
configure = tcpConfig
)
).target(host,port)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(rSocket)
@ -33,15 +32,14 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp(
host: String,
port: Int = DEFAULT_MAGIX_RAW_PORT,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
tcpConfig: KtorTcpClientTransportBuilder.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketStreamMagixEndpoint {
val transport = TcpClientTransport(
hostname = host,
port = port,
val transport = KtorTcpClientTransport(
context = coroutineContext,
configure = tcpConfig
)
).target(host,port)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketStreamMagixEndpoint(rSocket, filter)

@ -1,11 +1,12 @@
package rsocket
import io.ktor.network.sockets.SocketOptions
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransport
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransportBuilder
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.RSocketMagixEndpoint
import space.kscience.magix.rsocket.buildConnector
import kotlin.coroutines.coroutineContext
/**
@ -14,14 +15,14 @@ import space.kscience.magix.rsocket.buildConnector
public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
host: String,
port: Int = DEFAULT_MAGIX_RAW_PORT,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
tcpConfig: KtorTcpClientTransportBuilder.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
val transport = TcpClientTransport(
hostname = host,
port = port,
val transport = KtorTcpClientTransport(
context = coroutineContext,
configure = tcpConfig
)
).target(host,port)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(rSocket)

@ -16,6 +16,8 @@ kscience {
jvm()
useSerialization{
json()
cbor()
protobuf()
}
jvmMain{
@ -28,6 +30,7 @@ kscience {
api(libs.rsocket.ktor.server)
api(libs.rsocket.transport.ktor.tcp)
api(spclibs.kotlinx.io.core)
}
}

@ -1,32 +1,86 @@
@file:OptIn(ExperimentalSerializationApi::class)
package space.kscience.magix.server
import io.ktor.network.sockets.SocketOptions
import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.core.RSocketServerBuilder
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.tcp.TcpServer
import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpServerTransport
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpServerTransportBuilder
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.encodeToString
import kotlinx.coroutines.launch
import kotlinx.io.Buffer
import kotlinx.io.readByteArray
import kotlinx.io.readString
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.cbor.Cbor
import kotlinx.serialization.json.io.encodeToSink
import kotlinx.serialization.protobuf.ProtoBuf
import space.kscience.magix.api.*
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
private enum class RSocketMessageEncoding {
JSON,
CBOR,
PROTO
}
private fun Buffer?.inferFormat(): RSocketMessageEncoding = when (val str = this?.readString()) {
"proto" -> RSocketMessageEncoding.PROTO
"cbor" -> RSocketMessageEncoding.CBOR
else -> RSocketMessageEncoding.JSON
}
private fun decodeMessage(buffer: Buffer, format: RSocketMessageEncoding): MagixMessage = when (format) {
RSocketMessageEncoding.JSON -> MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
buffer.readString()
)
RSocketMessageEncoding.CBOR -> Cbor.decodeFromByteArray(
MagixMessage.serializer(),
buffer.readByteArray()
)
RSocketMessageEncoding.PROTO -> ProtoBuf.decodeFromByteArray(
MagixMessage.serializer(),
buffer.readByteArray()
)
}
private fun encodeMessage(message: MagixMessage, format: RSocketMessageEncoding): Buffer {
return when (format) {
RSocketMessageEncoding.JSON -> Buffer().also { buffer ->
MagixEndpoint.magixJson.encodeToSink(MagixMessage.serializer(), message, buffer)
}
RSocketMessageEncoding.CBOR -> Buffer().also { buffer ->
buffer.write(Cbor.encodeToByteArray(MagixMessage.serializer(), message))
}
RSocketMessageEncoding.PROTO -> Buffer().also { buffer ->
buffer.write(ProtoBuf.encodeToByteArray(MagixMessage.serializer(), message))
}
}
}
/**
* Raw TCP magix server plugin
*/
public class RSocketMagixFlowPlugin(
private val serverHost: String = "0.0.0.0",
private val serverPort: Int = DEFAULT_MAGIX_RAW_PORT,
private val transportConfiguration: SocketOptions.AcceptorOptions.() -> Unit = {},
private val transportConfiguration: KtorTcpServerTransportBuilder.() -> Unit = {},
private val rsocketConfiguration: RSocketServerBuilder.() -> Unit = {},
) : MagixFlowPlugin {
@ -35,19 +89,15 @@ public class RSocketMagixFlowPlugin(
receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit,
): Job {
val tcpTransport = TcpServerTransport(
hostname = serverHost,
port = serverPort,
val tcpTransport = KtorTcpServerTransport(
scope.coroutineContext,
configure = transportConfiguration
)
val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration)
.bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage))
).target(serverHost, serverPort)
scope.coroutineContext[Job]?.invokeOnCompletion {
rSocketJob.handlerJob.cancel()
return scope.launch {
RSocketServer(rsocketConfiguration)
.startServer(tcpTransport, acceptor(scope, receive, sendMessage))
}
return rSocketJob.handlerJob
}
public companion object {
@ -59,40 +109,38 @@ public class RSocketMagixFlowPlugin(
RSocketRequestHandler(coroutineScope.coroutineContext) {
//handler for request/stream
requestStream { request: Payload ->
val requestText = request.data.readText()
val filter = if(requestText.isBlank()) {
val format = request.metadata.inferFormat()
val requestText = request.data.readString()
val filter = if (requestText.isBlank()) {
MagixMessageFilter.ALL
} else MagixEndpoint.magixJson.decodeFromString(
} else MagixEndpoint.magixJson.decodeFromString(
MagixMessageFilter.serializer(),
requestText
)
receive.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) }
buildPayload {
data(encodeMessage(message,format))
}
}
}
//single send
fireAndForget { request: Payload ->
val message = MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
request.data.readText()
)
val format = request.metadata.inferFormat()
val message = decodeMessage(request.data, format)
sendMessage(message)
}
// bidirectional connection, used for streaming connection
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach { inputPayload ->
sendMessage(
MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
inputPayload.use { it.data.readText() }
)
)
val format = request.metadata.inferFormat()
input.onEach { inputPayload: Payload ->
sendMessage(decodeMessage(inputPayload.data, format))
}.launchIn(this)
val filterText = request.data.readText()
val filterText = request.data.readString()
val filter = if (filterText.isBlank()) {
MagixMessageFilter.ALL
@ -101,8 +149,9 @@ public class RSocketMagixFlowPlugin(
}
receive.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
buildPayload { data(string) }
buildPayload {
data(encodeMessage(message,format))
}
}
}
}

@ -1,7 +1,7 @@
package space.kscience.magix.server
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.EmbeddedServer
import io.ktor.server.engine.embeddedServer
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
@ -19,7 +19,7 @@ public fun CoroutineScope.startMagixServer(
vararg plugins: MagixFlowPlugin,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 1000,
): ApplicationEngine {
): EmbeddedServer<*,*> {
val magixFlow = MutableSharedFlow<MagixMessage>(
replay = buffer,
@ -30,7 +30,5 @@ public fun CoroutineScope.startMagixServer(
it.start(this, magixFlow)
}
return embeddedServer(CIO, host = "localhost", port = port, module = { magixModule(magixFlow) }).apply {
start()
}
return embeddedServer(CIO, host = "localhost", port = port, module = { magixModule(magixFlow) }).start()
}

@ -6,49 +6,86 @@ import kotlinx.coroutines.flow.*
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration
/**
* Suspend the collection of this [Flow] until event time is lower that threshold
*/
public fun <E : TimelineEvent> Flow<E>.withTimeThreshold(
public fun <E : WithTime> Flow<E>.withTimeThreshold(
threshold: Flow<Instant>
): Flow<E> = transform { event ->
threshold.first { it > event.time }
emit(event)
}
private class OriginChangedException : CancellationException("Origin is changed")
/**
* @param lookaheadInterval an interval for generated events ahead of the last observed event.
*/
public class GeneratingTimeline<E : TimelineEvent>(
public class GeneratingTimeline<E : Any>(
origin: E,
private val lookaheadInterval: Duration,
timeOf: E.() -> Instant,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
private val generator: suspend FlowCollector<E>.(E) -> Unit
) : ProducerTimeline<E>(origin.time, coroutineContext) {
private val generator: suspend TimelineCollector<E>.(E) -> Unit
) : ProducerTimeline<E>(timeOf(origin), timeOf, coroutineContext) {
private val startEventFlow = MutableStateFlow(origin)
private data class EventWithOrigin<E : TimelineEvent>(val origin: E, val event: E) : TimelineEvent {
override val time: Instant get() = event.time
private inner class EventWithOrigin(val origin: E, val event: E) : WithTime {
override val time: Instant get() = timeOf(event)
}
private val events: SharedFlow<E> = flow {
private val events: SharedFlow<E> = flow<EventWithOrigin> {
coroutineScope {
startEventFlow.collect { startEvent ->
emitAll(
flow { generator(startEvent) }.takeWhile { startEvent == startEventFlow.value }.map {
EventWithOrigin(startEvent, it)
val timelineCollector = object : TimelineCollector<E> {
override val time: StateFlow<Instant> get() = this@GeneratingTimeline.time
override var lastEvent: E? = startEvent
override suspend fun emit(value: E) {
if (startEvent == startEvent) {
lastEvent = value
emit(EventWithOrigin(startEvent, value))
} else {
throw OriginChangedException()
}
}
)
}
try {
timelineCollector.generator(startEvent)
} catch (_: OriginChangedException) {
return@collect
}
// emitAll(
// flow innerFlow@{
// object : TimelineCollector<E> {
// override val time: StateFlow<Instant> get() = this@GeneratingTimeline.time
// override val lastEvent: E?
// get() = TODO("Not yet implemented")
//
// override suspend fun emit(value: E) {
// this@innerFlow.emit(value)
// }
//
// }.generator(startEvent)
// }.takeWhile {
// startEvent == startEventFlow.value
// }.map {
// EventWithOrigin(startEvent, it)
// }
// )
}
}
}.withTimeThreshold(
threshold = time.map { it + lookaheadInterval }
).buffer(Channel.UNLIMITED).mapNotNull {
).buffer(Channel.UNLIMITED).mapNotNull { event: GeneratingTimeline<E>.EventWithOrigin ->
//a barrier to avoid leaking stale events after interruption from buffer
it.takeIf { it.origin == startEventFlow.value }?.event
event.takeIf { it.origin == startEventFlow.value }?.event
}.shareIn(
scope = timelineScope,
started = SharingStarted.Lazily,
@ -57,10 +94,10 @@ public class GeneratingTimeline<E : TimelineEvent>(
override fun events(): Flow<E> = events
public suspend fun interrupt(newStart: E) {
check(newStart.time >= time.value) {
check(timeOf(newStart) >= time.value) {
"Can't interrupt generating timeline after observed event"
}
startTime = newStart.time
startTime = timeOf(newStart)
startEventFlow.emit(newStart)
}
}

@ -10,19 +10,22 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
public class MergedTimeline<E : TimelineEvent>(
public class MergedTimeline<E : Any>(
private val timelines: List<Timeline<E>>,
private val timeOf: E.() -> Instant,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : Timeline<E> {
protected val timelineScope: CoroutineScope = CoroutineScope(
private val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } +
CoroutineName("MergedTimeline")
)
override val time: StateFlow<Instant> = combine(timelines.map { it.time }){ array->
override fun timeOf(event: E): Instant = event.timeOf()
override val time: StateFlow<Instant> = combine(timelines.map { it.time }) { array ->
array.max()
}.stateIn(timelineScope, SharingStarted.Lazily, timelines.maxOf { it.time.value })
@ -52,17 +55,17 @@ public class MergedTimeline<E : TimelineEvent>(
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(it.time)
time.emit(timeOf(it))
}.collector()
}
private val mutex = Mutex()
override suspend fun collect(upTo: Instant) = mutex.withLock{
override suspend fun collect(upTo: Instant) = mutex.withLock {
timelineObservers.forEach {
it.collect(upTo)
}
buffer.sortedBy { it.time }.forEach {
buffer.sortedBy { timeOf(it) }.forEach {
channel.send(it)
buffer.remove(it)
}

@ -9,29 +9,42 @@ import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
public abstract class ProducerTimeline<E : TimelineEvent>(
/**
* A general abstraction for timelines that could produce new events
*/
public abstract class ProducerTimeline<E : Any>(
protected var startTime: Instant,
private val timeOf: E.() -> Instant,
coroutineContext: CoroutineContext
) : Timeline<E>, AutoCloseable {
protected val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
CoroutineName("Timeline")
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } +
CoroutineName("Timeline[${hashCode().toString(16)}]")
)
override fun timeOf(event: E): Instant = event.timeOf()
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
/**
* Update time on this channel event
*/
private val feedbackChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val time: StateFlow<Instant> = feedbackChannel.consumeAsFlow().map {
maxOf(startTime,observers.maxOfOrNull { it.time.value } ?: startTime)
maxOf(startTime, observers.maxOfOrNull { it.time.value } ?: Instant.DISTANT_PAST)
}.stateIn(timelineScope, SharingStarted.Lazily, startTime)
override suspend fun advance(toTime: Instant) {
observers.forEach {
it.collect(toTime)
coroutineScope {
observers.forEach {
launch {
it.collect(toTime)
}
}
}
}
@ -50,7 +63,7 @@ public abstract class ProducerTimeline<E : TimelineEvent>(
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(it.time)
time.emit(timeOf(it))
feedbackChannel.send(Unit)
}.collector()
}
@ -60,7 +73,7 @@ public abstract class ProducerTimeline<E : TimelineEvent>(
override suspend fun collect(upTo: Instant) = mutex.withLock {
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
events().takeWhile {
it.time <= upTo
timeOf(it) <= upTo
}.collect {
channel.send(it)
}

@ -7,23 +7,31 @@ import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
public interface CollectingTimeline<E: Any>: Timeline<E>, TimelineCollector<E>
/**
* A manually mutable [Timeline] that could be modified via [emit] method by multiple
*
* @param bufferSize the size of event buffer. If more than [bufferSize] events are emitted and not consumed via [observe], emitter suspends.
*/
public class SharedTimeline<E : TimelineEvent>(
public class SharedTimeline<E : Any>(
startTime: Instant,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : ProducerTimeline<E>(startTime, coroutineContext) {
timeOf: E.() -> Instant,
bufferSize: Int = Channel.UNLIMITED,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
) : ProducerTimeline<E>(startTime, timeOf, coroutineContext), CollectingTimeline<E> {
private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED)
private val events = MutableSharedFlow<E>(replay = bufferSize)
override fun events(): Flow<E> = events
override val lastEvent: E? get() = events.replayCache.lastOrNull()
/**
* Emit new event to the timeline
*/
public suspend fun emit(event: E) {
if (event.time < (events.replayCache.lastOrNull()?.time ?: time.value)) {
override suspend fun emit(event: E) {
if (timeOf(event) < (events.replayCache.lastOrNull()?.let(::timeOf) ?: time.value)) {
error("Can't emit event $event because timeline monotony is broken")
}
events.emit(event)

@ -5,21 +5,9 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.datetime.Instant
import kotlin.time.Duration
public interface TimelineEvent {
public val time: Instant
}
public interface TimelineInterval : TimelineEvent {
public val startTime: Instant
public val duration: Duration
override val time: Instant
get() = startTime + duration
}
public data class SimpleTimelineEvent<T>(override val time: Instant, val value: T) : TimelineEvent
/**
* A handler for observation of a timeline. On close stops collection.
*/
public interface TimelineObserver : AutoCloseable {
/**
* The subjective time of this observer (last observed time)
@ -27,11 +15,10 @@ public interface TimelineObserver : AutoCloseable {
public val time: StateFlow<Instant>
/**
* Collect all uncollected events from [time] to [upTo].
* Collect all uncollected events from [time] to [upTo]. Suspends until all valid events are collected.
*
* By default, collects all events.
*/
public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE)
public suspend fun collect(upTo: Instant)
}
/**
@ -47,15 +34,16 @@ public suspend fun TimelineObserver.collect(duration: Duration): Unit = collect(
*
* Timeline guarantees that already read events won't change, but unread events could change.
*/
public interface Timeline<E : TimelineEvent> {
public interface Timeline<E : Any> {
/**
* A subjective time of this timeline. The subjective time is the last observed time.
*/
public val time: StateFlow<Instant>
public fun timeOf(event: E): Instant
/**
* Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand.
* Attach observer to this [Timeline]. The observer collection is triggered by timeline itself.
*
* Each collection shifts [TimelineObserver.time] for this observer.
*/
@ -71,10 +59,11 @@ public interface Timeline<E : TimelineEvent> {
public suspend fun advance(toTime: Instant)
}
/**
* Perform [collector] action on each event
*/
public suspend fun <E : TimelineEvent> Timeline<E>.observeEach(
public suspend fun <E : Any> Timeline<E>.observeEach(
collector: suspend (E) -> Unit
): TimelineObserver = observe {
collect(collector)

@ -0,0 +1,20 @@
package space.kscience.simulation
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.StateFlow
import kotlinx.datetime.Instant
import kotlin.time.Duration
public interface TimelineCollector<E : Any> : FlowCollector<E> {
public val time: StateFlow<Instant>
public val lastEvent: E?
}
public interface TimelineInterval : WithTime {
public val startTime: Instant
public val duration: Duration
override val time: Instant get() = startTime + duration
}
public data class TimelineEvent<T>(override val time: Instant, val value: T) : WithTime

@ -0,0 +1,121 @@
package space.kscience.simulation
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlin.coroutines.CoroutineContext
/**
* A timeline that could be forked. The events from the fork appear in parent timeline events, but not vise versa.
*/
public interface ForkingTimeline<E : Any> : CollectingTimeline<E> {
public suspend fun fork(): ForkingTimeline<E>
}
public class TreeTimeline<E : Any>(
private val startTime: Instant,
private val timeOf: E.() -> Instant,
coroutineContext: CoroutineContext,
) : ForkingTimeline<E>, AutoCloseable {
private val timelineScope: CoroutineScope = CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } +
CoroutineName("TreeTimeline[${hashCode().toString(16)}]")
)
override fun timeOf(event: E): Instant = timeOf(event)
private val _time = MutableStateFlow<Instant>(startTime)
override val time: StateFlow<Instant> get() = _time
override suspend fun advance(toTime: Instant) {
coroutineScope {
observers.forEach {
launch {
it.collect(toTime)
}
}
}
}
private val mutex = Mutex()
private val buffer = mutableListOf<E>()
private val branches: MutableSet<TimelineObserver> = mutableSetOf()
private val events = MutableSharedFlow<E>(1)
override val lastEvent: E? get() = events.replayCache.lastOrNull()
override suspend fun emit(value: E) {
mutex.withLock {
buffer.add(value)
}
}
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
/**
* Update time on this channel event
*/
private val feedbackChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
val context = currentCoroutineContext()
val observer = object : TimelineObserver {
// observed time
override val time = MutableStateFlow(startTime)
private val channel = Channel<E>()
private val collectJob = timelineScope.launch(context) {
channel.consumeAsFlow().onEach {
time.emit(timeOf(it))
feedbackChannel.send(Unit)
}.collector()
}
private val mutex = Mutex()
override suspend fun collect(upTo: Instant) = mutex.withLock {
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
TODO("Not yet implemented")
// events().takeWhile {
// timeOf(it) <= upTo
// }.collect {
// channel.send(it)
// }
}
override fun close() {
collectJob.cancel()
observers.remove(this)
}
}
observers.add(observer)
return observer
}
override suspend fun fork(): TreeTimeline<E> {
val theFork = TreeTimeline(time.value, timeOf, timelineScope.coroutineContext)
branches.add(theFork.observeEach {
emit(it)
})
return theFork
}
override fun close() {
observers.forEach { it.close() }
branches.forEach { it.close() }
timelineScope.cancel()
}
}

@ -0,0 +1,7 @@
package space.kscience.simulation
import kotlinx.datetime.Instant
public interface WithTime {
public val time: Instant
}

@ -15,14 +15,15 @@ class TimelineTests {
val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
val generation = GeneratingTimeline(
origin = SimpleTimelineEvent(startTime, Unit),
lookaheadInterval = 1.seconds
origin = TimelineEvent(startTime, Unit),
lookaheadInterval = 1.seconds,
timeOf = WithTime::time
) { event ->
var time = event.time
while (isActive) {
time += 0.1.seconds
println("Emit: ${time - startTime}")
emit(SimpleTimelineEvent(time, Unit))
emit(TimelineEvent(time, Unit))
}
}
@ -38,8 +39,8 @@ class TimelineTests {
observer.collect(2.seconds)
println("Second collection complete")
println("Interrupt")
generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, Unit))
println("Collecting after interruption")
// generation.interrupt(TimelineEvent(startTime + 6.seconds, Unit))
// println("Collecting after interruption")
observer.collect(startTime + 6.seconds + 2.5.seconds)
println(result)
generation.close()