Compare commits
7 Commits
master
...
feature/si
Author | SHA1 | Date | |
---|---|---|---|
6e26894564 | |||
13b3ef6a8d | |||
e0e7dc7a0f | |||
9de4413037 | |||
8e55d1e22e | |||
ed3e1c13e0 | |||
847b4962c0 |
CHANGELOG.mdgradle.properties
controls-constructor
build.gradle.kts
src
commonMain/kotlin/space/kscience/controls/constructor
commonTest/kotlin/space/kscience/controls/constructor
controls-core/src
commonMain/kotlin/space/kscience/controls
commonTest/kotlin/space/kscience/controls/api
jsMain/kotlin/space/kscience/controls/time
jvmMain/kotlin/space/kscience/controls/time
jvmTest/kotlin/space/kscience/controls/time
nativeMain/kotlin/space/kscience/controls/time
wasmJsMain/kotlin/space/kscience/controls
controls-jupyter
controls-magix
controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client
controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports
controls-server/src/jvmMain/kotlin/space/kscience/controls/server
controls-storage
controls-xodus/src/jvmTest/kotlin
src/commonMain/kotlin/space/kscience/controls/storage
controls-vision
controls-visualisation-compose
demo
all-things
build.gradle.kts
src/main/kotlin/space/kscience/controls/demo
car/src/jvmMain/kotlin/space/kscience/controls/demo/car
constructor
device-collective/src/jvmMain/kotlin
many-devices
motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster
gradle
magix
magix-api
magix-java-endpoint
magix-rsocket
build.gradle.kts
src
commonMain/kotlin/space/kscience/magix/rsocket
jvmMain/kotlin/space/kscience/magix/rsocket
linuxX64Main/kotlin/rsocket
magix-server
simulation-kt/src
@ -18,6 +18,7 @@
|
||||
- `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices.
|
||||
- Add some utility methods to ports. Synchronous port response could be now consumed as `Source`.
|
||||
- `DeviceLifecycleState` is replaced by `LifecycleState`.
|
||||
- Time is now mandatory first field of all device messages
|
||||
|
||||
|
||||
### Deprecated
|
||||
|
@ -14,8 +14,10 @@ kscience{
|
||||
wasm()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
|
||||
commonMain {
|
||||
api(projects.controlsCore)
|
||||
api(projects.simulationKt)
|
||||
}
|
||||
|
||||
commonTest{
|
||||
|
2
controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/ConstructorElement.kt
2
controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/ConstructorElement.kt
@ -5,7 +5,7 @@ import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.controls.manager.ClockManager
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import space.kscience.dataforge.context.ContextAware
|
||||
import space.kscience.dataforge.context.request
|
||||
import kotlin.time.Duration
|
||||
|
@ -2,11 +2,13 @@ package space.kscience.controls.constructor
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.controls.api.*
|
||||
import space.kscience.controls.api.LifecycleState.*
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.manager.install
|
||||
import space.kscience.controls.spec.DevicePropertySpec
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.*
|
||||
import space.kscience.dataforge.meta.Laminate
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
@ -16,7 +18,6 @@ import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.get
|
||||
import space.kscience.dataforge.names.parseAsName
|
||||
import kotlin.collections.set
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
|
||||
@ -66,6 +67,7 @@ public open class DeviceGroup(
|
||||
context.launch {
|
||||
sharedMessageFlow.emit(
|
||||
DeviceErrorMessage(
|
||||
time = clock.now(),
|
||||
errorMessage = throwable.message,
|
||||
errorType = throwable::class.simpleName,
|
||||
errorStackTrace = throwable.stackTraceToString()
|
||||
@ -109,8 +111,9 @@ public open class DeviceGroup(
|
||||
state.valueFlow.map(converter::convert).onEach {
|
||||
sharedMessageFlow.emit(
|
||||
PropertyChangedMessage(
|
||||
descriptor.name,
|
||||
it
|
||||
time = clock.now(),
|
||||
property = descriptor.name,
|
||||
value = it
|
||||
)
|
||||
)
|
||||
}.launchIn(this)
|
||||
@ -172,7 +175,7 @@ public open class DeviceGroup(
|
||||
private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
|
||||
this.lifecycleState = lifecycleState
|
||||
sharedMessageFlow.emit(
|
||||
DeviceLifeCycleMessage(lifecycleState)
|
||||
DeviceLifeCycleMessage(clock.now(), lifecycleState)
|
||||
)
|
||||
}
|
||||
|
||||
@ -194,6 +197,8 @@ public open class DeviceGroup(
|
||||
super.stop()
|
||||
}
|
||||
|
||||
override val clock: Clock = context.clock
|
||||
|
||||
public companion object {
|
||||
|
||||
}
|
||||
|
7
controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/ModelConstructor.kt
7
controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/ModelConstructor.kt
@ -3,13 +3,14 @@ package space.kscience.controls.constructor
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.newCoroutineContext
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
public abstract class ModelConstructor(
|
||||
final override val context: Context,
|
||||
vararg dependencies: DeviceState<*>,
|
||||
) : StateContainer, CoroutineScope {
|
||||
) : StateContainer, CoroutineScope{
|
||||
|
||||
@OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class)
|
||||
override val coroutineContext: CoroutineContext = context.newCoroutineContext(SupervisorJob())
|
||||
@ -30,4 +31,6 @@ public abstract class ModelConstructor(
|
||||
override fun unregisterElement(constructorElement: ConstructorElement) {
|
||||
_constructorElements.remove(constructorElement)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public val ModelConstructor.clock get() = context.clock
|
@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.manager.ClockManager
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import kotlin.time.Duration
|
||||
|
||||
/**
|
||||
@ -20,14 +20,15 @@ import kotlin.time.Duration
|
||||
public class TimerState(
|
||||
public val clockManager: ClockManager,
|
||||
public val tick: Duration,
|
||||
initialValue: Instant = Instant.DISTANT_PAST,
|
||||
) : DeviceState<Instant> {
|
||||
|
||||
private val clock = MutableStateFlow(clockManager.clock.now())
|
||||
private val clock = MutableStateFlow(initialValue)
|
||||
|
||||
private val updateJob = clockManager.context.launch(clockManager.asDispatcher()) {
|
||||
private val updateJob = clockManager.context.launch(clockManager.dispatcher) {
|
||||
while (isActive) {
|
||||
delay(tick)
|
||||
clock.value = clockManager.clock.now()
|
||||
delay(tick)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,6 @@ import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import space.kscience.controls.constructor.*
|
||||
import space.kscience.controls.constructor.units.*
|
||||
import space.kscience.controls.manager.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
@ -48,8 +47,6 @@ public class PidRegulator<P : UnitsOfMeasurement, O : UnitsOfMeasurement>(
|
||||
|
||||
val mutex = Mutex()
|
||||
|
||||
val clock = context.clock
|
||||
|
||||
var lastTime = clock.now()
|
||||
|
||||
while (isActive) {
|
||||
|
@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.take
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import space.kscience.controls.manager.ClockManager
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.context.request
|
||||
import kotlin.test.Test
|
||||
|
@ -4,6 +4,7 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.controls.api.Device.Companion.DEVICE_TARGET
|
||||
import space.kscience.dataforge.context.ContextAware
|
||||
import space.kscience.dataforge.context.info
|
||||
@ -68,6 +69,11 @@ public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
|
||||
*/
|
||||
override suspend fun start(): Unit = Unit
|
||||
|
||||
/**
|
||||
* Clock associated with this device
|
||||
*/
|
||||
public val clock: Clock
|
||||
|
||||
/**
|
||||
* Close and terminate the device. This function does not wait for the device to be closed.
|
||||
*/
|
||||
|
@ -2,9 +2,7 @@
|
||||
|
||||
package space.kscience.controls.api
|
||||
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.EncodeDefault
|
||||
import kotlinx.serialization.ExperimentalSerializationApi
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
@ -31,10 +29,12 @@ public sealed class DeviceMessage {
|
||||
|
||||
public companion object {
|
||||
public fun error(
|
||||
time: Instant,
|
||||
cause: Throwable,
|
||||
sourceDevice: Name,
|
||||
targetDevice: Name? = null,
|
||||
): DeviceErrorMessage = DeviceErrorMessage(
|
||||
time = time,
|
||||
errorMessage = cause.message,
|
||||
errorType = cause::class.simpleName,
|
||||
errorStackTrace = cause.stackTraceToString(),
|
||||
@ -54,12 +54,12 @@ public sealed class DeviceMessage {
|
||||
@Serializable
|
||||
@SerialName("property.changed")
|
||||
public data class PropertyChangedMessage(
|
||||
override val time: Instant,
|
||||
public val property: String,
|
||||
public val value: Meta,
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -70,12 +70,12 @@ public data class PropertyChangedMessage(
|
||||
@Serializable
|
||||
@SerialName("property.set")
|
||||
public data class PropertySetMessage(
|
||||
override val time: Instant,
|
||||
public val property: String,
|
||||
public val value: Meta,
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name?,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -87,11 +87,11 @@ public data class PropertySetMessage(
|
||||
@Serializable
|
||||
@SerialName("property.get")
|
||||
public data class PropertyGetMessage(
|
||||
override val time: Instant,
|
||||
public val property: String,
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -102,10 +102,10 @@ public data class PropertyGetMessage(
|
||||
@Serializable
|
||||
@SerialName("description.get")
|
||||
public data class GetDescriptionMessage(
|
||||
override val time: Instant,
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -116,13 +116,13 @@ public data class GetDescriptionMessage(
|
||||
@Serializable
|
||||
@SerialName("description")
|
||||
public data class DescriptionMessage(
|
||||
override val time: Instant,
|
||||
val description: Meta,
|
||||
val properties: Collection<PropertyDescriptor>,
|
||||
val actions: Collection<ActionDescriptor>,
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -135,13 +135,13 @@ public data class DescriptionMessage(
|
||||
@Serializable
|
||||
@SerialName("action.execute")
|
||||
public data class ActionExecuteMessage(
|
||||
override val time: Instant,
|
||||
public val action: String,
|
||||
public val argument: Meta?,
|
||||
public val requestId: String,
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -154,13 +154,13 @@ public data class ActionExecuteMessage(
|
||||
@Serializable
|
||||
@SerialName("action.result")
|
||||
public data class ActionResultMessage(
|
||||
override val time: Instant,
|
||||
public val action: String,
|
||||
public val result: Meta?,
|
||||
public val requestId: String,
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -176,12 +176,12 @@ public data class ActionResultMessage(
|
||||
@Serializable
|
||||
@SerialName("binary.notification")
|
||||
public data class BinaryNotificationMessage(
|
||||
override val time: Instant,
|
||||
val contentId: String,
|
||||
val contentMeta: Meta,
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -193,10 +193,10 @@ public data class BinaryNotificationMessage(
|
||||
@Serializable
|
||||
@SerialName("empty")
|
||||
public data class EmptyDeviceMessage(
|
||||
override val time: Instant,
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -207,12 +207,12 @@ public data class EmptyDeviceMessage(
|
||||
@Serializable
|
||||
@SerialName("log")
|
||||
public data class DeviceLogMessage(
|
||||
override val time: Instant,
|
||||
val message: String,
|
||||
val data: Meta? = null,
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -223,13 +223,13 @@ public data class DeviceLogMessage(
|
||||
@Serializable
|
||||
@SerialName("error")
|
||||
public data class DeviceErrorMessage(
|
||||
override val time: Instant,
|
||||
public val errorMessage: String?,
|
||||
public val errorType: String? = null,
|
||||
public val errorStackTrace: String? = null,
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -240,11 +240,11 @@ public data class DeviceErrorMessage(
|
||||
@Serializable
|
||||
@SerialName("lifecycle")
|
||||
public data class DeviceLifeCycleMessage(
|
||||
override val time: Instant,
|
||||
val state: LifecycleState,
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
|
@ -1,7 +1,16 @@
|
||||
package space.kscience.controls.misc
|
||||
package space.kscience.controls
|
||||
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.io.IOFormat
|
||||
import space.kscience.dataforge.io.IOFormatFactory
|
||||
import space.kscience.dataforge.meta.*
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
import kotlin.reflect.KType
|
||||
import kotlin.reflect.typeOf
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.DurationUnit
|
||||
import kotlin.time.toDuration
|
||||
@ -40,6 +49,33 @@ private object InstantConverter : MetaConverter<Instant> {
|
||||
|
||||
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
|
||||
|
||||
public fun Instant.toMeta(): Meta = Meta(toString())
|
||||
|
||||
public val Meta?.instant: Instant? get() = this?.value?.string?.let { Instant.parse(it) }
|
||||
|
||||
/**
|
||||
* An [IOFormat] for [Instant]
|
||||
*/
|
||||
public object InstantIOFormat : IOFormat<Instant>, IOFormatFactory<Instant> {
|
||||
override fun build(context: Context, meta: Meta): IOFormat<Instant> = this
|
||||
|
||||
override val name: Name = "instant".asName()
|
||||
|
||||
override val type: KType get() = typeOf<Instant>()
|
||||
|
||||
override fun writeTo(sink: Sink, obj: Instant) {
|
||||
sink.writeLong(obj.epochSeconds)
|
||||
sink.writeInt(obj.nanosecondsOfSecond)
|
||||
}
|
||||
|
||||
override fun readFrom(source: Source): Instant {
|
||||
val seconds = source.readLong()
|
||||
val nanoseconds = source.readInt()
|
||||
return Instant.fromEpochSeconds(seconds, nanoseconds)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
|
||||
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? =
|
||||
source.value?.doubleArray?.let { (start, end) ->
|
||||
@ -52,11 +88,3 @@ private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Dou
|
||||
}
|
||||
|
||||
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
|
||||
|
||||
private object StringListConverter : MetaConverter<List<String>> {
|
||||
override fun convert(obj: List<String>): Meta = Meta(obj.map { it.asValue() }.asValue())
|
||||
|
||||
override fun readOrNull(source: Meta): List<String>? = source.stringList ?: source["@jsonArray"]?.stringList
|
||||
}
|
||||
|
||||
public val MetaConverter.Companion.stringList: MetaConverter<List<String>> get() = StringListConverter
|
@ -1,109 +0,0 @@
|
||||
package space.kscience.controls.manager
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.dataforge.context.*
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.double
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.math.roundToLong
|
||||
import kotlin.time.Duration
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
private class CompressedTimeDispatcher(
|
||||
val clockManager: ClockManager,
|
||||
val dispatcher: CoroutineDispatcher,
|
||||
val compression: Double,
|
||||
) : CoroutineDispatcher(), Delay {
|
||||
|
||||
@InternalCoroutinesApi
|
||||
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
|
||||
dispatcher.dispatchYield(context, block)
|
||||
}
|
||||
|
||||
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
|
||||
|
||||
@ExperimentalCoroutinesApi
|
||||
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = dispatcher.limitedParallelism(parallelism)
|
||||
|
||||
override fun dispatch(context: CoroutineContext, block: Runnable) {
|
||||
dispatcher.dispatch(context, block)
|
||||
}
|
||||
|
||||
private val delay = ((dispatcher as? Delay) ?: (Dispatchers.Default as Delay))
|
||||
|
||||
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
|
||||
delay.scheduleResumeAfterDelay((timeMillis / compression).roundToLong(), continuation)
|
||||
}
|
||||
|
||||
|
||||
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
|
||||
return delay.invokeOnTimeout((timeMillis / compression).roundToLong(), block, context)
|
||||
}
|
||||
}
|
||||
|
||||
private class CompressedClock(
|
||||
val start: Instant,
|
||||
val compression: Double,
|
||||
val baseClock: Clock = Clock.System,
|
||||
) : Clock {
|
||||
override fun now(): Instant {
|
||||
val elapsed = (baseClock.now() - start)
|
||||
return start + elapsed / compression
|
||||
}
|
||||
}
|
||||
|
||||
public class ClockManager : AbstractPlugin() {
|
||||
override val tag: PluginTag get() = Companion.tag
|
||||
|
||||
public val timeCompression: Double by meta.double(1.0)
|
||||
|
||||
public val clock: Clock by lazy {
|
||||
if (timeCompression == 1.0) {
|
||||
Clock.System
|
||||
} else {
|
||||
CompressedClock(Clock.System.now(), timeCompression)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a [CoroutineDispatcher] with compressed time based on given [dispatcher]
|
||||
*/
|
||||
public fun asDispatcher(
|
||||
dispatcher: CoroutineDispatcher = Dispatchers.Default,
|
||||
): CoroutineDispatcher = if (timeCompression == 1.0) {
|
||||
dispatcher
|
||||
} else {
|
||||
CompressedTimeDispatcher(this, dispatcher, timeCompression)
|
||||
}
|
||||
|
||||
public fun scheduleWithFixedDelay(tick: Duration, block: suspend () -> Unit): Job = context.launch(asDispatcher()) {
|
||||
while (isActive) {
|
||||
delay(tick)
|
||||
block()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public companion object : PluginFactory<ClockManager> {
|
||||
override val tag: PluginTag = PluginTag("clock", group = PluginTag.DATAFORGE_GROUP)
|
||||
|
||||
override fun build(context: Context, meta: Meta): ClockManager = ClockManager()
|
||||
}
|
||||
}
|
||||
|
||||
public val Context.clock: Clock get() = plugins[ClockManager]?.clock ?: Clock.System
|
||||
|
||||
public val Device.clock: Clock get() = context.clock
|
||||
|
||||
public fun Device.getCoroutineDispatcher(dispatcher: CoroutineDispatcher = Dispatchers.Default): CoroutineDispatcher =
|
||||
context.plugins[ClockManager]?.asDispatcher(dispatcher) ?: dispatcher
|
||||
|
||||
public fun ContextBuilder.withTimeCompression(compression: Double) {
|
||||
require(compression > 0.0) { "Time compression must be greater than zero." }
|
||||
plugin(ClockManager) {
|
||||
"timeCompression" put compression
|
||||
}
|
||||
}
|
@ -15,25 +15,28 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
|
||||
when (request) {
|
||||
is PropertyGetMessage -> {
|
||||
PropertyChangedMessage(
|
||||
time = clock.now(),
|
||||
property = request.property,
|
||||
value = getOrReadProperty(request.property),
|
||||
sourceDevice = deviceTarget,
|
||||
targetDevice = request.sourceDevice
|
||||
targetDevice = request.sourceDevice,
|
||||
)
|
||||
}
|
||||
|
||||
is PropertySetMessage -> {
|
||||
writeProperty(request.property, request.value)
|
||||
PropertyChangedMessage(
|
||||
time = clock.now(),
|
||||
property = request.property,
|
||||
value = getOrReadProperty(request.property),
|
||||
sourceDevice = deviceTarget,
|
||||
targetDevice = request.sourceDevice
|
||||
targetDevice = request.sourceDevice,
|
||||
)
|
||||
}
|
||||
|
||||
is ActionExecuteMessage -> {
|
||||
ActionResultMessage(
|
||||
time = clock.now(),
|
||||
action = request.action,
|
||||
result = execute(request.action, request.argument),
|
||||
requestId = request.requestId,
|
||||
@ -44,6 +47,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
|
||||
|
||||
is GetDescriptionMessage -> {
|
||||
DescriptionMessage(
|
||||
time = clock.now(),
|
||||
description = meta,
|
||||
properties = propertyDescriptors,
|
||||
actions = actionDescriptors,
|
||||
@ -60,10 +64,15 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
|
||||
is EmptyDeviceMessage,
|
||||
is DeviceLogMessage,
|
||||
is DeviceLifeCycleMessage,
|
||||
-> null
|
||||
-> null
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice)
|
||||
DeviceMessage.error(
|
||||
time = clock.now(),
|
||||
cause = ex,
|
||||
sourceDevice = deviceTarget,
|
||||
targetDevice = request.sourceDevice
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -82,7 +91,14 @@ public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<Dev
|
||||
listOfNotNull(device.respondMessage(targetName, request))
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
listOf(DeviceMessage.error(ex, sourceDevice = Name.EMPTY, targetDevice = request.sourceDevice))
|
||||
listOf(
|
||||
DeviceMessage.error(
|
||||
time = request.time, //FIXME add actual time
|
||||
cause = ex,
|
||||
sourceDevice = Name.EMPTY,
|
||||
targetDevice = request.sourceDevice
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,42 +0,0 @@
|
||||
package space.kscience.controls.misc
|
||||
|
||||
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.io.IOFormat
|
||||
import space.kscience.dataforge.io.IOFormatFactory
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
import kotlin.reflect.KType
|
||||
import kotlin.reflect.typeOf
|
||||
|
||||
|
||||
/**
|
||||
* An [IOFormat] for [Instant]
|
||||
*/
|
||||
public object InstantIOFormat : IOFormat<Instant>, IOFormatFactory<Instant> {
|
||||
override fun build(context: Context, meta: Meta): IOFormat<Instant> = this
|
||||
|
||||
override val name: Name = "instant".asName()
|
||||
|
||||
override val type: KType get() = typeOf<Instant>()
|
||||
|
||||
override fun writeTo(sink: Sink, obj: Instant) {
|
||||
sink.writeLong(obj.epochSeconds)
|
||||
sink.writeInt(obj.nanosecondsOfSecond)
|
||||
}
|
||||
|
||||
override fun readFrom(source: Source): Instant {
|
||||
val seconds = source.readLong()
|
||||
val nanoseconds = source.readInt()
|
||||
return Instant.fromEpochSeconds(seconds, nanoseconds)
|
||||
}
|
||||
}
|
||||
|
||||
public fun Instant.toMeta(): Meta = Meta(toString())
|
||||
|
||||
public val Meta.instant: Instant? get() = value?.string?.let { Instant.parse(it) }
|
@ -6,7 +6,9 @@ import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.controls.api.*
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.debug
|
||||
import space.kscience.dataforge.context.error
|
||||
@ -72,6 +74,7 @@ public abstract class DeviceBase<D : Device>(
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST
|
||||
)
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
override val coroutineContext: CoroutineContext = context.newCoroutineContext(
|
||||
SupervisorJob(context.coroutineContext[Job]) +
|
||||
CoroutineName("Device $id") +
|
||||
@ -79,6 +82,7 @@ public abstract class DeviceBase<D : Device>(
|
||||
launch {
|
||||
sharedMessageFlow.emit(
|
||||
DeviceErrorMessage(
|
||||
time = clock.now(),
|
||||
errorMessage = throwable.message,
|
||||
errorType = throwable::class.simpleName,
|
||||
errorStackTrace = throwable.stackTraceToString()
|
||||
@ -112,7 +116,7 @@ public abstract class DeviceBase<D : Device>(
|
||||
logicalState[propertyName] = value
|
||||
}
|
||||
if (value != null) {
|
||||
sharedMessageFlow.emit(PropertyChangedMessage(propertyName, value))
|
||||
sharedMessageFlow.emit(PropertyChangedMessage(clock.now(), propertyName, value))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -194,7 +198,7 @@ public abstract class DeviceBase<D : Device>(
|
||||
private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
|
||||
this.lifecycleState = lifecycleState
|
||||
sharedMessageFlow.emit(
|
||||
DeviceLifeCycleMessage(lifecycleState)
|
||||
DeviceLifeCycleMessage(clock.now(), lifecycleState)
|
||||
)
|
||||
}
|
||||
|
||||
@ -223,6 +227,7 @@ public abstract class DeviceBase<D : Device>(
|
||||
super.stop()
|
||||
}
|
||||
|
||||
override val clock: Clock = context.clock
|
||||
|
||||
abstract override fun toString(): String
|
||||
|
||||
|
@ -4,7 +4,7 @@ import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flow
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.controls.manager.getCoroutineDispatcher
|
||||
import space.kscience.controls.time.coroutineDispatcher
|
||||
import kotlin.time.Duration
|
||||
|
||||
/**
|
||||
@ -16,7 +16,7 @@ public fun <D : Device> D.doRecurring(
|
||||
task: suspend D.() -> Unit,
|
||||
): Job {
|
||||
val taskName = debugTaskName ?: "task[${task.hashCode().toString(16)}]"
|
||||
val dispatcher = getCoroutineDispatcher()
|
||||
val dispatcher = coroutineDispatcher
|
||||
return launch(CoroutineName(taskName) + dispatcher) {
|
||||
while (isActive) {
|
||||
delay(interval)
|
||||
|
@ -0,0 +1,127 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.controls.instant
|
||||
import space.kscience.dataforge.context.*
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.double
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.math.roundToLong
|
||||
import kotlin.time.Duration
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
private class CompressedTimeDispatcher(
|
||||
val coroutineContext: CoroutineContext,
|
||||
val compression: Double,
|
||||
) : CoroutineDispatcher(), Delay {
|
||||
|
||||
val dispatcher = coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
|
||||
|
||||
@InternalCoroutinesApi
|
||||
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
|
||||
dispatcher.dispatchYield(context, block)
|
||||
}
|
||||
|
||||
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
|
||||
|
||||
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
|
||||
dispatcher.limitedParallelism(parallelism, name)
|
||||
|
||||
override fun dispatch(context: CoroutineContext, block: Runnable) {
|
||||
dispatcher.dispatch(context, block)
|
||||
}
|
||||
|
||||
private val parentDelay = ((dispatcher as? Delay) ?: (Dispatchers.Default as Delay))
|
||||
|
||||
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
|
||||
parentDelay.scheduleResumeAfterDelay((timeMillis / compression).roundToLong(), continuation)
|
||||
}
|
||||
|
||||
|
||||
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
|
||||
parentDelay.invokeOnTimeout((timeMillis / compression).roundToLong(), block, context)
|
||||
}
|
||||
|
||||
private class CompressedClock(
|
||||
val baseClock: Clock = Clock.System,
|
||||
val compression: Double,
|
||||
val start: Instant = baseClock.now(),
|
||||
) : Clock {
|
||||
override fun now(): Instant {
|
||||
val elapsed = (baseClock.now() - start)
|
||||
return start + elapsed / compression
|
||||
}
|
||||
}
|
||||
|
||||
internal expect fun resolveClock(meta: Meta): Clock?
|
||||
|
||||
public sealed interface ClockMode {
|
||||
public object System : ClockMode
|
||||
public class Custom(public val clock: Clock) : ClockMode
|
||||
public class Compressed(public val compression: Double) : ClockMode
|
||||
public class Virtual(public val manager: VirtualTimeManager) : ClockMode
|
||||
}
|
||||
|
||||
public class ClockManager : AbstractPlugin() {
|
||||
override val tag: PluginTag get() = Companion.tag
|
||||
|
||||
public val clockMode: ClockMode = when (meta["clock.mode"].string) {
|
||||
null, "system" -> ClockMode.System
|
||||
"virtual" -> ClockMode.Virtual(VirtualTimeManager(meta["clock.start"]?.instant ?: Clock.System.now()))
|
||||
"compressed" -> ClockMode.Compressed(meta["clock.compression"].double ?: 1.0)
|
||||
else -> ClockMode.Custom(resolveClock(meta) ?: error("Can't resolve clock for $meta"))
|
||||
}
|
||||
|
||||
public val clock: Clock = when (clockMode) {
|
||||
ClockMode.System -> Clock.System
|
||||
is ClockMode.Custom -> clockMode.clock
|
||||
is ClockMode.Compressed -> CompressedClock(Clock.System, clockMode.compression)
|
||||
is ClockMode.Virtual -> clockMode.manager
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Provide a [CoroutineDispatcher] with compressed time based on context dispatcher
|
||||
*/
|
||||
public val dispatcher: CoroutineDispatcher by lazy {
|
||||
when (clockMode) {
|
||||
is ClockMode.System, is ClockMode.Custom -> context.coroutineContext[CoroutineDispatcher]
|
||||
?: Dispatchers.Default
|
||||
|
||||
is ClockMode.Compressed -> CompressedTimeDispatcher(context.coroutineContext, clockMode.compression)
|
||||
is ClockMode.Virtual -> VirtualTimeDispatcher(context.coroutineContext, clockMode.manager)
|
||||
}
|
||||
}
|
||||
|
||||
public fun scheduleWithFixedDelay(tick: Duration, block: suspend () -> Unit): Job = context.launch(dispatcher) {
|
||||
while (isActive) {
|
||||
delay(tick)
|
||||
block()
|
||||
}
|
||||
}
|
||||
|
||||
public companion object : PluginFactory<ClockManager> {
|
||||
override val tag: PluginTag = PluginTag("clock", group = PluginTag.DATAFORGE_GROUP)
|
||||
|
||||
override fun build(context: Context, meta: Meta): ClockManager = ClockManager()
|
||||
}
|
||||
}
|
||||
|
||||
public val Context.clock: Clock get() = plugins[ClockManager]?.clock ?: Clock.System
|
||||
|
||||
public val Device.coroutineDispatcher: CoroutineDispatcher
|
||||
get() = context.plugins[ClockManager]?.dispatcher
|
||||
?: context.coroutineContext[CoroutineDispatcher]
|
||||
?: Dispatchers.Default
|
||||
|
||||
public fun ContextBuilder.withTimeCompression(compression: Double) {
|
||||
require(compression > 0.0) { "Time compression must be greater than zero." }
|
||||
plugin(ClockManager) {
|
||||
"timeCompression" put compression
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package space.kscience.controls.misc
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.*
|
@ -1,8 +1,11 @@
|
||||
package space.kscience.controls.misc
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import space.kscience.controls.InstantIOFormat
|
||||
import space.kscience.controls.instant
|
||||
import space.kscience.controls.toMeta
|
||||
import space.kscience.dataforge.io.IOFormat
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.MetaConverter
|
159
controls-core/src/commonMain/kotlin/space/kscience/controls/time/VirtualTimeManager.kt
Normal file
159
controls-core/src/commonMain/kotlin/space/kscience/controls/time/VirtualTimeManager.kt
Normal file
@ -0,0 +1,159 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.takeWhile
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
public class VirtualTimeManager(
|
||||
startTime: Instant,
|
||||
) : Clock {
|
||||
private val _time = MutableStateFlow(startTime)
|
||||
|
||||
private fun advanceTime() {
|
||||
markerTimes.values.minOrNull()?.let {
|
||||
_time.value = it
|
||||
}
|
||||
}
|
||||
|
||||
public val time: StateFlow<Instant> get() = _time
|
||||
|
||||
override fun now(): Instant = _time.value
|
||||
|
||||
private val markerTimes = mutableMapOf<Any, Instant>()
|
||||
|
||||
private val mutex = Mutex()
|
||||
|
||||
/**
|
||||
* Read current time for the given [handle]. Handle time is always lower or equals to global manager time
|
||||
*/
|
||||
public suspend fun readTime(handle: Any): Instant = markerTimes[handle] ?: time.value
|
||||
|
||||
/**
|
||||
* Set target of [handle] timeline to [to] and wait for it to happen
|
||||
*/
|
||||
public suspend fun advanceTimeTo(handle: Any, to: Instant) {
|
||||
val currentMarkerTime = readTime(handle)
|
||||
//if it is already the last instant - bypass
|
||||
if (currentMarkerTime == to) return
|
||||
// require that time is in the future
|
||||
require(to > currentMarkerTime) { "The advanced time for marker `$handle` $to is less that current marker time $currentMarkerTime" }
|
||||
|
||||
// println("$handle locked at $currentMarkerTime")
|
||||
|
||||
mutex.withLock {
|
||||
if(handle is Job && handle !in markerTimes.keys) {
|
||||
//clear job marker on completion
|
||||
handle.invokeOnCompletion {
|
||||
markerTimes.remove(handle)
|
||||
advanceTime()
|
||||
}
|
||||
}
|
||||
markerTimes[handle] = to
|
||||
// advance time if necessary
|
||||
advanceTime()
|
||||
}
|
||||
|
||||
// wait for time to exceed marker time
|
||||
if (time.value < to) {
|
||||
time.takeWhile {
|
||||
it < to
|
||||
}.collect()
|
||||
}
|
||||
// println("$handle unlocked at $currentMarkerTime")
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark given [handle] as idle so its time could advance to the time after all other handles. Then wait for the time to advance.
|
||||
*/
|
||||
public suspend fun pass(handle: Any) {
|
||||
advanceTimeTo(handle, markerTimes.values.max())
|
||||
mutex.withLock {
|
||||
markerTimes.remove(handle)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark the whole manager as idle and advance time to the maximum of all handles. Don't wait for time to advance
|
||||
*/
|
||||
public suspend fun pass(){
|
||||
_time.value = markerTimes.values.max()
|
||||
mutex.withLock {
|
||||
markerTimes.clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun VirtualTimeManager.advanceTimeBy(handle: Any, duration: Duration) {
|
||||
advanceTimeTo(handle, readTime(handle) + duration)
|
||||
}
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
public class VirtualTimeDispatcher internal constructor(
|
||||
private val coroutineContext: CoroutineContext,
|
||||
private val virtualTimeManager: VirtualTimeManager
|
||||
) : CoroutineDispatcher(), Delay {
|
||||
|
||||
private val scope = CoroutineScope(coroutineContext)
|
||||
|
||||
private val dispatcher: CoroutineDispatcher = coroutineContext[CoroutineDispatcher] ?: Dispatchers.Default
|
||||
|
||||
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = dispatcher.dispatch(context, block)
|
||||
|
||||
override fun limitedParallelism(
|
||||
parallelism: Int,
|
||||
name: String?
|
||||
): CoroutineDispatcher = VirtualTimeDispatcher(
|
||||
coroutineContext = dispatcher.limitedParallelism(parallelism, name),
|
||||
virtualTimeManager = virtualTimeManager
|
||||
)
|
||||
|
||||
override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)
|
||||
|
||||
@InternalCoroutinesApi
|
||||
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
|
||||
dispatcher.dispatchYield(context, block)
|
||||
}
|
||||
|
||||
override fun toString(): String = "VirtualTimeDispatcher($virtualTimeManager)"
|
||||
|
||||
override fun scheduleResumeAfterDelay(
|
||||
timeMillis: Long,
|
||||
continuation: CancellableContinuation<Unit>
|
||||
) {
|
||||
val handle = continuation.context[Job] ?: error("Can't use VirtualTimeDispatcher without Job")
|
||||
|
||||
val scheduledJob = scope.launch {
|
||||
virtualTimeManager.advanceTimeBy(handle, timeMillis.milliseconds)
|
||||
dispatcher.dispatch(
|
||||
continuation.context,
|
||||
Runnable {
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
with(dispatcher) { with(continuation) { resumeUndispatched(Unit) } }
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
continuation.disposeOnCancellation {
|
||||
scheduledJob.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public fun CoroutineContext.withVirtualTime(
|
||||
virtualTimeManager: VirtualTimeManager
|
||||
): CoroutineContext = if (this[Job] != null) {
|
||||
this
|
||||
} else {
|
||||
//add job if it is not present
|
||||
plus(Job(null))
|
||||
}.plus(VirtualTimeDispatcher(this, virtualTimeManager))
|
@ -1,15 +1,15 @@
|
||||
package space.kscience.controls.api
|
||||
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.serialization.json.Json
|
||||
import space.kscience.controls.misc.asMeta
|
||||
import space.kscience.controls.asMeta
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class MessageTest {
|
||||
@Test
|
||||
fun messageSerialization() {
|
||||
val changedMessage = PropertyChangedMessage("test", 22.0.asMeta())
|
||||
val changedMessage = PropertyChangedMessage(Clock.System.now(),"test", 22.0.asMeta())
|
||||
val json = Json.encodeToString(changedMessage)
|
||||
val reconstructed: PropertyChangedMessage = Json.decodeFromString(json)
|
||||
assertEquals(changedMessage.time, reconstructed.time)
|
||||
|
@ -0,0 +1,6 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
|
||||
internal actual fun resolveClock(meta: Meta): Clock? = null
|
@ -0,0 +1,17 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.datetime.toKotlinInstant
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
|
||||
internal actual fun resolveClock(meta: Meta): Clock? = when (meta["clock.mode"].string) {
|
||||
"jvm" -> NanoClock
|
||||
else -> null
|
||||
}
|
||||
|
||||
public object NanoClock: Clock {
|
||||
override fun now(): Instant = java.time.Instant.now().toKotlinInstant()
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
|
||||
private data class TimedResult(val time: Instant, val marker: String)
|
||||
|
||||
class VirtualTimeTest {
|
||||
@Test
|
||||
fun manualAdvance(): Unit {
|
||||
val timeManager = VirtualTimeManager(Instant.fromEpochMilliseconds(0L))
|
||||
val collector = mutableListOf<TimedResult>()
|
||||
runBlocking(Dispatchers.Default) {
|
||||
withTimeout(500) {
|
||||
repeat(3) { series ->
|
||||
launch {
|
||||
timeManager.advanceTimeBy(series, 100.milliseconds * (series + 1))
|
||||
repeat(10) { number ->
|
||||
collector.add(TimedResult(timeManager.now(),"$series.$number"))
|
||||
timeManager.advanceTimeBy(series, 2000.milliseconds)
|
||||
}
|
||||
timeManager.pass(series)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
println(collector.joinToString("\n"))
|
||||
assertTrue { collector.sortedBy { it.time } == collector }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun contextAdvance(): Unit {
|
||||
val timeManager = VirtualTimeManager(Instant.fromEpochMilliseconds(0L))
|
||||
val collector = mutableListOf<TimedResult>()
|
||||
runBlocking(Dispatchers.Default.withVirtualTime(timeManager)) {
|
||||
withTimeout(500) {
|
||||
repeat(3) { series ->
|
||||
launch {
|
||||
delay(100.milliseconds * (series + 1))
|
||||
repeat((series + 1) * 10) { number ->
|
||||
collector.add(TimedResult(timeManager.now(),"$series.$number"))
|
||||
println(collector.last())
|
||||
delay(2000.milliseconds)
|
||||
}
|
||||
//timeManager.pass(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
println(collector.joinToString("\n"))
|
||||
assertTrue { collector.sortedBy { it.time } == collector }
|
||||
}
|
||||
}
|
6
controls-core/src/nativeMain/kotlin/space/kscience/controls/time/resolveClock.native.kt
Normal file
6
controls-core/src/nativeMain/kotlin/space/kscience/controls/time/resolveClock.native.kt
Normal file
@ -0,0 +1,6 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
|
||||
internal actual fun resolveClock(meta: Meta): Clock? = null
|
@ -0,0 +1,6 @@
|
||||
package space.kscience.controls.time
|
||||
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
|
||||
internal actual fun resolveClock(meta: Meta): Clock? = null
|
@ -5,13 +5,18 @@ plugins {
|
||||
|
||||
kscience {
|
||||
fullStack("js/controls-jupyter.js")
|
||||
useKtor()
|
||||
useContextReceivers()
|
||||
jupyterLibrary("space.kscience.controls.jupyter.ControlsJupyter")
|
||||
dependencies {
|
||||
implementation(projects.controlsVision)
|
||||
implementation(libs.visionforge.jupiter)
|
||||
}
|
||||
|
||||
jsMain{
|
||||
//FIXME remove after VisionForge 0.5
|
||||
api("org.jetbrains.kotlin-wrappers:kotlin-extensions:1.0.1-pre.823")
|
||||
}
|
||||
|
||||
jvmMain {
|
||||
implementation(spclibs.logback.classic)
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
import space.kscience.plotly.PlotlyPlugin
|
||||
import space.kscience.visionforge.html.runVisionClient
|
||||
import space.kscience.visionforge.jupyter.VFNotebookClient
|
||||
import space.kscience.visionforge.markup.MarkupPlugin
|
||||
import space.kscience.visionforge.plotly.PlotlyPlugin
|
||||
|
||||
public fun main(): Unit = runVisionClient {
|
||||
// plugin(DeviceManager)
|
||||
|
@ -2,8 +2,8 @@ package space.kscience.controls.jupyter
|
||||
|
||||
import org.jetbrains.kotlinx.jupyter.api.declare
|
||||
import org.jetbrains.kotlinx.jupyter.api.libraries.resources
|
||||
import space.kscience.controls.manager.ClockManager
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.plotly.Plot
|
||||
|
@ -13,7 +13,7 @@ kscience {
|
||||
jvm()
|
||||
js()
|
||||
native()
|
||||
wasm()
|
||||
// wasm()
|
||||
useCoroutines()
|
||||
useSerialization {
|
||||
json()
|
||||
|
@ -8,10 +8,12 @@ import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.controls.api.*
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.spec.DevicePropertySpec
|
||||
import space.kscience.controls.spec.name
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
@ -67,7 +69,7 @@ public class DeviceClient internal constructor(
|
||||
|
||||
override suspend fun readProperty(propertyName: String): Meta {
|
||||
send(
|
||||
PropertyGetMessage(propertyName, targetDevice = deviceName)
|
||||
PropertyGetMessage(clock.now(), propertyName, targetDevice = deviceName)
|
||||
)
|
||||
return messageFlow.filterIsInstance<PropertyChangedMessage>().first {
|
||||
it.property == propertyName
|
||||
@ -84,14 +86,25 @@ public class DeviceClient internal constructor(
|
||||
|
||||
override suspend fun writeProperty(propertyName: String, value: Meta) {
|
||||
send(
|
||||
PropertySetMessage(propertyName, value, targetDevice = deviceName)
|
||||
PropertySetMessage(
|
||||
time = clock.now(),
|
||||
property = propertyName,
|
||||
value = value,
|
||||
targetDevice = deviceName
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
override suspend fun execute(actionName: String, argument: Meta?): Meta? {
|
||||
val id = stringUID()
|
||||
send(
|
||||
ActionExecuteMessage(actionName, argument, id, targetDevice = deviceName)
|
||||
ActionExecuteMessage(
|
||||
time = clock.now(),
|
||||
action = actionName,
|
||||
argument = argument,
|
||||
requestId = id,
|
||||
targetDevice = deviceName
|
||||
)
|
||||
)
|
||||
return messageFlow.filterIsInstance<ActionResultMessage>().first {
|
||||
it.action == actionName && it.requestId == id
|
||||
@ -103,6 +116,8 @@ public class DeviceClient internal constructor(
|
||||
|
||||
@DFExperimental
|
||||
override val lifecycleState: LifecycleState get() = lifecycleStateFlow.value
|
||||
|
||||
override val clock: Clock = context.clock
|
||||
}
|
||||
|
||||
/**
|
||||
@ -135,7 +150,7 @@ public suspend fun MagixEndpoint.remoteDevice(
|
||||
|
||||
send(
|
||||
format = DeviceManager.magixFormat,
|
||||
payload = GetDescriptionMessage(targetDevice = deviceName),
|
||||
payload = GetDescriptionMessage(Clock.System.now(), targetDevice = deviceName),
|
||||
source = thisEndpoint,
|
||||
target = deviceEndpoint,
|
||||
id = stringUID()
|
||||
@ -196,7 +211,7 @@ public suspend fun MagixEndpoint.remoteDeviceHub(
|
||||
|
||||
send(
|
||||
format = DeviceManager.magixFormat,
|
||||
payload = GetDescriptionMessage(targetDevice = null),
|
||||
payload = GetDescriptionMessage(Clock.System.now(), targetDevice = null),
|
||||
source = thisEndpoint,
|
||||
target = deviceEndpoint,
|
||||
id = stringUID()
|
||||
@ -214,7 +229,7 @@ public suspend fun MagixEndpoint.requestDeviceUpdate(
|
||||
) {
|
||||
send(
|
||||
format = DeviceManager.magixFormat,
|
||||
payload = GetDescriptionMessage(),
|
||||
payload = GetDescriptionMessage(Clock.System.now()),
|
||||
source = thisEndpoint,
|
||||
target = deviceEndpoint,
|
||||
id = stringUID()
|
||||
@ -247,6 +262,7 @@ public suspend fun <T> MagixEndpoint.sendControlsPropertyChange(
|
||||
value: T,
|
||||
) {
|
||||
val message = PropertySetMessage(
|
||||
Clock.System.now(),
|
||||
property = propertySpec.name,
|
||||
value = propertySpec.converter.convert(value),
|
||||
targetDevice = deviceName
|
||||
|
@ -13,8 +13,8 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.*
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.*
|
||||
import org.opcfoundation.opcua.binaryschema.EnumeratedType
|
||||
import org.opcfoundation.opcua.binaryschema.StructuredType
|
||||
import space.kscience.controls.misc.instant
|
||||
import space.kscience.controls.misc.toMeta
|
||||
import space.kscience.controls.instant
|
||||
import space.kscience.controls.toMeta
|
||||
import space.kscience.dataforge.meta.*
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.asName
|
||||
|
@ -5,8 +5,8 @@ import io.ktor.network.sockets.SocketOptions
|
||||
import io.ktor.network.sockets.aSocket
|
||||
import io.ktor.network.sockets.openReadChannel
|
||||
import io.ktor.network.sockets.openWriteChannel
|
||||
import io.ktor.utils.io.consumeEachBufferRange
|
||||
import io.ktor.utils.io.writeAvailable
|
||||
import io.ktor.utils.io.read
|
||||
import io.ktor.utils.io.writeByteArray
|
||||
import kotlinx.coroutines.*
|
||||
import space.kscience.controls.api.LifecycleState
|
||||
import space.kscience.dataforge.context.Context
|
||||
@ -15,7 +15,6 @@ import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.int
|
||||
import space.kscience.dataforge.meta.string
|
||||
import java.nio.ByteBuffer
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
public class KtorTcpPort internal constructor(
|
||||
@ -42,21 +41,22 @@ public class KtorTcpPort internal constructor(
|
||||
override fun onOpen() {
|
||||
listenerJob = scope.launch {
|
||||
val input = futureSocket.await().openReadChannel()
|
||||
input.consumeEachBufferRange { buffer: ByteBuffer, last ->
|
||||
val array = ByteArray(buffer.remaining())
|
||||
buffer.get(array)
|
||||
receive(array)
|
||||
!last && isActive
|
||||
while (!input.isClosedForRead && isActive) {
|
||||
input.read { arraySource, begin, endExclusive ->
|
||||
val array = arraySource.copyOfRange(begin, endExclusive)
|
||||
receive(array)
|
||||
array.size
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun write(data: ByteArray) {
|
||||
writeChannel.await().writeAvailable(data)
|
||||
writeChannel.await().writeByteArray(data)
|
||||
}
|
||||
|
||||
override val lifecycleState: LifecycleState
|
||||
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
|
||||
get() = if (listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
|
||||
|
||||
override suspend fun stop() {
|
||||
listenerJob?.cancel()
|
||||
|
@ -1,11 +1,13 @@
|
||||
package space.kscience.controls.ports
|
||||
|
||||
import io.ktor.network.selector.ActorSelectorManager
|
||||
import io.ktor.network.sockets.*
|
||||
import io.ktor.utils.io.ByteWriteChannel
|
||||
import io.ktor.utils.io.consumeEachBufferRange
|
||||
import io.ktor.utils.io.writeAvailable
|
||||
import io.ktor.network.sockets.Datagram
|
||||
import io.ktor.network.sockets.InetSocketAddress
|
||||
import io.ktor.network.sockets.SocketOptions
|
||||
import io.ktor.network.sockets.aSocket
|
||||
import io.ktor.utils.io.core.ByteReadPacket
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.io.readByteArray
|
||||
import space.kscience.controls.api.LifecycleState
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
@ -36,30 +38,28 @@ public class KtorUdpPort internal constructor(
|
||||
)
|
||||
}
|
||||
|
||||
private val writeChannel: Deferred<ByteWriteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
||||
futureSocket.await().openWriteChannel(true)
|
||||
}
|
||||
// private val writeChannel= scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
||||
// futureSocket.await().outgoing
|
||||
// }
|
||||
|
||||
private var listenerJob: Job? = null
|
||||
|
||||
override fun onOpen() {
|
||||
listenerJob = scope.launch {
|
||||
val input = futureSocket.await().openReadChannel()
|
||||
input.consumeEachBufferRange { buffer, last ->
|
||||
val array = ByteArray(buffer.remaining())
|
||||
buffer.get(array)
|
||||
receive(array)
|
||||
!last && isActive
|
||||
val input = futureSocket.await().incoming
|
||||
for (datagram in input) {
|
||||
receive(datagram.packet.readByteArray())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun write(data: ByteArray) {
|
||||
writeChannel.await().writeAvailable(data)
|
||||
val socket = futureSocket.await()
|
||||
socket.send(Datagram(ByteReadPacket(data), socket.remoteAddress))
|
||||
}
|
||||
|
||||
override val lifecycleState: LifecycleState
|
||||
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
|
||||
get() = if (listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
|
||||
|
||||
override suspend fun stop() {
|
||||
listenerJob?.cancel()
|
||||
|
@ -2,9 +2,12 @@ package space.kscience.controls.server
|
||||
|
||||
|
||||
import io.ktor.http.HttpStatusCode
|
||||
import io.ktor.server.application.*
|
||||
import io.ktor.server.application.Application
|
||||
import io.ktor.server.application.ApplicationStarted
|
||||
import io.ktor.server.application.install
|
||||
import io.ktor.server.application.pluginOrNull
|
||||
import io.ktor.server.cio.CIO
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import io.ktor.server.engine.EmbeddedServer
|
||||
import io.ktor.server.engine.embeddedServer
|
||||
import io.ktor.server.html.respondHtml
|
||||
import io.ktor.server.plugins.statuspages.StatusPages
|
||||
@ -21,7 +24,6 @@ import io.ktor.server.websocket.WebSockets
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.html.*
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.buildJsonArray
|
||||
import kotlinx.serialization.json.encodeToJsonElement
|
||||
@ -63,10 +65,10 @@ public fun CoroutineScope.startDeviceServer(
|
||||
manager: DeviceManager,
|
||||
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
|
||||
host: String = "localhost",
|
||||
): ApplicationEngine = embeddedServer(CIO, port, host, module = { deviceServerModule(manager) }).start()
|
||||
): EmbeddedServer<*, *> = embeddedServer(CIO, port, host, module = { deviceServerModule(manager) }).start()
|
||||
|
||||
public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
|
||||
environment.monitor.subscribe(ApplicationStarted, callback)
|
||||
public fun EmbeddedServer<*, *>.whenStarted(callback: Application.() -> Unit) {
|
||||
monitor.subscribe(ApplicationStarted, callback)
|
||||
}
|
||||
|
||||
|
||||
@ -171,6 +173,7 @@ public fun Application.deviceManagerModule(
|
||||
val target: String by call.parameters
|
||||
val property: String by call.parameters
|
||||
val request = PropertyGetMessage(
|
||||
time = kotlinx.datetime.Clock.System.now(),
|
||||
sourceDevice = WEB_SERVER_TARGET,
|
||||
targetDevice = Name.parse(target),
|
||||
property = property,
|
||||
@ -190,6 +193,7 @@ public fun Application.deviceManagerModule(
|
||||
val json = Json.parseToJsonElement(body)
|
||||
|
||||
val request = PropertySetMessage(
|
||||
time = kotlinx.datetime.Clock.System.now(),
|
||||
sourceDevice = WEB_SERVER_TARGET,
|
||||
targetDevice = Name.parse(target),
|
||||
property = property,
|
||||
|
@ -23,21 +23,21 @@ internal class PropertyHistoryTest {
|
||||
|
||||
private val propertyChangedMessages = listOf(
|
||||
PropertyChangedMessage(
|
||||
time = Instant.fromEpochMilliseconds(1000),
|
||||
"speed",
|
||||
Meta.EMPTY,
|
||||
time = Instant.fromEpochMilliseconds(1000),
|
||||
sourceDevice = Name.of("virtual-car")
|
||||
),
|
||||
PropertyChangedMessage(
|
||||
time = Instant.fromEpochMilliseconds(1500),
|
||||
"acceleration",
|
||||
Meta.EMPTY,
|
||||
time = Instant.fromEpochMilliseconds(1500),
|
||||
sourceDevice = Name.of("virtual-car")
|
||||
),
|
||||
PropertyChangedMessage(
|
||||
time = Instant.fromEpochMilliseconds(2000),
|
||||
"speed",
|
||||
Meta.EMPTY,
|
||||
time = Instant.fromEpochMilliseconds(2000),
|
||||
sourceDevice = Name.of("magix-virtual-car")
|
||||
)
|
||||
)
|
||||
|
@ -5,8 +5,8 @@ import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.PropertyChangedMessage
|
||||
import space.kscience.controls.misc.PropertyHistory
|
||||
import space.kscience.controls.misc.ValueWithTime
|
||||
import space.kscience.controls.time.PropertyHistory
|
||||
import space.kscience.controls.time.ValueWithTime
|
||||
import space.kscience.dataforge.meta.MetaConverter
|
||||
|
||||
public fun <T> DeviceMessageStorage.propertyHistory(
|
||||
|
@ -9,18 +9,22 @@ description = """
|
||||
|
||||
kscience {
|
||||
fullStack("js/controls-vision.js")
|
||||
useKtor()
|
||||
useSerialization()
|
||||
useContextReceivers()
|
||||
commonMain {
|
||||
api(projects.controlsCore)
|
||||
api(projects.controlsConstructor)
|
||||
api(libs.visionforge.plotly)
|
||||
api(libs.plotlykt.core)
|
||||
api(libs.visionforge.markdown)
|
||||
// api("space.kscience:tables-kt:0.2.1")
|
||||
// api("space.kscience:visionforge-tables:$visionforgeVersion")
|
||||
}
|
||||
|
||||
jsMain{
|
||||
//FIXME remove after VisionForge 0.5
|
||||
api("org.jetbrains.kotlin-wrappers:kotlin-extensions:1.0.1-pre.823")
|
||||
}
|
||||
|
||||
jvmMain{
|
||||
api(libs.visionforge.server)
|
||||
api(spclibs.ktor.server.cio)
|
||||
|
@ -2,7 +2,7 @@ package space.kscience.controls.vision
|
||||
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.controls.misc.doubleRange
|
||||
import space.kscience.controls.doubleRange
|
||||
import space.kscience.dataforge.meta.MetaConverter
|
||||
import space.kscience.dataforge.meta.convertable
|
||||
import space.kscience.dataforge.meta.double
|
||||
|
@ -11,20 +11,15 @@ import kotlinx.datetime.Instant
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.controls.api.propertyMessageFlow
|
||||
import space.kscience.controls.constructor.DeviceState
|
||||
import space.kscience.controls.manager.clock
|
||||
import space.kscience.controls.misc.ValueWithTime
|
||||
import space.kscience.controls.spec.DevicePropertySpec
|
||||
import space.kscience.controls.spec.name
|
||||
import space.kscience.controls.time.ValueWithTime
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.meta.*
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.plotly.Plot
|
||||
import space.kscience.plotly.bar
|
||||
import space.kscience.plotly.models.Bar
|
||||
import space.kscience.plotly.models.Scatter
|
||||
import space.kscience.plotly.models.Trace
|
||||
import space.kscience.plotly.models.TraceValues
|
||||
import space.kscience.plotly.scatter
|
||||
import space.kscience.plotly.models.*
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
import kotlin.time.Duration.Companion.seconds
|
@ -1,8 +1,8 @@
|
||||
package space.kscience.controls.vision
|
||||
|
||||
import space.kscience.plotly.PlotlyPlugin
|
||||
import space.kscience.visionforge.html.runVisionClient
|
||||
import space.kscience.visionforge.markup.MarkupPlugin
|
||||
import space.kscience.visionforge.plotly.PlotlyPlugin
|
||||
|
||||
public fun main(): Unit = runVisionClient {
|
||||
plugin(PlotlyPlugin)
|
||||
|
@ -1,33 +1,27 @@
|
||||
package space.kscience.controls.vision
|
||||
|
||||
import io.ktor.server.cio.CIO
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import io.ktor.server.engine.EmbeddedServer
|
||||
import io.ktor.server.engine.embeddedServer
|
||||
import io.ktor.server.http.content.staticResources
|
||||
import io.ktor.server.routing.Routing
|
||||
import io.ktor.server.routing.routing
|
||||
import kotlinx.html.TagConsumer
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.plotly.Plot
|
||||
import space.kscience.plotly.PlotlyConfig
|
||||
import space.kscience.plotly.PlotlyPlugin
|
||||
import space.kscience.visionforge.html.HtmlVisionFragment
|
||||
import space.kscience.visionforge.html.VisionPage
|
||||
import space.kscience.visionforge.html.VisionTagConsumer
|
||||
import space.kscience.visionforge.markup.MarkupPlugin
|
||||
import space.kscience.visionforge.plotly.PlotlyPlugin
|
||||
import space.kscience.visionforge.plotly.plotly
|
||||
import space.kscience.visionforge.server.VisionRoute
|
||||
import space.kscience.visionforge.server.close
|
||||
import space.kscience.visionforge.server.openInBrowser
|
||||
import space.kscience.visionforge.server.visionPage
|
||||
import space.kscience.visionforge.visionManager
|
||||
|
||||
public fun Context.showDashboard(
|
||||
public suspend fun Context.showDashboard(
|
||||
port: Int = 7777,
|
||||
routes: Routing.() -> Unit = {},
|
||||
configurationBuilder: VisionRoute.() -> Unit = {},
|
||||
visionFragment: HtmlVisionFragment,
|
||||
): ApplicationEngine {
|
||||
): EmbeddedServer<*, *> {
|
||||
//create a sub-context for visualization
|
||||
val visualisationContext = buildContext {
|
||||
plugin(PlotlyPlugin)
|
||||
@ -44,7 +38,7 @@ public fun Context.showDashboard(
|
||||
visionPage(
|
||||
visualisationContext.visionManager,
|
||||
VisionPage.scriptHeader("js/controls-vision.js"),
|
||||
configurationBuilder = configurationBuilder,
|
||||
routeConfiguration = configurationBuilder,
|
||||
visionFragment = visionFragment
|
||||
)
|
||||
}.also {
|
||||
@ -57,16 +51,16 @@ public fun Context.showDashboard(
|
||||
//
|
||||
}
|
||||
|
||||
it.close()
|
||||
it.stop()
|
||||
}
|
||||
}
|
||||
|
||||
context(VisionTagConsumer<*>)
|
||||
public fun TagConsumer<*>.plot(
|
||||
config: PlotlyConfig = PlotlyConfig(),
|
||||
block: Plot.() -> Unit,
|
||||
) {
|
||||
vision {
|
||||
plotly(config, block)
|
||||
}
|
||||
}
|
||||
//context(consumer: VisionTagConsumer<*>)
|
||||
//public fun TagConsumer<*>.plot(
|
||||
// config: PlotlyConfig = PlotlyConfig(),
|
||||
// block: Plot.() -> Unit,
|
||||
//) {
|
||||
// vision {
|
||||
// plotly(config, block)
|
||||
// }
|
||||
//}
|
||||
|
@ -13,7 +13,6 @@ description = """
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
useKtor()
|
||||
useSerialization()
|
||||
useContextReceivers()
|
||||
commonMain {
|
||||
|
@ -18,11 +18,12 @@ import space.kscience.controls.api.propertyMessageFlow
|
||||
import space.kscience.controls.constructor.DeviceState
|
||||
import space.kscience.controls.constructor.units.NumericalValue
|
||||
import space.kscience.controls.constructor.values
|
||||
import space.kscience.controls.manager.clock
|
||||
import space.kscience.controls.misc.ValueWithTime
|
||||
import space.kscience.controls.spec.DevicePropertySpec
|
||||
import space.kscience.controls.spec.name
|
||||
import space.kscience.controls.time.ValueWithTime
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.double
|
||||
import kotlin.time.Duration
|
||||
@ -40,7 +41,7 @@ internal fun <T> Flow<ValueWithTime<T>>.collectAndTrim(
|
||||
maxAge: Duration = defaultMaxAge,
|
||||
maxPoints: Int = defaultMaxPoints,
|
||||
minPoints: Int = defaultMinPoints,
|
||||
clock: Clock = Clock.System,
|
||||
clock: Clock = Global.clock,
|
||||
): Flow<List<ValueWithTime<T>>> {
|
||||
require(maxPoints > 2)
|
||||
require(minPoints > 0)
|
||||
@ -221,7 +222,7 @@ public fun XYGraphScope<Instant, Double>.PlotAveragedDeviceProperty(
|
||||
var points by remember { mutableStateOf<List<ValueWithTime<Double>>>(emptyList()) }
|
||||
|
||||
LaunchedEffect(device, propertyName, startValue, maxAge, maxPoints, minPoints, averagingInterval) {
|
||||
val clock = device.clock
|
||||
val clock: Clock = device.clock
|
||||
var lastValue = startValue
|
||||
device.propertyMessageFlow(propertyName)
|
||||
.chunkedByPeriod(averagingInterval)
|
||||
|
@ -36,7 +36,7 @@ dependencies {
|
||||
kotlin{
|
||||
jvmToolchain(17)
|
||||
compilerOptions {
|
||||
freeCompilerArgs.addAll("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
|
||||
freeCompilerArgs.addAll("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn", "-Xcontext-parameters")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,11 +9,13 @@ import androidx.compose.ui.unit.dp
|
||||
import androidx.compose.ui.window.Window
|
||||
import androidx.compose.ui.window.application
|
||||
import androidx.compose.ui.window.rememberWindowState
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import io.ktor.server.application.port
|
||||
import io.ktor.server.engine.EmbeddedServer
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.eclipse.milo.opcua.sdk.server.OpcUaServer
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText
|
||||
@ -46,8 +48,8 @@ private val json = Json { prettyPrint = true }
|
||||
class DemoController : ContextAware {
|
||||
|
||||
var device: DemoDevice? = null
|
||||
var magixServer: ApplicationEngine? = null
|
||||
var visualizer: ApplicationEngine? = null
|
||||
var magixServer: EmbeddedServer<*, *>? = null
|
||||
var visualizer: EmbeddedServer<*, *>? = null
|
||||
val opcUaServer: OpcUaServer = OpcUaServer {
|
||||
setApplicationName(LocalizedText.english("space.kscience.controls.opcua"))
|
||||
|
||||
@ -96,7 +98,7 @@ class DemoController : ContextAware {
|
||||
// send description request
|
||||
listenerEndpoint.send(
|
||||
format = DeviceManager.magixFormat,
|
||||
payload = GetDescriptionMessage(),
|
||||
payload = GetDescriptionMessage(Clock.System.now()),
|
||||
source = "listener",
|
||||
// target = "demoDevice"
|
||||
)
|
||||
@ -174,7 +176,7 @@ fun DemoControls(controller: DemoController) {
|
||||
onClick = {
|
||||
controller.visualizer?.run {
|
||||
val host = "localhost"//environment.connectors.first().host
|
||||
val port = environment.connectors.first().port
|
||||
val port = environment.config.port
|
||||
val uri = URI("http", null, host, port, "/", null, null)
|
||||
Desktop.getDesktop().browse(uri)
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package space.kscience.controls.demo
|
||||
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import io.ktor.server.engine.EmbeddedServer
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.launch
|
||||
@ -18,9 +18,8 @@ import space.kscience.plotly.Plotly
|
||||
import space.kscience.plotly.layout
|
||||
import space.kscience.plotly.models.Trace
|
||||
import space.kscience.plotly.plot
|
||||
import space.kscience.plotly.server.PlotlyUpdateMode
|
||||
import space.kscience.plotly.server.serve
|
||||
import space.kscience.plotly.trace
|
||||
import space.kscience.visionforge.plotly.serveSinglePage
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
|
||||
/**
|
||||
@ -53,7 +52,7 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
|
||||
}
|
||||
|
||||
|
||||
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine {
|
||||
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): EmbeddedServer<*, *> {
|
||||
//share subscription to a parse message only once
|
||||
val subscription = magixEndpoint.subscribe(DeviceManager.magixFormat).shareIn(this, SharingStarted.Lazily)
|
||||
|
||||
@ -69,70 +68,68 @@ fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): Applicat
|
||||
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.coordinates.name }
|
||||
}.map { it.value }
|
||||
|
||||
return Plotly.serve(port = 9091, scope = this) {
|
||||
updateMode = PlotlyUpdateMode.PUSH
|
||||
return Plotly.serveSinglePage(port = 9091, routeConfiguration = {
|
||||
updateInterval = 100
|
||||
page { container ->
|
||||
link {
|
||||
rel = "stylesheet"
|
||||
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
|
||||
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
|
||||
attributes["crossorigin"] = "anonymous"
|
||||
}
|
||||
div("row") {
|
||||
div("col-6") {
|
||||
plot(renderer = container) {
|
||||
layout {
|
||||
title = "sin property"
|
||||
xaxis.title = "point index"
|
||||
yaxis.title = "sin"
|
||||
}
|
||||
trace {
|
||||
launch {
|
||||
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
|
||||
updateFrom(Trace.Y_AXIS, flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
div("col-6") {
|
||||
plot(renderer = container) {
|
||||
layout {
|
||||
title = "cos property"
|
||||
xaxis.title = "point index"
|
||||
yaxis.title = "cos"
|
||||
}
|
||||
trace {
|
||||
launch {
|
||||
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
|
||||
updateFrom(Trace.Y_AXIS, flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
div("row") {
|
||||
div("col-12") {
|
||||
plot(renderer = container) {
|
||||
layout {
|
||||
title = "cos vs sin"
|
||||
xaxis.title = "sin"
|
||||
yaxis.title = "cos"
|
||||
}
|
||||
trace {
|
||||
name = "non-synchronized"
|
||||
launch {
|
||||
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
|
||||
it["x"].double!! to it["y"].double!!
|
||||
}.windowed(30)
|
||||
updateXYFrom(flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}) {
|
||||
link {
|
||||
rel = "stylesheet"
|
||||
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
|
||||
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
|
||||
attributes["crossorigin"] = "anonymous"
|
||||
}
|
||||
div("row") {
|
||||
div("col-6") {
|
||||
plot{
|
||||
layout {
|
||||
title = "sin property"
|
||||
xaxis.title = "point index"
|
||||
yaxis.title = "sin"
|
||||
}
|
||||
trace {
|
||||
launch {
|
||||
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
|
||||
updateFrom(Trace.Y_AXIS, flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
div("col-6") {
|
||||
plot{
|
||||
layout {
|
||||
title = "cos property"
|
||||
xaxis.title = "point index"
|
||||
yaxis.title = "cos"
|
||||
}
|
||||
trace {
|
||||
launch {
|
||||
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
|
||||
updateFrom(Trace.Y_AXIS, flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
div("row") {
|
||||
div("col-12") {
|
||||
plot{
|
||||
layout {
|
||||
title = "cos vs sin"
|
||||
xaxis.title = "sin"
|
||||
yaxis.title = "cos"
|
||||
}
|
||||
trace {
|
||||
name = "non-synchronized"
|
||||
launch {
|
||||
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.mapNotNull {
|
||||
it["x"].double!! to it["y"].double!!
|
||||
}.windowed(30)
|
||||
updateXYFrom(flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -37,8 +37,10 @@ data class Vector2D(var x: Double = 0.0, var y: Double = 0.0) : MetaRepr {
|
||||
}
|
||||
}
|
||||
|
||||
open class VirtualCar(context: Context, meta: Meta) : DeviceBySpec<VirtualCar>(IVirtualCar, context, meta),
|
||||
IVirtualCar {
|
||||
open class VirtualCar(
|
||||
context: Context,
|
||||
meta: Meta
|
||||
) : DeviceBySpec<VirtualCar>(IVirtualCar, context, meta), IVirtualCar {
|
||||
private val clock = context.clock
|
||||
|
||||
private val timeScale = 1e-3
|
||||
|
@ -9,7 +9,6 @@ plugins {
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
useKtor()
|
||||
useSerialization()
|
||||
useContextReceivers()
|
||||
commonMain {
|
||||
|
@ -42,7 +42,11 @@ import space.kscience.controls.constructor.onTimer
|
||||
import space.kscience.controls.constructor.units.Kilograms
|
||||
import space.kscience.controls.constructor.units.Meters
|
||||
import space.kscience.controls.constructor.units.NumericalValue
|
||||
import space.kscience.controls.manager.*
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.manager.hubMessageFlow
|
||||
import space.kscience.controls.manager.install
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.request
|
||||
import java.awt.Dimension
|
||||
@ -182,8 +186,6 @@ fun main() = application {
|
||||
}.collect()
|
||||
}
|
||||
|
||||
val clock = remember { context.clock }
|
||||
|
||||
Window(title = "Pid regulator simulator", onCloseRequest = ::exitApplication) {
|
||||
window.minimumSize = Dimension(800, 400)
|
||||
MaterialTheme {
|
||||
@ -269,12 +271,12 @@ fun main() = application {
|
||||
second(400.dp) {
|
||||
ChartLayout {
|
||||
XYGraph<Instant, Double>(
|
||||
xAxisModel = remember { TimeAxisModel.recent(maxAge, clock) },
|
||||
xAxisModel = remember { TimeAxisModel.recent(maxAge, context.clock) },
|
||||
yAxisModel = rememberDoubleLinearAxisModel((range.start - 1.0)..(range.endInclusive + 1.0)),
|
||||
xAxisTitle = { Text("Time in seconds relative to current") },
|
||||
xAxisLabels = { it: Instant ->
|
||||
Text(
|
||||
(clock.now() - it).toDouble(
|
||||
(context.clock.now() - it).toDouble(
|
||||
DurationUnit.SECONDS
|
||||
).toString(2)
|
||||
)
|
||||
|
@ -31,8 +31,8 @@ import space.kscience.controls.constructor.devices.angle
|
||||
import space.kscience.controls.constructor.models.Leadscrew
|
||||
import space.kscience.controls.constructor.models.coerceIn
|
||||
import space.kscience.controls.constructor.units.*
|
||||
import space.kscience.controls.manager.ClockManager
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import space.kscience.dataforge.context.Context
|
||||
import java.awt.Dimension
|
||||
import kotlin.random.Random
|
||||
|
@ -4,7 +4,6 @@ package space.kscience.controls.demo.collective
|
||||
|
||||
import space.kscience.controls.api.Device
|
||||
import space.kscience.controls.constructor.*
|
||||
import space.kscience.controls.misc.stringList
|
||||
import space.kscience.controls.peer.PeerConnection
|
||||
import space.kscience.controls.spec.DeviceSpec
|
||||
import space.kscience.dataforge.context.Context
|
||||
|
@ -8,10 +8,7 @@ import space.kscience.controls.api.PropertySetMessage
|
||||
import space.kscience.controls.client.DeviceClient
|
||||
import space.kscience.controls.client.launchMagixService
|
||||
import space.kscience.controls.client.write
|
||||
import space.kscience.controls.constructor.DeviceState
|
||||
import space.kscience.controls.constructor.ModelConstructor
|
||||
import space.kscience.controls.constructor.MutableDeviceState
|
||||
import space.kscience.controls.constructor.onTimer
|
||||
import space.kscience.controls.constructor.*
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.manager.install
|
||||
import space.kscience.controls.manager.respondMessage
|
||||
@ -37,7 +34,6 @@ import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
|
||||
|
||||
private val deviceVelocity = 0.1.kilometers
|
||||
|
||||
private val center = Gmc.ofDegrees(55.925, 37.514)
|
||||
@ -156,6 +152,7 @@ internal class DeviceCollectiveModel(
|
||||
json.encodeToString(
|
||||
DeviceMessage.serializer(),
|
||||
PropertySetMessage(
|
||||
time = clock.now(),
|
||||
property = CollectiveDevice.velocity.name,
|
||||
value = gmcVelocityMetaConverter.convert(state.velocity.value),
|
||||
targetDevice = null
|
||||
|
@ -21,7 +21,7 @@ dependencies {
|
||||
}
|
||||
|
||||
kotlin{
|
||||
jvmToolchain(11)
|
||||
jvmToolchain(17)
|
||||
}
|
||||
|
||||
|
||||
|
@ -8,12 +8,14 @@ import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Clock
|
||||
import space.kscience.controls.api.PropertyChangedMessage
|
||||
import space.kscience.controls.client.launchMagixService
|
||||
import space.kscience.controls.client.magixFormat
|
||||
import space.kscience.controls.manager.DeviceManager
|
||||
import space.kscience.controls.manager.install
|
||||
import space.kscience.controls.spec.*
|
||||
import space.kscience.controls.time.ClockManager
|
||||
import space.kscience.controls.time.clock
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.context.request
|
||||
@ -22,17 +24,17 @@ import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.int
|
||||
import space.kscience.magix.api.MagixEndpoint
|
||||
import space.kscience.magix.api.subscribe
|
||||
import space.kscience.magix.rsocket.rSocketStreamWithWebSockets
|
||||
import space.kscience.magix.rsocket.rSocketStreamWithTcp
|
||||
import space.kscience.magix.server.RSocketMagixFlowPlugin
|
||||
import space.kscience.magix.server.startMagixServer
|
||||
import space.kscience.plotly.Plotly
|
||||
import space.kscience.plotly.PlotlyConfig
|
||||
import space.kscience.plotly.layout
|
||||
import space.kscience.plotly.models.Bar
|
||||
import space.kscience.plotly.plot
|
||||
import space.kscience.plotly.server.PlotlyUpdateMode
|
||||
import space.kscience.plotly.server.serve
|
||||
import space.kscience.plotly.server.show
|
||||
import space.kscience.plotly.models.invoke
|
||||
import space.kscience.plotly.plotly
|
||||
import space.kscience.visionforge.plotly.serveSinglePage
|
||||
import space.kscience.visionforge.server.openInBrowser
|
||||
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
|
||||
import space.kscince.magix.zmq.zmq
|
||||
import kotlin.random.Random
|
||||
@ -46,6 +48,10 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
|
||||
|
||||
private val randomValue get() = rng.nextDouble()
|
||||
|
||||
private var counter: Long = 1
|
||||
|
||||
private val incrementValue: Double get() = (counter++).toDouble()
|
||||
|
||||
companion object : DeviceSpec<MassDevice>(), Factory<MassDevice> {
|
||||
|
||||
override fun build(context: Context, meta: Meta): MassDevice = MassDevice(context, meta)
|
||||
@ -61,7 +67,13 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
|
||||
}
|
||||
|
||||
suspend fun main() {
|
||||
val context = Context("Mass")
|
||||
val context = Context("Mass"){
|
||||
plugin(ClockManager){
|
||||
"clock.mode" put "jvm"
|
||||
}
|
||||
}
|
||||
|
||||
val clock = context.clock
|
||||
|
||||
context.startMagixServer(
|
||||
RSocketMagixFlowPlugin(),
|
||||
@ -79,10 +91,10 @@ suspend fun main() {
|
||||
|
||||
val deviceManager = deviceContext.request(DeviceManager)
|
||||
|
||||
deviceManager.install("device$it", MassDevice)
|
||||
deviceManager.install("device$it", MassDevice, Meta { "delay" put 5 })
|
||||
|
||||
val endpointId = "device$it"
|
||||
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
|
||||
val deviceEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost")
|
||||
deviceManager.launchMagixService(deviceEndpoint, endpointId)
|
||||
}
|
||||
|
||||
@ -95,34 +107,39 @@ suspend fun main() {
|
||||
val latest = HashMap<String, Duration>()
|
||||
val max = HashMap<String, Duration>()
|
||||
|
||||
// val counters = hashMapOf<String, Double>()
|
||||
|
||||
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
|
||||
mutex.withLock {
|
||||
val delay = Clock.System.now() - payload.time
|
||||
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
|
||||
max[magixMessage.sourceEndpoint] =
|
||||
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
|
||||
if(payload is PropertyChangedMessage) {
|
||||
val delay = clock.now() - payload.time
|
||||
mutex.withLock {
|
||||
// val deviceName = payload.sourceDevice.toString()
|
||||
// counters[deviceName] = counters[deviceName]?.inc() ?: 1.0
|
||||
// println("${deviceName}:${counters[deviceName]!! - payload.value.double!!}")
|
||||
latest[magixMessage.sourceEndpoint] = delay
|
||||
max[magixMessage.sourceEndpoint] = maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
|
||||
}
|
||||
}
|
||||
}.launchIn(this)
|
||||
|
||||
while (isActive) {
|
||||
delay(200)
|
||||
delay(1000)
|
||||
mutex.withLock {
|
||||
val sorted = max.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
|
||||
latest.clear()
|
||||
max.clear()
|
||||
x.numbers = sorted.keys
|
||||
y.numbers = sorted.values.map { it.inWholeMicroseconds / 1000.0 + 0.0001 }
|
||||
y.numbers = sorted.values.map { it.inWholeMicroseconds.toDouble() / 1000.0 }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val application = Plotly.serve(port = 9091) {
|
||||
updateMode = PlotlyUpdateMode.PUSH
|
||||
val application = Plotly.serveSinglePage(port = 9091, routeConfiguration = {
|
||||
updateInterval = 1000
|
||||
|
||||
page { container ->
|
||||
plot(renderer = container, config = PlotlyConfig { saveAsSvg() }) {
|
||||
}) {
|
||||
vision {
|
||||
plotly(config = PlotlyConfig { saveAsSvg() }) {
|
||||
layout {
|
||||
// title = "Latest event"
|
||||
|
||||
@ -134,7 +151,8 @@ suspend fun main() {
|
||||
}
|
||||
}
|
||||
|
||||
application.show()
|
||||
|
||||
application.openInBrowser()
|
||||
|
||||
while (readlnOrNull().isNullOrBlank()) {
|
||||
|
||||
|
@ -12,8 +12,8 @@ import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.coroutines.withTimeout
|
||||
import space.kscience.controls.api.DeviceHub
|
||||
import space.kscience.controls.api.PropertyDescriptor
|
||||
import space.kscience.controls.misc.asMeta
|
||||
import space.kscience.controls.misc.duration
|
||||
import space.kscience.controls.asMeta
|
||||
import space.kscience.controls.duration
|
||||
import space.kscience.controls.ports.AsynchronousPort
|
||||
import space.kscience.controls.ports.KtorTcpPort
|
||||
import space.kscience.controls.ports.send
|
||||
|
@ -4,9 +4,9 @@ import io.ktor.network.selector.ActorSelectorManager
|
||||
import io.ktor.network.sockets.aSocket
|
||||
import io.ktor.network.sockets.openReadChannel
|
||||
import io.ktor.network.sockets.openWriteChannel
|
||||
import io.ktor.util.InternalAPI
|
||||
import io.ktor.util.moveToByteArray
|
||||
import io.ktor.utils.io.writeAvailable
|
||||
import io.ktor.utils.io.read
|
||||
import io.ktor.utils.io.writeByteArray
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
@ -17,7 +17,6 @@ val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
|
||||
throwable.printStackTrace()
|
||||
}
|
||||
|
||||
@OptIn(InternalAPI::class)
|
||||
fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exceptionHandler) {
|
||||
val virtualDevice = PiMotionMasterVirtualDevice(this@launchPiDebugServer, axes)
|
||||
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind("localhost", port).use { server ->
|
||||
@ -32,7 +31,7 @@ fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exc
|
||||
|
||||
val sendJob = virtualDevice.subscribe().onEach {
|
||||
//println("Sending: ${it.decodeToString()}")
|
||||
output.writeAvailable(it)
|
||||
output.writeByteArray(it)
|
||||
output.flush()
|
||||
}.launchIn(this)
|
||||
|
||||
|
@ -7,4 +7,6 @@ org.gradle.parallel=true
|
||||
org.gradle.configureondemand=true
|
||||
org.gradle.jvmargs=-Xmx4096m
|
||||
|
||||
toolsVersion=0.15.4-kotlin-2.0.0
|
||||
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
|
||||
|
||||
toolsVersion=0.17.1-kotlin-2.1.20
|
@ -1,7 +1,7 @@
|
||||
[versions]
|
||||
|
||||
dataforge = "0.9.0"
|
||||
rsocket = "0.15.4"
|
||||
dataforge = "0.10.1"
|
||||
rsocket = "0.20.0"
|
||||
xodus = "2.0.1"
|
||||
|
||||
uuid = "0.8.0"
|
||||
@ -10,8 +10,6 @@ fazecast = "2.10.3"
|
||||
|
||||
tornadofx = "1.7.20"
|
||||
|
||||
plotlykt = "0.7.2"
|
||||
|
||||
logback = "1.2.11"
|
||||
|
||||
hivemq = "1.3.1"
|
||||
@ -29,7 +27,7 @@ pi4j-ktx = "2.4.0"
|
||||
|
||||
plc4j = "0.12.0"
|
||||
|
||||
visionforge = "0.4.2"
|
||||
visionforge = "0.5.0"
|
||||
|
||||
[libraries]
|
||||
|
||||
@ -42,15 +40,17 @@ xodus-entity-store = { module = "org.jetbrains.xodus:xodus-entity-store", versio
|
||||
xodus-environment = { module = "org.jetbrains.xodus:xodus-environment", version.ref = "xodus" }
|
||||
xodus-vfs = { module = "org.jetbrains.xodus:xodus-vfs", version.ref = "xodus" }
|
||||
|
||||
rsocket-ktor-client = { module = "io.rsocket.kotlin:rsocket-ktor-client", version.ref = "rsocket" }
|
||||
rsocket-ktor-server = { module = "io.rsocket.kotlin:rsocket-ktor-server", version.ref = "rsocket" }
|
||||
rsocket-ktor-client = { module = "io.rsocket.kotlin:ktor-client-rsocket", version.ref = "rsocket" }
|
||||
rsocket-ktor-server = { module = "io.rsocket.kotlin:ktor-server-rsocket", version.ref = "rsocket" }
|
||||
rsocket-transport-ktor-tcp = { module = "io.rsocket.kotlin:rsocket-transport-ktor-tcp", version.ref = "rsocket" }
|
||||
|
||||
jSerialComm = { module = "com.fazecast:jSerialComm", version.ref = "fazecast" }
|
||||
|
||||
tornadofx = { module = "no.tornado:tornadofx", version.ref = "tornadofx" }
|
||||
|
||||
plotlykt-server = { module = "space.kscience:plotlykt-server", version.ref = "plotlykt" }
|
||||
plotlykt-core = { module = "space.kscience:plotly-kt-core", version.ref = "visionforge" }
|
||||
|
||||
plotlykt-server = { module = "space.kscience:plotly-kt-server", version.ref = "visionforge" }
|
||||
|
||||
logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }
|
||||
|
||||
@ -74,7 +74,6 @@ pi4j-plugin-pigpio = { module = "com.pi4j:pi4j-plugin-pigpio", version.ref = "pi
|
||||
plc4j-spi = { module = "org.apache.plc4x:plc4j-spi", version.ref = "plc4j" }
|
||||
|
||||
visionforge-jupiter = { module = "space.kscience:visionforge-jupyter", version.ref = "visionforge" }
|
||||
visionforge-plotly = { module = "space.kscience:visionforge-plotly", version.ref = "visionforge" }
|
||||
visionforge-markdown = { module = "space.kscience:visionforge-markdown", version.ref = "visionforge" }
|
||||
visionforge-server = { module = "space.kscience:visionforge-server", version.ref = "visionforge" }
|
||||
visionforge-compose-html = { module = "space.kscience:visionforge-compose-html", version.ref = "visionforge" }
|
||||
|
@ -15,15 +15,15 @@ kscience {
|
||||
native()
|
||||
wasm()
|
||||
useCoroutines()
|
||||
useSerialization{
|
||||
useSerialization {
|
||||
json()
|
||||
}
|
||||
|
||||
commonMain{
|
||||
commonMain {
|
||||
implementation(spclibs.atomicfu)
|
||||
}
|
||||
}
|
||||
|
||||
readme{
|
||||
readme {
|
||||
maturity = Maturity.EXPERIMENTAL
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ description = """
|
||||
""".trimIndent()
|
||||
|
||||
dependencies {
|
||||
implementation(project(":magix:magix-rsocket"))
|
||||
implementation(projects.magix.magixRsocket)
|
||||
implementation(spclibs.kotlinx.coroutines.jdk9)
|
||||
}
|
||||
|
||||
|
@ -20,11 +20,12 @@ kscience {
|
||||
}
|
||||
dependencies {
|
||||
api(projects.magix.magixApi)
|
||||
implementation(spclibs.ktor.client.core)
|
||||
implementation(libs.rsocket.ktor.client)
|
||||
api(spclibs.kotlinx.io.core)
|
||||
api(spclibs.ktor.client.core)
|
||||
api(libs.rsocket.ktor.client)
|
||||
}
|
||||
dependencies(jvmMain) {
|
||||
implementation(libs.rsocket.transport.ktor.tcp)
|
||||
api(libs.rsocket.transport.ktor.tcp)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport
|
||||
import io.rsocket.kotlin.ktor.client.rSocket
|
||||
import io.rsocket.kotlin.payload.buildPayload
|
||||
import io.rsocket.kotlin.payload.data
|
||||
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
@ -17,6 +18,7 @@ import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.io.readString
|
||||
import space.kscience.magix.api.MagixEndpoint
|
||||
import space.kscience.magix.api.MagixMessage
|
||||
import space.kscience.magix.api.MagixMessageFilter
|
||||
@ -32,7 +34,10 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint,
|
||||
}
|
||||
val flow = rSocket.requestStream(payload)
|
||||
return flow.map {
|
||||
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
|
||||
MagixEndpoint.magixJson.decodeFromString(
|
||||
MagixMessage.serializer(),
|
||||
it.data.readString()
|
||||
)
|
||||
}.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
|
||||
}
|
||||
|
||||
@ -65,12 +70,12 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets(
|
||||
host: String,
|
||||
port: Int = DEFAULT_MAGIX_HTTP_PORT,
|
||||
path: String = "/rsocket",
|
||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||
rSocketConfig: RSocketConnectorBuilder.() -> Unit = {},
|
||||
): RSocketMagixEndpoint {
|
||||
val client = HttpClient {
|
||||
install(WebSockets)
|
||||
install(RSocketSupport) {
|
||||
connector = buildConnector(rSocketConfig)
|
||||
connector(rSocketConfig)
|
||||
}
|
||||
}
|
||||
|
||||
|
7
magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt
7
magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt
@ -16,6 +16,7 @@ import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.io.readString
|
||||
import space.kscience.magix.api.MagixEndpoint
|
||||
import space.kscience.magix.api.MagixMessage
|
||||
import space.kscience.magix.api.MagixMessageFilter
|
||||
@ -51,7 +52,7 @@ public class RSocketStreamMagixEndpoint(
|
||||
override fun subscribe(
|
||||
filter: MagixMessageFilter,
|
||||
): Flow<MagixMessage> = input.map {
|
||||
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
|
||||
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readString())
|
||||
}.filter(filter)
|
||||
|
||||
override suspend fun broadcast(message: MagixMessage): Unit {
|
||||
@ -72,12 +73,12 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets(
|
||||
port: Int = DEFAULT_MAGIX_HTTP_PORT,
|
||||
path: String = "/rsocket",
|
||||
filter: MagixMessageFilter = MagixMessageFilter.ALL,
|
||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||
rSocketConfig: RSocketConnectorBuilder.() -> Unit = {},
|
||||
): RSocketStreamMagixEndpoint {
|
||||
val client = HttpClient {
|
||||
install(WebSockets)
|
||||
install(RSocketSupport) {
|
||||
connector = buildConnector(rSocketConfig)
|
||||
connector(rSocketConfig)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
package space.kscience.magix.rsocket
|
||||
|
||||
import io.ktor.network.sockets.SocketOptions
|
||||
import io.rsocket.kotlin.core.RSocketConnectorBuilder
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransport
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransportBuilder
|
||||
import space.kscience.magix.api.MagixEndpoint
|
||||
import space.kscience.magix.api.MagixMessageFilter
|
||||
import kotlin.coroutines.coroutineContext
|
||||
@ -14,15 +14,14 @@ import kotlin.coroutines.coroutineContext
|
||||
public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
|
||||
host: String,
|
||||
port: Int = DEFAULT_MAGIX_RAW_PORT,
|
||||
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
||||
tcpConfig: KtorTcpClientTransportBuilder.() -> Unit = {},
|
||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||
): RSocketMagixEndpoint {
|
||||
val transport = TcpClientTransport(
|
||||
hostname = host,
|
||||
port = port,
|
||||
val transport = KtorTcpClientTransport(
|
||||
context = coroutineContext,
|
||||
configure = tcpConfig
|
||||
)
|
||||
).target(host,port)
|
||||
|
||||
val rSocket = buildConnector(rSocketConfig).connect(transport)
|
||||
|
||||
return RSocketMagixEndpoint(rSocket)
|
||||
@ -33,15 +32,14 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp(
|
||||
host: String,
|
||||
port: Int = DEFAULT_MAGIX_RAW_PORT,
|
||||
filter: MagixMessageFilter = MagixMessageFilter.ALL,
|
||||
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
||||
tcpConfig: KtorTcpClientTransportBuilder.() -> Unit = {},
|
||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||
): RSocketStreamMagixEndpoint {
|
||||
val transport = TcpClientTransport(
|
||||
hostname = host,
|
||||
port = port,
|
||||
val transport = KtorTcpClientTransport(
|
||||
context = coroutineContext,
|
||||
configure = tcpConfig
|
||||
)
|
||||
).target(host,port)
|
||||
|
||||
val rSocket = buildConnector(rSocketConfig).connect(transport)
|
||||
|
||||
return RSocketStreamMagixEndpoint(rSocket, filter)
|
||||
|
@ -1,11 +1,12 @@
|
||||
package rsocket
|
||||
|
||||
import io.ktor.network.sockets.SocketOptions
|
||||
import io.rsocket.kotlin.core.RSocketConnectorBuilder
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransport
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpClientTransportBuilder
|
||||
import space.kscience.magix.api.MagixEndpoint
|
||||
import space.kscience.magix.rsocket.RSocketMagixEndpoint
|
||||
import space.kscience.magix.rsocket.buildConnector
|
||||
import kotlin.coroutines.coroutineContext
|
||||
|
||||
|
||||
/**
|
||||
@ -14,14 +15,14 @@ import space.kscience.magix.rsocket.buildConnector
|
||||
public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
|
||||
host: String,
|
||||
port: Int = DEFAULT_MAGIX_RAW_PORT,
|
||||
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
||||
tcpConfig: KtorTcpClientTransportBuilder.() -> Unit = {},
|
||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||
): RSocketMagixEndpoint {
|
||||
val transport = TcpClientTransport(
|
||||
hostname = host,
|
||||
port = port,
|
||||
val transport = KtorTcpClientTransport(
|
||||
context = coroutineContext,
|
||||
configure = tcpConfig
|
||||
)
|
||||
).target(host,port)
|
||||
|
||||
val rSocket = buildConnector(rSocketConfig).connect(transport)
|
||||
|
||||
return RSocketMagixEndpoint(rSocket)
|
||||
|
@ -16,6 +16,8 @@ kscience {
|
||||
jvm()
|
||||
useSerialization{
|
||||
json()
|
||||
cbor()
|
||||
protobuf()
|
||||
}
|
||||
|
||||
jvmMain{
|
||||
@ -28,6 +30,7 @@ kscience {
|
||||
|
||||
api(libs.rsocket.ktor.server)
|
||||
api(libs.rsocket.transport.ktor.tcp)
|
||||
api(spclibs.kotlinx.io.core)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,32 +1,86 @@
|
||||
@file:OptIn(ExperimentalSerializationApi::class)
|
||||
|
||||
package space.kscience.magix.server
|
||||
|
||||
import io.ktor.network.sockets.SocketOptions
|
||||
import io.rsocket.kotlin.ConnectionAcceptor
|
||||
import io.rsocket.kotlin.RSocketRequestHandler
|
||||
import io.rsocket.kotlin.core.RSocketServer
|
||||
import io.rsocket.kotlin.core.RSocketServerBuilder
|
||||
import io.rsocket.kotlin.payload.Payload
|
||||
import io.rsocket.kotlin.payload.buildPayload
|
||||
import io.rsocket.kotlin.payload.data
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.TcpServer
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpServerTransport
|
||||
import io.rsocket.kotlin.transport.ktor.tcp.KtorTcpServerTransportBuilder
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.io.Buffer
|
||||
import kotlinx.io.readByteArray
|
||||
import kotlinx.io.readString
|
||||
import kotlinx.serialization.ExperimentalSerializationApi
|
||||
import kotlinx.serialization.cbor.Cbor
|
||||
import kotlinx.serialization.json.io.encodeToSink
|
||||
import kotlinx.serialization.protobuf.ProtoBuf
|
||||
import space.kscience.magix.api.*
|
||||
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
|
||||
|
||||
private enum class RSocketMessageEncoding {
|
||||
JSON,
|
||||
CBOR,
|
||||
PROTO
|
||||
}
|
||||
|
||||
|
||||
private fun Buffer?.inferFormat(): RSocketMessageEncoding = when (val str = this?.readString()) {
|
||||
"proto" -> RSocketMessageEncoding.PROTO
|
||||
"cbor" -> RSocketMessageEncoding.CBOR
|
||||
else -> RSocketMessageEncoding.JSON
|
||||
}
|
||||
|
||||
private fun decodeMessage(buffer: Buffer, format: RSocketMessageEncoding): MagixMessage = when (format) {
|
||||
RSocketMessageEncoding.JSON -> MagixEndpoint.magixJson.decodeFromString(
|
||||
MagixMessage.serializer(),
|
||||
buffer.readString()
|
||||
)
|
||||
|
||||
RSocketMessageEncoding.CBOR -> Cbor.decodeFromByteArray(
|
||||
MagixMessage.serializer(),
|
||||
buffer.readByteArray()
|
||||
)
|
||||
|
||||
RSocketMessageEncoding.PROTO -> ProtoBuf.decodeFromByteArray(
|
||||
MagixMessage.serializer(),
|
||||
buffer.readByteArray()
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
private fun encodeMessage(message: MagixMessage, format: RSocketMessageEncoding): Buffer {
|
||||
return when (format) {
|
||||
RSocketMessageEncoding.JSON -> Buffer().also { buffer ->
|
||||
MagixEndpoint.magixJson.encodeToSink(MagixMessage.serializer(), message, buffer)
|
||||
}
|
||||
|
||||
RSocketMessageEncoding.CBOR -> Buffer().also { buffer ->
|
||||
buffer.write(Cbor.encodeToByteArray(MagixMessage.serializer(), message))
|
||||
}
|
||||
|
||||
RSocketMessageEncoding.PROTO -> Buffer().also { buffer ->
|
||||
buffer.write(ProtoBuf.encodeToByteArray(MagixMessage.serializer(), message))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Raw TCP magix server plugin
|
||||
*/
|
||||
public class RSocketMagixFlowPlugin(
|
||||
private val serverHost: String = "0.0.0.0",
|
||||
private val serverPort: Int = DEFAULT_MAGIX_RAW_PORT,
|
||||
private val transportConfiguration: SocketOptions.AcceptorOptions.() -> Unit = {},
|
||||
private val transportConfiguration: KtorTcpServerTransportBuilder.() -> Unit = {},
|
||||
private val rsocketConfiguration: RSocketServerBuilder.() -> Unit = {},
|
||||
) : MagixFlowPlugin {
|
||||
|
||||
@ -35,19 +89,15 @@ public class RSocketMagixFlowPlugin(
|
||||
receive: Flow<MagixMessage>,
|
||||
sendMessage: suspend (MagixMessage) -> Unit,
|
||||
): Job {
|
||||
val tcpTransport = TcpServerTransport(
|
||||
hostname = serverHost,
|
||||
port = serverPort,
|
||||
val tcpTransport = KtorTcpServerTransport(
|
||||
scope.coroutineContext,
|
||||
configure = transportConfiguration
|
||||
)
|
||||
val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration)
|
||||
.bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage))
|
||||
).target(serverHost, serverPort)
|
||||
|
||||
scope.coroutineContext[Job]?.invokeOnCompletion {
|
||||
rSocketJob.handlerJob.cancel()
|
||||
return scope.launch {
|
||||
RSocketServer(rsocketConfiguration)
|
||||
.startServer(tcpTransport, acceptor(scope, receive, sendMessage))
|
||||
}
|
||||
|
||||
return rSocketJob.handlerJob
|
||||
}
|
||||
|
||||
public companion object {
|
||||
@ -59,40 +109,38 @@ public class RSocketMagixFlowPlugin(
|
||||
RSocketRequestHandler(coroutineScope.coroutineContext) {
|
||||
//handler for request/stream
|
||||
requestStream { request: Payload ->
|
||||
val requestText = request.data.readText()
|
||||
val filter = if(requestText.isBlank()) {
|
||||
val format = request.metadata.inferFormat()
|
||||
|
||||
val requestText = request.data.readString()
|
||||
val filter = if (requestText.isBlank()) {
|
||||
MagixMessageFilter.ALL
|
||||
} else MagixEndpoint.magixJson.decodeFromString(
|
||||
} else MagixEndpoint.magixJson.decodeFromString(
|
||||
MagixMessageFilter.serializer(),
|
||||
requestText
|
||||
)
|
||||
|
||||
receive.filter(filter).map { message ->
|
||||
val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
|
||||
buildPayload { data(string) }
|
||||
buildPayload {
|
||||
data(encodeMessage(message,format))
|
||||
}
|
||||
}
|
||||
}
|
||||
//single send
|
||||
fireAndForget { request: Payload ->
|
||||
val message = MagixEndpoint.magixJson.decodeFromString(
|
||||
MagixMessage.serializer(),
|
||||
request.data.readText()
|
||||
)
|
||||
val format = request.metadata.inferFormat()
|
||||
val message = decodeMessage(request.data, format)
|
||||
|
||||
sendMessage(message)
|
||||
}
|
||||
// bidirectional connection, used for streaming connection
|
||||
requestChannel { request: Payload, input: Flow<Payload> ->
|
||||
input.onEach { inputPayload ->
|
||||
sendMessage(
|
||||
MagixEndpoint.magixJson.decodeFromString(
|
||||
MagixMessage.serializer(),
|
||||
inputPayload.use { it.data.readText() }
|
||||
)
|
||||
)
|
||||
val format = request.metadata.inferFormat()
|
||||
|
||||
input.onEach { inputPayload: Payload ->
|
||||
sendMessage(decodeMessage(inputPayload.data, format))
|
||||
}.launchIn(this)
|
||||
|
||||
val filterText = request.data.readText()
|
||||
val filterText = request.data.readString()
|
||||
|
||||
val filter = if (filterText.isBlank()) {
|
||||
MagixMessageFilter.ALL
|
||||
@ -101,8 +149,9 @@ public class RSocketMagixFlowPlugin(
|
||||
}
|
||||
|
||||
receive.filter(filter).map { message ->
|
||||
val string = MagixEndpoint.magixJson.encodeToString(message)
|
||||
buildPayload { data(string) }
|
||||
buildPayload {
|
||||
data(encodeMessage(message,format))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package space.kscience.magix.server
|
||||
|
||||
import io.ktor.server.cio.CIO
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import io.ktor.server.engine.EmbeddedServer
|
||||
import io.ktor.server.engine.embeddedServer
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
@ -19,7 +19,7 @@ public fun CoroutineScope.startMagixServer(
|
||||
vararg plugins: MagixFlowPlugin,
|
||||
port: Int = DEFAULT_MAGIX_HTTP_PORT,
|
||||
buffer: Int = 1000,
|
||||
): ApplicationEngine {
|
||||
): EmbeddedServer<*,*> {
|
||||
|
||||
val magixFlow = MutableSharedFlow<MagixMessage>(
|
||||
replay = buffer,
|
||||
@ -30,7 +30,5 @@ public fun CoroutineScope.startMagixServer(
|
||||
it.start(this, magixFlow)
|
||||
}
|
||||
|
||||
return embeddedServer(CIO, host = "localhost", port = port, module = { magixModule(magixFlow) }).apply {
|
||||
start()
|
||||
}
|
||||
return embeddedServer(CIO, host = "localhost", port = port, module = { magixModule(magixFlow) }).start()
|
||||
}
|
@ -6,49 +6,86 @@ import kotlinx.coroutines.flow.*
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
import kotlin.time.Duration
|
||||
|
||||
/**
|
||||
* Suspend the collection of this [Flow] until event time is lower that threshold
|
||||
*/
|
||||
public fun <E : TimelineEvent> Flow<E>.withTimeThreshold(
|
||||
public fun <E : WithTime> Flow<E>.withTimeThreshold(
|
||||
threshold: Flow<Instant>
|
||||
): Flow<E> = transform { event ->
|
||||
threshold.first { it > event.time }
|
||||
emit(event)
|
||||
}
|
||||
|
||||
private class OriginChangedException : CancellationException("Origin is changed")
|
||||
|
||||
/**
|
||||
* @param lookaheadInterval an interval for generated events ahead of the last observed event.
|
||||
*/
|
||||
public class GeneratingTimeline<E : TimelineEvent>(
|
||||
public class GeneratingTimeline<E : Any>(
|
||||
origin: E,
|
||||
private val lookaheadInterval: Duration,
|
||||
timeOf: E.() -> Instant,
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
private val generator: suspend FlowCollector<E>.(E) -> Unit
|
||||
) : ProducerTimeline<E>(origin.time, coroutineContext) {
|
||||
private val generator: suspend TimelineCollector<E>.(E) -> Unit
|
||||
) : ProducerTimeline<E>(timeOf(origin), timeOf, coroutineContext) {
|
||||
|
||||
private val startEventFlow = MutableStateFlow(origin)
|
||||
|
||||
private data class EventWithOrigin<E : TimelineEvent>(val origin: E, val event: E) : TimelineEvent {
|
||||
override val time: Instant get() = event.time
|
||||
private inner class EventWithOrigin(val origin: E, val event: E) : WithTime {
|
||||
override val time: Instant get() = timeOf(event)
|
||||
}
|
||||
|
||||
private val events: SharedFlow<E> = flow {
|
||||
private val events: SharedFlow<E> = flow<EventWithOrigin> {
|
||||
coroutineScope {
|
||||
startEventFlow.collect { startEvent ->
|
||||
emitAll(
|
||||
flow { generator(startEvent) }.takeWhile { startEvent == startEventFlow.value }.map {
|
||||
EventWithOrigin(startEvent, it)
|
||||
val timelineCollector = object : TimelineCollector<E> {
|
||||
override val time: StateFlow<Instant> get() = this@GeneratingTimeline.time
|
||||
override var lastEvent: E? = startEvent
|
||||
|
||||
override suspend fun emit(value: E) {
|
||||
if (startEvent == startEvent) {
|
||||
lastEvent = value
|
||||
emit(EventWithOrigin(startEvent, value))
|
||||
} else {
|
||||
throw OriginChangedException()
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
try {
|
||||
timelineCollector.generator(startEvent)
|
||||
} catch (_: OriginChangedException) {
|
||||
return@collect
|
||||
}
|
||||
|
||||
// emitAll(
|
||||
// flow innerFlow@{
|
||||
// object : TimelineCollector<E> {
|
||||
// override val time: StateFlow<Instant> get() = this@GeneratingTimeline.time
|
||||
// override val lastEvent: E?
|
||||
// get() = TODO("Not yet implemented")
|
||||
//
|
||||
// override suspend fun emit(value: E) {
|
||||
// this@innerFlow.emit(value)
|
||||
// }
|
||||
//
|
||||
// }.generator(startEvent)
|
||||
// }.takeWhile {
|
||||
// startEvent == startEventFlow.value
|
||||
// }.map {
|
||||
// EventWithOrigin(startEvent, it)
|
||||
// }
|
||||
// )
|
||||
}
|
||||
}
|
||||
}.withTimeThreshold(
|
||||
threshold = time.map { it + lookaheadInterval }
|
||||
).buffer(Channel.UNLIMITED).mapNotNull {
|
||||
).buffer(Channel.UNLIMITED).mapNotNull { event: GeneratingTimeline<E>.EventWithOrigin ->
|
||||
//a barrier to avoid leaking stale events after interruption from buffer
|
||||
it.takeIf { it.origin == startEventFlow.value }?.event
|
||||
event.takeIf { it.origin == startEventFlow.value }?.event
|
||||
}.shareIn(
|
||||
scope = timelineScope,
|
||||
started = SharingStarted.Lazily,
|
||||
@ -57,10 +94,10 @@ public class GeneratingTimeline<E : TimelineEvent>(
|
||||
override fun events(): Flow<E> = events
|
||||
|
||||
public suspend fun interrupt(newStart: E) {
|
||||
check(newStart.time >= time.value) {
|
||||
check(timeOf(newStart) >= time.value) {
|
||||
"Can't interrupt generating timeline after observed event"
|
||||
}
|
||||
startTime = newStart.time
|
||||
startTime = timeOf(newStart)
|
||||
startEventFlow.emit(newStart)
|
||||
}
|
||||
}
|
@ -10,19 +10,22 @@ import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
|
||||
public class MergedTimeline<E : TimelineEvent>(
|
||||
public class MergedTimeline<E : Any>(
|
||||
private val timelines: List<Timeline<E>>,
|
||||
private val timeOf: E.() -> Instant,
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext
|
||||
) : Timeline<E> {
|
||||
|
||||
protected val timelineScope: CoroutineScope = CoroutineScope(
|
||||
private val timelineScope: CoroutineScope = CoroutineScope(
|
||||
coroutineContext +
|
||||
SupervisorJob(coroutineContext[Job]) +
|
||||
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
|
||||
CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } +
|
||||
CoroutineName("MergedTimeline")
|
||||
)
|
||||
|
||||
override val time: StateFlow<Instant> = combine(timelines.map { it.time }){ array->
|
||||
override fun timeOf(event: E): Instant = event.timeOf()
|
||||
|
||||
override val time: StateFlow<Instant> = combine(timelines.map { it.time }) { array ->
|
||||
array.max()
|
||||
}.stateIn(timelineScope, SharingStarted.Lazily, timelines.maxOf { it.time.value })
|
||||
|
||||
@ -52,17 +55,17 @@ public class MergedTimeline<E : TimelineEvent>(
|
||||
|
||||
private val collectJob = timelineScope.launch(context) {
|
||||
channel.consumeAsFlow().onEach {
|
||||
time.emit(it.time)
|
||||
time.emit(timeOf(it))
|
||||
}.collector()
|
||||
}
|
||||
|
||||
private val mutex = Mutex()
|
||||
|
||||
override suspend fun collect(upTo: Instant) = mutex.withLock{
|
||||
override suspend fun collect(upTo: Instant) = mutex.withLock {
|
||||
timelineObservers.forEach {
|
||||
it.collect(upTo)
|
||||
}
|
||||
buffer.sortedBy { it.time }.forEach {
|
||||
buffer.sortedBy { timeOf(it) }.forEach {
|
||||
channel.send(it)
|
||||
buffer.remove(it)
|
||||
}
|
||||
|
@ -9,29 +9,42 @@ import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
public abstract class ProducerTimeline<E : TimelineEvent>(
|
||||
/**
|
||||
* A general abstraction for timelines that could produce new events
|
||||
*/
|
||||
public abstract class ProducerTimeline<E : Any>(
|
||||
protected var startTime: Instant,
|
||||
private val timeOf: E.() -> Instant,
|
||||
coroutineContext: CoroutineContext
|
||||
) : Timeline<E>, AutoCloseable {
|
||||
|
||||
protected val timelineScope: CoroutineScope = CoroutineScope(
|
||||
coroutineContext +
|
||||
SupervisorJob(coroutineContext[Job]) +
|
||||
CoroutineExceptionHandler{ _, throwable -> throwable.printStackTrace() } +
|
||||
CoroutineName("Timeline")
|
||||
SupervisorJob(coroutineContext[Job]) +
|
||||
CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } +
|
||||
CoroutineName("Timeline[${hashCode().toString(16)}]")
|
||||
)
|
||||
|
||||
override fun timeOf(event: E): Instant = event.timeOf()
|
||||
|
||||
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
|
||||
|
||||
/**
|
||||
* Update time on this channel event
|
||||
*/
|
||||
private val feedbackChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||
|
||||
override val time: StateFlow<Instant> = feedbackChannel.consumeAsFlow().map {
|
||||
maxOf(startTime,observers.maxOfOrNull { it.time.value } ?: startTime)
|
||||
maxOf(startTime, observers.maxOfOrNull { it.time.value } ?: Instant.DISTANT_PAST)
|
||||
}.stateIn(timelineScope, SharingStarted.Lazily, startTime)
|
||||
|
||||
override suspend fun advance(toTime: Instant) {
|
||||
observers.forEach {
|
||||
it.collect(toTime)
|
||||
coroutineScope {
|
||||
observers.forEach {
|
||||
launch {
|
||||
it.collect(toTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,7 +63,7 @@ public abstract class ProducerTimeline<E : TimelineEvent>(
|
||||
|
||||
private val collectJob = timelineScope.launch(context) {
|
||||
channel.consumeAsFlow().onEach {
|
||||
time.emit(it.time)
|
||||
time.emit(timeOf(it))
|
||||
feedbackChannel.send(Unit)
|
||||
}.collector()
|
||||
}
|
||||
@ -60,7 +73,7 @@ public abstract class ProducerTimeline<E : TimelineEvent>(
|
||||
override suspend fun collect(upTo: Instant) = mutex.withLock {
|
||||
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
|
||||
events().takeWhile {
|
||||
it.time <= upTo
|
||||
timeOf(it) <= upTo
|
||||
}.collect {
|
||||
channel.send(it)
|
||||
}
|
||||
|
@ -7,23 +7,31 @@ import kotlinx.datetime.Instant
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
|
||||
public interface CollectingTimeline<E: Any>: Timeline<E>, TimelineCollector<E>
|
||||
|
||||
/**
|
||||
* A manually mutable [Timeline] that could be modified via [emit] method by multiple
|
||||
*
|
||||
* @param bufferSize the size of event buffer. If more than [bufferSize] events are emitted and not consumed via [observe], emitter suspends.
|
||||
*/
|
||||
public class SharedTimeline<E : TimelineEvent>(
|
||||
public class SharedTimeline<E : Any>(
|
||||
startTime: Instant,
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext
|
||||
) : ProducerTimeline<E>(startTime, coroutineContext) {
|
||||
timeOf: E.() -> Instant,
|
||||
bufferSize: Int = Channel.UNLIMITED,
|
||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||
) : ProducerTimeline<E>(startTime, timeOf, coroutineContext), CollectingTimeline<E> {
|
||||
|
||||
private val events = MutableSharedFlow<E>(replay = Channel.UNLIMITED)
|
||||
private val events = MutableSharedFlow<E>(replay = bufferSize)
|
||||
|
||||
override fun events(): Flow<E> = events
|
||||
|
||||
override val lastEvent: E? get() = events.replayCache.lastOrNull()
|
||||
|
||||
/**
|
||||
* Emit new event to the timeline
|
||||
*/
|
||||
public suspend fun emit(event: E) {
|
||||
if (event.time < (events.replayCache.lastOrNull()?.time ?: time.value)) {
|
||||
override suspend fun emit(event: E) {
|
||||
if (timeOf(event) < (events.replayCache.lastOrNull()?.let(::timeOf) ?: time.value)) {
|
||||
error("Can't emit event $event because timeline monotony is broken")
|
||||
}
|
||||
events.emit(event)
|
||||
|
@ -5,21 +5,9 @@ import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.time.Duration
|
||||
|
||||
|
||||
public interface TimelineEvent {
|
||||
public val time: Instant
|
||||
}
|
||||
|
||||
public interface TimelineInterval : TimelineEvent {
|
||||
public val startTime: Instant
|
||||
public val duration: Duration
|
||||
|
||||
override val time: Instant
|
||||
get() = startTime + duration
|
||||
}
|
||||
|
||||
public data class SimpleTimelineEvent<T>(override val time: Instant, val value: T) : TimelineEvent
|
||||
|
||||
/**
|
||||
* A handler for observation of a timeline. On close stops collection.
|
||||
*/
|
||||
public interface TimelineObserver : AutoCloseable {
|
||||
/**
|
||||
* The subjective time of this observer (last observed time)
|
||||
@ -27,11 +15,10 @@ public interface TimelineObserver : AutoCloseable {
|
||||
public val time: StateFlow<Instant>
|
||||
|
||||
/**
|
||||
* Collect all uncollected events from [time] to [upTo].
|
||||
* Collect all uncollected events from [time] to [upTo]. Suspends until all valid events are collected.
|
||||
*
|
||||
* By default, collects all events.
|
||||
*/
|
||||
public suspend fun collect(upTo: Instant = Instant.DISTANT_FUTURE)
|
||||
public suspend fun collect(upTo: Instant)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -47,15 +34,16 @@ public suspend fun TimelineObserver.collect(duration: Duration): Unit = collect(
|
||||
*
|
||||
* Timeline guarantees that already read events won't change, but unread events could change.
|
||||
*/
|
||||
public interface Timeline<E : TimelineEvent> {
|
||||
public interface Timeline<E : Any> {
|
||||
/**
|
||||
* A subjective time of this timeline. The subjective time is the last observed time.
|
||||
*/
|
||||
public val time: StateFlow<Instant>
|
||||
|
||||
public fun timeOf(event: E): Instant
|
||||
|
||||
/**
|
||||
* Attach observer to this [Timeline]. The observer collection is not triggered right away, but only on demand.
|
||||
* Attach observer to this [Timeline]. The observer collection is triggered by timeline itself.
|
||||
*
|
||||
* Each collection shifts [TimelineObserver.time] for this observer.
|
||||
*/
|
||||
@ -71,10 +59,11 @@ public interface Timeline<E : TimelineEvent> {
|
||||
public suspend fun advance(toTime: Instant)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Perform [collector] action on each event
|
||||
*/
|
||||
public suspend fun <E : TimelineEvent> Timeline<E>.observeEach(
|
||||
public suspend fun <E : Any> Timeline<E>.observeEach(
|
||||
collector: suspend (E) -> Unit
|
||||
): TimelineObserver = observe {
|
||||
collect(collector)
|
||||
|
20
simulation-kt/src/commonMain/kotlin/TimelineCollector.kt
Normal file
20
simulation-kt/src/commonMain/kotlin/TimelineCollector.kt
Normal file
@ -0,0 +1,20 @@
|
||||
package space.kscience.simulation
|
||||
|
||||
import kotlinx.coroutines.flow.FlowCollector
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.time.Duration
|
||||
|
||||
public interface TimelineCollector<E : Any> : FlowCollector<E> {
|
||||
public val time: StateFlow<Instant>
|
||||
public val lastEvent: E?
|
||||
}
|
||||
|
||||
public interface TimelineInterval : WithTime {
|
||||
public val startTime: Instant
|
||||
public val duration: Duration
|
||||
|
||||
override val time: Instant get() = startTime + duration
|
||||
}
|
||||
|
||||
public data class TimelineEvent<T>(override val time: Instant, val value: T) : WithTime
|
121
simulation-kt/src/commonMain/kotlin/TreeTimeline.kt
Normal file
121
simulation-kt/src/commonMain/kotlin/TreeTimeline.kt
Normal file
@ -0,0 +1,121 @@
|
||||
package space.kscience.simulation
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
|
||||
/**
|
||||
* A timeline that could be forked. The events from the fork appear in parent timeline events, but not vise versa.
|
||||
*/
|
||||
public interface ForkingTimeline<E : Any> : CollectingTimeline<E> {
|
||||
public suspend fun fork(): ForkingTimeline<E>
|
||||
}
|
||||
|
||||
public class TreeTimeline<E : Any>(
|
||||
private val startTime: Instant,
|
||||
private val timeOf: E.() -> Instant,
|
||||
coroutineContext: CoroutineContext,
|
||||
) : ForkingTimeline<E>, AutoCloseable {
|
||||
|
||||
private val timelineScope: CoroutineScope = CoroutineScope(
|
||||
coroutineContext +
|
||||
SupervisorJob(coroutineContext[Job]) +
|
||||
CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } +
|
||||
CoroutineName("TreeTimeline[${hashCode().toString(16)}]")
|
||||
)
|
||||
|
||||
override fun timeOf(event: E): Instant = timeOf(event)
|
||||
|
||||
private val _time = MutableStateFlow<Instant>(startTime)
|
||||
|
||||
override val time: StateFlow<Instant> get() = _time
|
||||
|
||||
override suspend fun advance(toTime: Instant) {
|
||||
coroutineScope {
|
||||
observers.forEach {
|
||||
launch {
|
||||
it.collect(toTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val mutex = Mutex()
|
||||
|
||||
private val buffer = mutableListOf<E>()
|
||||
|
||||
private val branches: MutableSet<TimelineObserver> = mutableSetOf()
|
||||
|
||||
private val events = MutableSharedFlow<E>(1)
|
||||
|
||||
override val lastEvent: E? get() = events.replayCache.lastOrNull()
|
||||
|
||||
override suspend fun emit(value: E) {
|
||||
mutex.withLock {
|
||||
buffer.add(value)
|
||||
}
|
||||
}
|
||||
|
||||
private val observers: MutableSet<TimelineObserver> = mutableSetOf()
|
||||
|
||||
/**
|
||||
* Update time on this channel event
|
||||
*/
|
||||
private val feedbackChannel = Channel<Unit>(onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||
|
||||
override suspend fun observe(collector: suspend Flow<E>.() -> Unit): TimelineObserver {
|
||||
val context = currentCoroutineContext()
|
||||
val observer = object : TimelineObserver {
|
||||
// observed time
|
||||
override val time = MutableStateFlow(startTime)
|
||||
|
||||
private val channel = Channel<E>()
|
||||
|
||||
private val collectJob = timelineScope.launch(context) {
|
||||
channel.consumeAsFlow().onEach {
|
||||
time.emit(timeOf(it))
|
||||
feedbackChannel.send(Unit)
|
||||
}.collector()
|
||||
}
|
||||
|
||||
private val mutex = Mutex()
|
||||
|
||||
override suspend fun collect(upTo: Instant) = mutex.withLock {
|
||||
require(upTo >= time.value) { "Requested time $upTo is lower than observed ${time.value}" }
|
||||
TODO("Not yet implemented")
|
||||
// events().takeWhile {
|
||||
// timeOf(it) <= upTo
|
||||
// }.collect {
|
||||
// channel.send(it)
|
||||
// }
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
collectJob.cancel()
|
||||
observers.remove(this)
|
||||
}
|
||||
|
||||
}
|
||||
observers.add(observer)
|
||||
return observer
|
||||
}
|
||||
|
||||
override suspend fun fork(): TreeTimeline<E> {
|
||||
val theFork = TreeTimeline(time.value, timeOf, timelineScope.coroutineContext)
|
||||
branches.add(theFork.observeEach {
|
||||
emit(it)
|
||||
})
|
||||
return theFork
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
observers.forEach { it.close() }
|
||||
branches.forEach { it.close() }
|
||||
timelineScope.cancel()
|
||||
}
|
||||
}
|
7
simulation-kt/src/commonMain/kotlin/WithTime.kt
Normal file
7
simulation-kt/src/commonMain/kotlin/WithTime.kt
Normal file
@ -0,0 +1,7 @@
|
||||
package space.kscience.simulation
|
||||
|
||||
import kotlinx.datetime.Instant
|
||||
|
||||
public interface WithTime {
|
||||
public val time: Instant
|
||||
}
|
@ -15,14 +15,15 @@ class TimelineTests {
|
||||
val startTime = Instant.parse("2020-01-01T00:00:00.000Z")
|
||||
|
||||
val generation = GeneratingTimeline(
|
||||
origin = SimpleTimelineEvent(startTime, Unit),
|
||||
lookaheadInterval = 1.seconds
|
||||
origin = TimelineEvent(startTime, Unit),
|
||||
lookaheadInterval = 1.seconds,
|
||||
timeOf = WithTime::time
|
||||
) { event ->
|
||||
var time = event.time
|
||||
while (isActive) {
|
||||
time += 0.1.seconds
|
||||
println("Emit: ${time - startTime}")
|
||||
emit(SimpleTimelineEvent(time, Unit))
|
||||
emit(TimelineEvent(time, Unit))
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,8 +39,8 @@ class TimelineTests {
|
||||
observer.collect(2.seconds)
|
||||
println("Second collection complete")
|
||||
println("Interrupt")
|
||||
generation.interrupt(SimpleTimelineEvent(startTime + 6.seconds, Unit))
|
||||
println("Collecting after interruption")
|
||||
// generation.interrupt(TimelineEvent(startTime + 6.seconds, Unit))
|
||||
// println("Collecting after interruption")
|
||||
observer.collect(startTime + 6.seconds + 2.5.seconds)
|
||||
println(result)
|
||||
generation.close()
|
||||
|
Loading…
x
Reference in New Issue
Block a user