Migration to DF-0.5 and bug fixes

This commit is contained in:
Alexander Nozik 2021-08-02 16:42:19 +03:00
parent b068403429
commit 3606bc3a46
35 changed files with 559 additions and 618 deletions

View File

@ -7,7 +7,7 @@ This repository contains a prototype of API and simple implementation
of a slow control system, including a demo.
DataForge-control uses some concepts and modules of DataForge,
such as `Meta` (immutable tree-like structure) and `MetaItem` (which
such as `Meta` (immutable tree-like structure) and `Meta` (which
includes a scalar value, or a tree of values, easily convertable to/from JSON
if needed).

View File

@ -2,16 +2,13 @@ plugins {
id("ru.mipt.npm.gradle.project")
}
val dataforgeVersion: String by extra("0.4.3")
val dataforgeVersion: String by extra("0.5.0-dev-7")
val ktorVersion: String by extra(ru.mipt.npm.gradle.KScienceVersions.ktorVersion)
val rsocketVersion by extra("0.12.0")
val rsocketVersion by extra("0.13.1")
allprojects {
group = "ru.mipt.npm"
version = "0.1.0"
repositories{
jcenter()
}
version = "0.1.1"
}
ksciencePublish {

View File

@ -3,12 +3,12 @@ package ru.mipt.npm.controls.api
import io.ktor.utils.io.core.Closeable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.Flow
import ru.mipt.npm.controls.api.Device.Companion.DEVICE_TARGET
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.names.Name
/**
@ -31,12 +31,12 @@ public interface Device : Closeable, ContextAware, CoroutineScope {
/**
* Read physical state of property and update/push notifications if needed.
*/
public suspend fun readItem(propertyName: String): MetaItem
public suspend fun readProperty(propertyName: String): Meta
/**
* Get the logical state of property or return null if it is invalid
*/
public fun getItem(propertyName: String): MetaItem?
public fun getProperty(propertyName: String): Meta?
/**
* Invalidate property (set logical state to invalid)
@ -47,18 +47,19 @@ public interface Device : Closeable, ContextAware, CoroutineScope {
* Set property [value] for a property with name [propertyName].
* In rare cases could suspend if the [Device] supports command queue and it is full at the moment.
*/
public suspend fun writeItem(propertyName: String, value: MetaItem)
public suspend fun writeItem(propertyName: String, value: Meta)
/**
* The [SharedFlow] of property changes
* A subscription-based [Flow] of [DeviceMessage] provided by device. The flow is guaranteed to be readable
* multiple times
*/
public val propertyFlow: SharedFlow<Pair<String, MetaItem>>
public val messageFlow: Flow<DeviceMessage>
/**
* Send an action request and suspend caller while request is being processed.
* Could return null if request does not return a meaningful answer.
*/
public suspend fun execute(action: String, argument: MetaItem? = null): MetaItem?
public suspend fun execute(action: String, argument: Meta? = null): Meta?
override fun close() {
cancel("The device is closed")
@ -73,16 +74,16 @@ public interface Device : Closeable, ContextAware, CoroutineScope {
/**
* Get the logical state of property or suspend to read the physical value.
*/
public suspend fun Device.getOrReadItem(propertyName: String): MetaItem =
getItem(propertyName) ?: readItem(propertyName)
public suspend fun Device.getOrReadItem(propertyName: String): Meta =
getProperty(propertyName) ?: readProperty(propertyName)
/**
* Get a snapshot of logical state of the device
*/
public fun Device.getProperties(): Meta = Meta {
for (descriptor in propertyDescriptors) {
descriptor.name put getItem(descriptor.name)
setMeta(Name.parse(descriptor.name), getProperty(descriptor.name))
}
}
//public suspend fun Device.execute(name: String, meta: Meta?): MetaItem? = execute(name, meta?.let { MetaItemNode(it) })
//public suspend fun Device.execute(name: String, meta: Meta?): Meta? = execute(name, meta?.let { MetaNode(it) })

View File

@ -1,6 +1,6 @@
package ru.mipt.npm.controls.api
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.*
import space.kscience.dataforge.provider.Provider
@ -8,8 +8,6 @@ import space.kscience.dataforge.provider.Provider
* A hub that could locate multiple devices and redirect actions to them
*/
public interface DeviceHub : Provider {
public val deviceName: String
public val devices: Map<NameToken, Device>
override val defaultTarget: String get() = Device.DEVICE_TARGET
@ -53,19 +51,19 @@ public fun DeviceHub.getOrNull(name: Name): Device? = when {
public operator fun DeviceHub.get(name: Name): Device =
getOrNull(name) ?: error("Device with name $name not found in $this")
public fun DeviceHub.getOrNull(nameString: String): Device? = getOrNull(nameString.toName())
public fun DeviceHub.getOrNull(nameString: String): Device? = getOrNull(Name.parse(nameString))
public operator fun DeviceHub.get(nameString: String): Device =
getOrNull(nameString) ?: error("Device with name $nameString not found in $this")
public suspend fun DeviceHub.readItem(deviceName: Name, propertyName: String): MetaItem =
this[deviceName].readItem(propertyName)
public suspend fun DeviceHub.readProperty(deviceName: Name, propertyName: String): Meta =
this[deviceName].readProperty(propertyName)
public suspend fun DeviceHub.writeItem(deviceName: Name, propertyName: String, value: MetaItem) {
public suspend fun DeviceHub.writeItem(deviceName: Name, propertyName: String, value: Meta) {
this[deviceName].writeItem(propertyName, value)
}
public suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: MetaItem?): MetaItem? =
public suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: Meta?): Meta? =
this[deviceName].execute(command, argument)

View File

@ -6,20 +6,27 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement
import space.kscience.dataforge.io.SimpleEnvelope
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.toJson
import space.kscience.dataforge.meta.toMeta
import space.kscience.dataforge.names.Name
@Serializable
public sealed class DeviceMessage {
public abstract val sourceDevice: String?
public abstract val targetDevice: String?
public abstract val sourceDevice: Name?
public abstract val targetDevice: Name?
public abstract val comment: String?
/**
* Update the source device name for composition. If the original name is null, resulting name is also null.
*/
public abstract fun changeSource(block: (Name) -> Name): DeviceMessage
public companion object {
public fun error(
cause: Throwable,
sourceDevice: String,
targetDevice: String? = null,
sourceDevice: Name,
targetDevice: Name? = null,
): DeviceErrorMessage = DeviceErrorMessage(
errorMessage = cause.message,
errorType = cause::class.simpleName,
@ -42,11 +49,13 @@ public sealed class DeviceMessage {
@SerialName("property.changed")
public data class PropertyChangedMessage(
public val property: String,
public val value: MetaItem?,
override val sourceDevice: String,
override val targetDevice: String? = null,
public val value: Meta?,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* A command to set or invalidate property. [targetDevice] is mandatory.
@ -55,11 +64,13 @@ public data class PropertyChangedMessage(
@SerialName("property.set")
public data class PropertySetMessage(
public val property: String,
public val value: MetaItem?,
override val sourceDevice: String? = null,
override val targetDevice: String,
public val value: Meta?,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* A command to request property value asynchronously. [targetDevice] is mandatory.
@ -69,10 +80,12 @@ public data class PropertySetMessage(
@SerialName("property.get")
public data class PropertyGetMessage(
public val property: String,
override val sourceDevice: String? = null,
override val targetDevice: String,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* Request device description. The result is returned in form of [DescriptionMessage]
@ -80,10 +93,12 @@ public data class PropertyGetMessage(
@Serializable
@SerialName("description.get")
public data class GetDescriptionMessage(
override val sourceDevice: String? = null,
override val targetDevice: String,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* The full device description message
@ -92,10 +107,12 @@ public data class GetDescriptionMessage(
@SerialName("description")
public data class DescriptionMessage(
val description: Meta,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* A request to execute an action. [targetDevice] is mandatory
@ -104,11 +121,13 @@ public data class DescriptionMessage(
@SerialName("action.execute")
public data class ActionExecuteMessage(
public val action: String,
public val argument: MetaItem?,
override val sourceDevice: String? = null,
override val targetDevice: String,
public val argument: Meta?,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* Asynchronous action result. [sourceDevice] is mandatory
@ -117,11 +136,13 @@ public data class ActionExecuteMessage(
@SerialName("action.result")
public data class ActionResultMessage(
public val action: String,
public val result: MetaItem?,
override val sourceDevice: String,
override val targetDevice: String? = null,
public val result: Meta?,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* Notifies listeners that a new binary with given [binaryID] is available. The binary itself could not be provided via [DeviceMessage] API.
@ -130,10 +151,12 @@ public data class ActionResultMessage(
@SerialName("binary.notification")
public data class BinaryNotificationMessage(
val binaryID: String,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* The message states that the message is received, but no meaningful response is produced.
@ -142,10 +165,12 @@ public data class BinaryNotificationMessage(
@Serializable
@SerialName("empty")
public data class EmptyDeviceMessage(
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val sourceDevice: Name? = null,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* Information log message
@ -154,11 +179,13 @@ public data class EmptyDeviceMessage(
@SerialName("log")
public data class DeviceLogMessage(
val message: String,
val data: MetaItem? = null,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
val data: Meta? = null,
override val sourceDevice: Name? = null,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* The evaluation of the message produced a service error
@ -169,12 +196,14 @@ public data class DeviceErrorMessage(
public val errorMessage: String?,
public val errorType: String? = null,
public val errorStackTrace: String? = null,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
) : DeviceMessage()
) : DeviceMessage(){
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!!
public fun DeviceMessage.toMeta(): Meta = Json.encodeToJsonElement(this).toMeta()
public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null)

View File

@ -2,13 +2,9 @@ package ru.mipt.npm.controls.base
import ru.mipt.npm.controls.api.ActionDescriptor
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.asMetaItem
public interface DeviceAction {
public val name: String
public val descriptor: ActionDescriptor
public suspend operator fun invoke(arg: MetaItem? = null): MetaItem?
public suspend operator fun invoke(arg: Meta? = null): Meta?
}
public suspend operator fun DeviceAction.invoke(meta: Meta): MetaItem? = invoke(meta.asMetaItem())

View File

@ -7,12 +7,11 @@ import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import ru.mipt.npm.controls.api.ActionDescriptor
import ru.mipt.npm.controls.api.Device
import ru.mipt.npm.controls.api.PropertyDescriptor
import ru.mipt.npm.controls.api.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
//TODO move to DataForge-core
@ -24,28 +23,33 @@ public data class LogEntry(val content: String, val priority: Int = 0)
private open class BasicReadOnlyDeviceProperty(
val device: DeviceBase,
override val name: String,
default: MetaItem?,
default: Meta?,
override val descriptor: PropertyDescriptor,
private val getter: suspend (before: MetaItem?) -> MetaItem,
private val getter: suspend (before: Meta?) -> Meta,
) : ReadOnlyDeviceProperty {
override val scope: CoroutineScope get() = device
private val state: MutableStateFlow<MetaItem?> = MutableStateFlow(default)
override val value: MetaItem? get() = state.value
private val state: MutableStateFlow<Meta?> = MutableStateFlow(default)
override val value: Meta? get() = state.value
override suspend fun invalidate() {
state.value = null
}
override fun updateLogical(item: MetaItem) {
override fun updateLogical(item: Meta) {
state.value = item
scope.launch {
device.sharedPropertyFlow.emit(Pair(name, item))
device.sharedMessageFlow.emit(
PropertyChangedMessage(
property = name,
value = item,
)
)
}
}
override suspend fun read(force: Boolean): MetaItem {
override suspend fun read(force: Boolean): Meta {
//backup current value
val currentValue = value
return if (force || currentValue == null) {
@ -61,7 +65,7 @@ private open class BasicReadOnlyDeviceProperty(
}
}
override fun flow(): StateFlow<MetaItem?> = state
override fun flow(): StateFlow<Meta?> = state
}
@ -69,13 +73,13 @@ private open class BasicReadOnlyDeviceProperty(
private class BasicDeviceProperty(
device: DeviceBase,
name: String,
default: MetaItem?,
default: Meta?,
descriptor: PropertyDescriptor,
getter: suspend (MetaItem?) -> MetaItem,
private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
getter: suspend (Meta?) -> Meta,
private val setter: suspend (oldValue: Meta?, newValue: Meta) -> Meta?,
) : BasicReadOnlyDeviceProperty(device, name, default, descriptor, getter), DeviceProperty {
override var value: MetaItem?
override var value: Meta?
get() = super.value
set(value) {
scope.launch {
@ -89,7 +93,7 @@ private class BasicDeviceProperty(
private val writeLock = Mutex()
override suspend fun write(item: MetaItem) {
override suspend fun write(item: Meta) {
writeLock.withLock {
//fast return if value is not changed
if (item == value) return@withLock
@ -113,16 +117,14 @@ public abstract class DeviceBase(final override val context: Context) : Device {
override val coroutineContext: CoroutineContext =
context.coroutineContext + SupervisorJob(context.coroutineContext[Job])
private val _properties = HashMap<String, ReadOnlyDeviceProperty>()
public val properties: Map<String, ReadOnlyDeviceProperty> get() = _properties
private val _actions = HashMap<String, DeviceAction>()
public val actions: Map<String, DeviceAction> get() = _actions
internal val sharedPropertyFlow = MutableSharedFlow<Pair<String, MetaItem>>()
override val propertyFlow: SharedFlow<Pair<String, MetaItem>> get() = sharedPropertyFlow
internal val sharedMessageFlow = MutableSharedFlow<DeviceMessage>()
override val messageFlow: SharedFlow<DeviceMessage> get() = sharedMessageFlow
private val sharedLogFlow = MutableSharedFlow<LogEntry>()
/**
@ -152,23 +154,23 @@ public abstract class DeviceBase(final override val context: Context) : Device {
_actions[name] = action
}
override suspend fun readItem(propertyName: String): MetaItem =
override suspend fun readProperty(propertyName: String): Meta =
(_properties[propertyName] ?: error("Property with name $propertyName not defined")).read()
override fun getItem(propertyName: String): MetaItem?=
override fun getProperty(propertyName: String): Meta? =
(_properties[propertyName] ?: error("Property with name $propertyName not defined")).value
override suspend fun invalidate(propertyName: String) {
(_properties[propertyName] ?: error("Property with name $propertyName not defined")).invalidate()
}
override suspend fun writeItem(propertyName: String, value: MetaItem) {
override suspend fun writeItem(propertyName: String, value: Meta) {
(_properties[propertyName] as? DeviceProperty ?: error("Property with name $propertyName not defined")).write(
value
)
}
override suspend fun execute(action: String, argument: MetaItem?): MetaItem? =
override suspend fun execute(action: String, argument: Meta?): Meta? =
(_actions[action] ?: error("Request with name $action not defined")).invoke(argument)
/**
@ -176,9 +178,9 @@ public abstract class DeviceBase(final override val context: Context) : Device {
*/
public fun createReadOnlyProperty(
name: String,
default: MetaItem?,
default: Meta?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem?) -> MetaItem,
getter: suspend (Meta?) -> Meta,
): ReadOnlyDeviceProperty {
val property = BasicReadOnlyDeviceProperty(
this,
@ -197,10 +199,10 @@ public abstract class DeviceBase(final override val context: Context) : Device {
*/
internal fun createMutableProperty(
name: String,
default: MetaItem?,
default: Meta?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem?) -> MetaItem,
setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
getter: suspend (Meta?) -> Meta,
setter: suspend (oldValue: Meta?, newValue: Meta) -> Meta?,
): DeviceProperty {
val property = BasicDeviceProperty(
this,
@ -220,9 +222,9 @@ public abstract class DeviceBase(final override val context: Context) : Device {
private inner class BasicDeviceAction(
override val name: String,
override val descriptor: ActionDescriptor,
private val block: suspend (MetaItem?) -> MetaItem?,
private val block: suspend (Meta?) -> Meta?,
) : DeviceAction {
override suspend fun invoke(arg: MetaItem?): MetaItem? =
override suspend fun invoke(arg: Meta?): Meta? =
withContext(coroutineContext) {
block(arg)
}
@ -234,7 +236,7 @@ public abstract class DeviceBase(final override val context: Context) : Device {
internal fun createAction(
name: String,
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem?) -> MetaItem?,
block: suspend (Meta?) -> Meta?,
): DeviceAction {
val action = BasicDeviceAction(name, ActionDescriptor(name).apply(descriptorBuilder), block)
registerAction(name, action)

View File

@ -3,7 +3,7 @@ package ru.mipt.npm.controls.base
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import ru.mipt.npm.controls.api.PropertyDescriptor
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import kotlin.time.Duration
/**
@ -30,24 +30,24 @@ public interface ReadOnlyDeviceProperty {
/**
* Directly update property logical value and notify listener without writing it to device
*/
public fun updateLogical(item: MetaItem)
public fun updateLogical(item: Meta)
/**
* Get cached value and return null if value is invalid or not initialized
*/
public val value: MetaItem?
public val value: Meta?
/**
* Read value either from cache if cache is valid or directly from physical device.
* If [force], reread from physical state even if the logical state is set.
*/
public suspend fun read(force: Boolean = false): MetaItem
public suspend fun read(force: Boolean = false): Meta
/**
* The [Flow] representing future logical states of the property.
* Produces null when the state is invalidated
*/
public fun flow(): Flow<MetaItem?>
public fun flow(): Flow<Meta?>
}
@ -65,10 +65,10 @@ public fun ReadOnlyDeviceProperty.readEvery(duration: Duration): Job = scope.lau
* A writeable device property with non-suspended write
*/
public interface DeviceProperty : ReadOnlyDeviceProperty {
override var value: MetaItem?
override var value: Meta?
/**
* Write value to physical device. Invalidates logical value, but does not update it automatically
*/
public suspend fun write(item: MetaItem)
public suspend fun write(item: Meta)
}

View File

@ -2,7 +2,7 @@ package ru.mipt.npm.controls.base
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.transformations.MetaConverter
/**
@ -14,14 +14,18 @@ public open class TypedReadOnlyDeviceProperty<T : Any>(
) : ReadOnlyDeviceProperty by property {
public fun updateLogical(obj: T) {
property.updateLogical(converter.objectToMetaItem(obj))
property.updateLogical(converter.objectToMeta(obj))
}
public open val typedValue: T? get() = value?.let { converter.itemToObject(it) }
public open val typedValue: T? get() = value?.let { converter.metaToObject(it) }
public suspend fun readTyped(force: Boolean = false): T = converter.itemToObject(read(force))
public suspend fun readTyped(force: Boolean = false): T {
val meta = read(force)
return converter.metaToObject(meta)
?: error("Meta $meta could not be converted by $converter")
}
public fun flowTyped(): Flow<T?> = flow().map { it?.let { converter.itemToObject(it) } }
public fun flowTyped(): Flow<T?> = flow().map { it?.let { converter.metaToObject(it) } }
}
/**
@ -32,23 +36,23 @@ public class TypedDeviceProperty<T : Any>(
converter: MetaConverter<T>,
) : TypedReadOnlyDeviceProperty<T>(property, converter), DeviceProperty {
override var value: MetaItem?
override var value: Meta?
get() = property.value
set(arg) {
property.value = arg
}
public override var typedValue: T?
get() = value?.let { converter.itemToObject(it) }
get() = value?.let { converter.metaToObject(it) }
set(arg) {
property.value = arg?.let { converter.objectToMetaItem(arg) }
property.value = arg?.let { converter.objectToMeta(arg) }
}
override suspend fun write(item: MetaItem) {
override suspend fun write(item: Meta) {
property.write(item)
}
public suspend fun write(obj: T) {
property.write(converter.objectToMetaItem(obj))
property.write(converter.objectToMeta(obj))
}
}

View File

@ -1,10 +1,8 @@
package ru.mipt.npm.controls.base
import ru.mipt.npm.controls.api.ActionDescriptor
import space.kscience.dataforge.meta.MetaBuilder
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.MetaItemNode
import space.kscience.dataforge.meta.MetaItemValue
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.values.Value
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
@ -22,7 +20,7 @@ public typealias ActionDelegate = ReadOnlyProperty<DeviceBase, DeviceAction>
private class ActionProvider<D : DeviceBase>(
val owner: D,
val descriptorBuilder: ActionDescriptor.() -> Unit = {},
val block: suspend (MetaItem?) -> MetaItem?,
val block: suspend (Meta?) -> Meta?,
) : PropertyDelegateProvider<D, ActionDelegate> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ActionDelegate {
val name = property.name
@ -33,28 +31,27 @@ private class ActionProvider<D : DeviceBase>(
public fun DeviceBase.requesting(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
action: suspend (MetaItem?) -> MetaItem?,
action: suspend (Meta?) -> Meta?,
): PropertyDelegateProvider<DeviceBase, ActionDelegate> = ActionProvider(this, descriptorBuilder, action)
public fun <D : DeviceBase> D.requestingValue(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
action: suspend (MetaItem?) -> Any?,
action: suspend (Meta?) -> Any?,
): PropertyDelegateProvider<D, ActionDelegate> = ActionProvider(this, descriptorBuilder) {
val res = action(it)
MetaItemValue(Value.of(res))
Meta(Value.of(res))
}
public fun <D : DeviceBase> D.requestingMeta(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
action: suspend MetaBuilder.(MetaItem?) -> Unit,
action: suspend MutableMeta.(Meta?) -> Unit,
): PropertyDelegateProvider<D, ActionDelegate> = ActionProvider(this, descriptorBuilder) {
val res = MetaBuilder().apply { action(it) }
MetaItemNode(res)
Meta { action(it) }
}
public fun DeviceBase.acting(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
action: suspend (MetaItem?) -> Unit,
action: suspend (Meta?) -> Unit,
): PropertyDelegateProvider<DeviceBase, ActionDelegate> = ActionProvider(this, descriptorBuilder) {
action(it)
null

View File

@ -1,7 +1,10 @@
package ru.mipt.npm.controls.base
import ru.mipt.npm.controls.api.PropertyDescriptor
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.transformations.MetaConverter
import space.kscience.dataforge.values.Null
import space.kscience.dataforge.values.Value
@ -29,9 +32,9 @@ public typealias TypedReadOnlyPropertyDelegate<T> = ReadOnlyProperty<DeviceBase,
private class ReadOnlyDevicePropertyProvider<D : DeviceBase>(
val owner: D,
val default: MetaItem?,
val default: Meta?,
val descriptorBuilder: PropertyDescriptor.() -> Unit = {},
private val getter: suspend (MetaItem?) -> MetaItem,
private val getter: suspend (Meta?) -> Meta,
) : PropertyDelegateProvider<D, ReadOnlyPropertyDelegate> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ReadOnlyPropertyDelegate {
@ -43,10 +46,10 @@ private class ReadOnlyDevicePropertyProvider<D : DeviceBase>(
private class TypedReadOnlyDevicePropertyProvider<D : DeviceBase, T : Any>(
val owner: D,
val default: MetaItem?,
val default: Meta?,
val converter: MetaConverter<T>,
val descriptorBuilder: PropertyDescriptor.() -> Unit = {},
private val getter: suspend (MetaItem?) -> MetaItem,
private val getter: suspend (Meta?) -> Meta,
) : PropertyDelegateProvider<D, TypedReadOnlyPropertyDelegate<T>> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedReadOnlyPropertyDelegate<T> {
@ -57,9 +60,9 @@ private class TypedReadOnlyDevicePropertyProvider<D : DeviceBase, T : Any>(
}
public fun DeviceBase.reading(
default: MetaItem? = null,
default: Meta? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem?) -> MetaItem,
getter: suspend (Meta?) -> Meta,
): PropertyDelegateProvider<DeviceBase, ReadOnlyPropertyDelegate> = ReadOnlyDevicePropertyProvider(
this,
default,
@ -73,9 +76,9 @@ public fun DeviceBase.readingValue(
getter: suspend () -> Any?,
): PropertyDelegateProvider<DeviceBase, ReadOnlyPropertyDelegate> = ReadOnlyDevicePropertyProvider(
this,
default?.let { MetaItemValue(it) },
default?.let { Meta(it) },
descriptorBuilder,
getter = { MetaItemValue(Value.of(getter())) }
getter = { Meta(Value.of(getter())) }
)
public fun DeviceBase.readingNumber(
@ -84,12 +87,12 @@ public fun DeviceBase.readingNumber(
getter: suspend () -> Number,
): PropertyDelegateProvider<DeviceBase, TypedReadOnlyPropertyDelegate<Number>> = TypedReadOnlyDevicePropertyProvider(
this,
default?.let { MetaItemValue(it.asValue()) },
default?.let { Meta(it.asValue()) },
MetaConverter.number,
descriptorBuilder,
getter = {
val number = getter()
MetaItemValue(number.asValue())
Meta(number.asValue())
}
)
@ -99,12 +102,12 @@ public fun DeviceBase.readingDouble(
getter: suspend () -> Double,
): PropertyDelegateProvider<DeviceBase, TypedReadOnlyPropertyDelegate<Double>> = TypedReadOnlyDevicePropertyProvider(
this,
default?.let { MetaItemValue(it.asValue()) },
default?.let { Meta(it.asValue()) },
MetaConverter.double,
descriptorBuilder,
getter = {
val number = getter()
MetaItemValue(number.asValue())
Meta(number.asValue())
}
)
@ -114,12 +117,12 @@ public fun DeviceBase.readingString(
getter: suspend () -> String,
): PropertyDelegateProvider<DeviceBase, TypedReadOnlyPropertyDelegate<String>> = TypedReadOnlyDevicePropertyProvider(
this,
default?.let { MetaItemValue(it.asValue()) },
default?.let { Meta(it.asValue()) },
MetaConverter.string,
descriptorBuilder,
getter = {
val number = getter()
MetaItemValue(number.asValue())
Meta(number.asValue())
}
)
@ -129,26 +132,26 @@ public fun DeviceBase.readingBoolean(
getter: suspend () -> Boolean,
): PropertyDelegateProvider<DeviceBase, TypedReadOnlyPropertyDelegate<Boolean>> = TypedReadOnlyDevicePropertyProvider(
this,
default?.let { MetaItemValue(it.asValue()) },
default?.let { Meta(it.asValue()) },
MetaConverter.boolean,
descriptorBuilder,
getter = {
val boolean = getter()
MetaItemValue(boolean.asValue())
Meta(boolean.asValue())
}
)
public fun DeviceBase.readingMeta(
default: Meta? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend MetaBuilder.() -> Unit,
getter: suspend MutableMeta.() -> Unit,
): PropertyDelegateProvider<DeviceBase, TypedReadOnlyPropertyDelegate<Meta>> = TypedReadOnlyDevicePropertyProvider(
this,
default?.let { MetaItemNode(it) },
default,
MetaConverter.meta,
descriptorBuilder,
getter = {
MetaItemNode(MetaBuilder().apply { getter() })
Meta { getter() }
}
)
@ -170,10 +173,10 @@ public typealias TypedPropertyDelegate<T> = ReadOnlyProperty<DeviceBase, TypedDe
private class DevicePropertyProvider<D : DeviceBase>(
val owner: D,
val default: MetaItem?,
val default: Meta?,
val descriptorBuilder: PropertyDescriptor.() -> Unit = {},
private val getter: suspend (MetaItem?) -> MetaItem,
private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
private val getter: suspend (Meta?) -> Meta,
private val setter: suspend (oldValue: Meta?, newValue: Meta) -> Meta?,
) : PropertyDelegateProvider<D, PropertyDelegate> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): PropertyDelegate {
@ -185,11 +188,11 @@ private class DevicePropertyProvider<D : DeviceBase>(
private class TypedDevicePropertyProvider<D : DeviceBase, T : Any>(
val owner: D,
val default: MetaItem?,
val default: Meta?,
val converter: MetaConverter<T>,
val descriptorBuilder: PropertyDescriptor.() -> Unit = {},
private val getter: suspend (MetaItem?) -> MetaItem,
private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
private val getter: suspend (Meta?) -> Meta,
private val setter: suspend (oldValue: Meta?, newValue: Meta) -> Meta?,
) : PropertyDelegateProvider<D, TypedPropertyDelegate<T>> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedPropertyDelegate<T> {
@ -200,10 +203,10 @@ private class TypedDevicePropertyProvider<D : DeviceBase, T : Any>(
}
public fun DeviceBase.writing(
default: MetaItem? = null,
default: Meta? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend (MetaItem?) -> MetaItem,
setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
getter: suspend (Meta?) -> Meta,
setter: suspend (oldValue: Meta?, newValue: Meta) -> Meta?,
): PropertyDelegateProvider<DeviceBase, PropertyDelegate> = DevicePropertyProvider(
this,
default,
@ -213,7 +216,7 @@ public fun DeviceBase.writing(
)
public fun DeviceBase.writingVirtual(
default: MetaItem,
default: Meta,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
): PropertyDelegateProvider<DeviceBase, PropertyDelegate> = writing(
default,
@ -226,19 +229,9 @@ public fun DeviceBase.writingVirtual(
default: Value,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
): PropertyDelegateProvider<DeviceBase, PropertyDelegate> = writing(
MetaItemValue(default),
Meta(default),
descriptorBuilder,
getter = { it ?: MetaItemValue(default) },
setter = { _, newItem -> newItem }
)
public fun DeviceBase.writingVirtual(
default: Meta,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
): PropertyDelegateProvider<DeviceBase, PropertyDelegate> = writing(
MetaItemNode(default),
descriptorBuilder,
getter = { it ?: MetaItemNode(default) },
getter = { it ?: Meta(default) },
setter = { _, newItem -> newItem }
)
@ -247,17 +240,17 @@ public fun <D : DeviceBase> D.writingDouble(
getter: suspend (Double) -> Double,
setter: suspend (oldValue: Double?, newValue: Double) -> Double?,
): PropertyDelegateProvider<D, TypedPropertyDelegate<Double>> {
val innerGetter: suspend (MetaItem?) -> MetaItem = {
MetaItemValue(getter(it.double ?: Double.NaN).asValue())
val innerGetter: suspend (Meta?) -> Meta = {
Meta(getter(it.double ?: Double.NaN).asValue())
}
val innerSetter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem? = { oldValue, newValue ->
setter(oldValue.double, newValue.double ?: Double.NaN)?.asMetaItem()
val innerSetter: suspend (oldValue: Meta?, newValue: Meta) -> Meta? = { oldValue, newValue ->
setter(oldValue.double, newValue.double ?: Double.NaN)?.asMeta()
}
return TypedDevicePropertyProvider(
this,
MetaItemValue(Double.NaN.asValue()),
Meta(Double.NaN.asValue()),
MetaConverter.double,
descriptorBuilder,
innerGetter,
@ -270,18 +263,18 @@ public fun <D : DeviceBase> D.writingBoolean(
getter: suspend (Boolean?) -> Boolean,
setter: suspend (oldValue: Boolean?, newValue: Boolean) -> Boolean?,
): PropertyDelegateProvider<D, TypedPropertyDelegate<Boolean>> {
val innerGetter: suspend (MetaItem?) -> MetaItem = {
MetaItemValue(getter(it.boolean).asValue())
val innerGetter: suspend (Meta?) -> Meta = {
Meta(getter(it.boolean).asValue())
}
val innerSetter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem? = { oldValue, newValue ->
val innerSetter: suspend (oldValue: Meta?, newValue: Meta) -> Meta? = { oldValue, newValue ->
setter(oldValue.boolean, newValue.boolean ?: error("Can't convert $newValue to boolean"))?.asValue()
?.asMetaItem()
?.let { Meta(it) }
}
return TypedDevicePropertyProvider(
this,
MetaItemValue(Null),
Meta(Null),
MetaConverter.boolean,
descriptorBuilder,
innerGetter,

View File

@ -1,6 +1,9 @@
package ru.mipt.npm.controls.base
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.transformations.MetaConverter
import space.kscience.dataforge.values.asValue
import space.kscience.dataforge.values.double
@ -8,20 +11,18 @@ import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration
public fun Double.asMetaItem(): MetaItemValue = MetaItemValue(asValue())
public fun Double.asMeta(): Meta = Meta(asValue())
//TODO to be moved to DF
public object DurationConverter : MetaConverter<Duration> {
override fun itemToObject(item: MetaItem): Duration = when (item) {
is MetaItemNode -> {
val unit: DurationUnit = item.node["unit"].enum<DurationUnit>() ?: DurationUnit.SECONDS
val value = item.node[Meta.VALUE_KEY].double ?: error("No value present for Duration")
value.toDuration(unit)
override fun metaToObject(meta: Meta): Duration = meta.value?.double?.toDuration(DurationUnit.SECONDS)
?: run {
val unit: DurationUnit = meta["unit"].enum<DurationUnit>() ?: DurationUnit.SECONDS
val value = meta[Meta.VALUE_KEY].double ?: error("No value present for Duration")
return@run value.toDuration(unit)
}
is MetaItemValue -> item.value.double.toDuration(DurationUnit.SECONDS)
}
override fun objectToMetaItem(obj: Duration): MetaItem = obj.toDouble(DurationUnit.SECONDS).asMetaItem()
override fun objectToMeta(obj: Duration): Meta = obj.toDouble(DurationUnit.SECONDS).asMeta()
}
public val MetaConverter.Companion.duration: MetaConverter<Duration> get() = DurationConverter

View File

@ -1,151 +0,0 @@
package ru.mipt.npm.controls.controllers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import ru.mipt.npm.controls.api.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.toName
/**
* The [DeviceController] wraps device operations in [DeviceMessage]
*/
@OptIn(DFExperimental::class)
public class DeviceController(
public val device: Device,
public val deviceName: String,
) {
private val propertyChanges = device.propertyFlow.map { (propertyName: String, value: MetaItem) ->
PropertyChangedMessage(
sourceDevice = deviceName,
property = propertyName,
value = value,
)
}
/**
* The flow of outgoing messages
*/
public val messages: Flow<DeviceMessage> get() = propertyChanges
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage =
respondMessage(device, deviceName, message)
public companion object {
public const val GET_PROPERTY_ACTION: String = "read"
public const val SET_PROPERTY_ACTION: String = "write"
public const val EXECUTE_ACTION: String = "execute"
public const val PROPERTY_LIST_ACTION: String = "propertyList"
public const val ACTION_LIST_ACTION: String = "actionList"
// internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
// val target = request.meta["target"].string
// return try {
// if (device is Responder) {
// device.respond(request)
// } else if (request.data == null) {
// respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope()
// } else if (target != null && target != deviceTarget) {
// error("Wrong target name $deviceTarget expected but $target found")
// } else error("Device does not support binary response")
// } catch (ex: Exception) {
// val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string
// DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope()
// }
// }
internal suspend fun respondMessage(
device: Device,
deviceTarget: String,
request: DeviceMessage,
): DeviceMessage = try {
when (request) {
is PropertyGetMessage -> {
PropertyChangedMessage(
property = request.property,
value = device.getOrReadItem(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is PropertySetMessage -> {
if (request.value == null) {
device.invalidate(request.property)
} else {
device.writeItem(request.property, request.value)
}
PropertyChangedMessage(
property = request.property,
value = device.getOrReadItem(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is ActionExecuteMessage -> {
ActionResultMessage(
action = request.action,
result = device.execute(request.action, request.argument),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is GetDescriptionMessage -> {
val descriptionMeta = Meta {
"properties" put {
device.propertyDescriptors.map { descriptor ->
descriptor.name put descriptor.toMeta()
}
}
"actions" put {
device.actionDescriptors.map { descriptor ->
descriptor.name put descriptor.toMeta()
}
}
}
DescriptionMessage(
description = descriptionMeta,
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is DescriptionMessage,
is PropertyChangedMessage,
is ActionResultMessage,
is BinaryNotificationMessage,
is DeviceErrorMessage,
is EmptyDeviceMessage,
is DeviceLogMessage,
-> {
//Those messages are ignored
EmptyDeviceMessage(
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice,
comment = "The message is ignored"
)
}
}
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice)
}
}
}
public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.targetDevice?.toName() ?: Name.EMPTY
val device = this.getOrNull(targetName) ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = deviceName, targetDevice = request.sourceDevice)
}
}

View File

@ -4,15 +4,14 @@ import ru.mipt.npm.controls.api.Device
import ru.mipt.npm.controls.api.DeviceHub
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaBuilder
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import kotlin.collections.set
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
public class DeviceManager(override val deviceName: String = "") : AbstractPlugin(), DeviceHub {
public class DeviceManager : AbstractPlugin(), DeviceHub {
override val tag: PluginTag get() = Companion.tag
/**
@ -21,10 +20,6 @@ public class DeviceManager(override val deviceName: String = "") : AbstractPlugi
private val top = HashMap<NameToken, Device>()
override val devices: Map<NameToken, Device> get() = top
public val controller: HubController by lazy {
HubController(this)
}
public fun registerDevice(name: NameToken, device: Device) {
top[name] = device
}
@ -35,8 +30,7 @@ public class DeviceManager(override val deviceName: String = "") : AbstractPlugi
override val tag: PluginTag = PluginTag("devices", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out DeviceManager> = DeviceManager::class
override fun invoke(meta: Meta, context: Context): DeviceManager =
DeviceManager(meta["deviceName"].string ?: "")
override fun invoke(meta: Meta, context: Context): DeviceManager = DeviceManager()
}
}
@ -47,11 +41,14 @@ public fun <D : Device> DeviceManager.install(name: String, factory: Factory<D>,
return device
}
public fun <D : Device> DeviceManager.installing(
public inline fun <D : Device> DeviceManager.installing(
factory: Factory<D>,
metaBuilder: MetaBuilder.() -> Unit = {},
): ReadOnlyProperty<Any?, D> = ReadOnlyProperty { _, property ->
val name = property.name
install(name, factory, Meta(metaBuilder))
builder: MutableMeta.() -> Unit = {},
): ReadOnlyProperty<Any?, D> {
val meta = Meta(builder)
return ReadOnlyProperty { _, property ->
val name = property.name
install(name, factory, meta)
}
}

View File

@ -1,79 +0,0 @@
package ru.mipt.npm.controls.controllers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import ru.mipt.npm.controls.api.DeviceHub
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.getOrNull
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.toName
@OptIn(DFExperimental::class)
public class HubController(
public val hub: DeviceHub,
) {
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
// private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED)
public fun messageOutput(): Flow<DeviceMessage> = messageOutbox.consumeAsFlow()
// public fun envelopeOutput(): Flow<Envelope> = envelopeOutbox.consumeAsFlow()
// private val packJob = scope.launch {
// while (isActive) {
// val message = messageOutbox.receive()
// envelopeOutbox.send(message.toEnvelope())
// }
// }
// private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (deviceNameToken, device) ->
// object : DeviceListener {
// override fun propertyChanged(propertyName: String, value: MetaItem?) {
// if (value == null) return
// scope.launch {
// val change = PropertyChangedMessage(
// sourceDevice = deviceNameToken.toString(),
// key = propertyName,
// value = value
// )
// messageOutbox.send(change)
// }
// }
// }.also {
// device.registerListener(it)
// }
// }
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.targetDevice?.toName() ?: Name.EMPTY
val device = hub.getOrNull(targetName) ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = hub.deviceName, targetDevice = message.sourceDevice)
}
//
// override suspend fun respond(request: Envelope): Envelope = try {
// val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
// val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
// if (request.data == null) {
// DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
// .toEnvelope()
// } else {
// DeviceController.respond(device, targetName.toString(), request)
// }
// } catch (ex: Exception) {
// DeviceMessage.error(ex, sourceDevice = null).toEnvelope()
// }
//
// override fun consume(message: Envelope) {
// // Fire the respond procedure and forget about the result
// scope.launch {
// respond(message)
// }
// }
}

View File

@ -0,0 +1,115 @@
package ru.mipt.npm.controls.controllers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import ru.mipt.npm.controls.api.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMessage): DeviceMessage? = try {
when (request) {
is PropertyGetMessage -> {
PropertyChangedMessage(
property = request.property,
value = getOrReadItem(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is PropertySetMessage -> {
if (request.value == null) {
invalidate(request.property)
} else {
writeItem(request.property, request.value)
}
PropertyChangedMessage(
property = request.property,
value = getOrReadItem(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is ActionExecuteMessage -> {
ActionResultMessage(
action = request.action,
result = execute(request.action, request.argument),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is GetDescriptionMessage -> {
val descriptionMeta = Meta {
"properties" put {
propertyDescriptors.map { descriptor ->
descriptor.name put descriptor.toMeta()
}
}
"actions" put {
actionDescriptors.map { descriptor ->
descriptor.name put descriptor.toMeta()
}
}
}
DescriptionMessage(
description = descriptionMeta,
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is DescriptionMessage,
is PropertyChangedMessage,
is ActionResultMessage,
is BinaryNotificationMessage,
is DeviceErrorMessage,
is EmptyDeviceMessage,
is DeviceLogMessage,
-> null
}
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice)
}
public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): DeviceMessage? {
return try {
val targetName = request.targetDevice ?: return null
val device = getOrNull(targetName) ?: error("The device with name $targetName not found in $this")
device.respondMessage(targetName, request)
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = Name.EMPTY, targetDevice = request.sourceDevice)
}
}
/**
* Collect all messages from given [DeviceHub], applying proper relative names
*/
public fun DeviceHub.hubMessageFlow(scope: CoroutineScope): Flow<DeviceMessage> {
val outbox = MutableSharedFlow<DeviceMessage>()
if (this is Device) {
messageFlow.onEach {
outbox.emit(it)
}.launchIn(scope)
}
//TODO maybe better create map of all devices to limit copying
devices.forEach { (token, childDevice) ->
val flow = if (childDevice is DeviceHub) {
childDevice.hubMessageFlow(scope)
} else {
childDevice.messageFlow
}
flow.onEach { deviceMessage ->
outbox.emit(
deviceMessage.changeSource { token + it }
)
}.launchIn(scope)
}
return outbox
}

View File

@ -7,13 +7,10 @@ import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import ru.mipt.npm.controls.api.ActionDescriptor
import ru.mipt.npm.controls.api.Device
import ru.mipt.npm.controls.api.PropertyDescriptor
import ru.mipt.npm.controls.api.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.transformations.MetaConverter
import kotlin.coroutines.CoroutineContext
import kotlin.properties.Delegates.observable
@ -48,11 +45,11 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
context.coroutineContext + SupervisorJob(context.coroutineContext[Job])
}
private val logicalState: HashMap<String, MetaItem?> = HashMap()
private val logicalState: HashMap<String, Meta?> = HashMap()
private val _propertyFlow: MutableSharedFlow<Pair<String, MetaItem>> = MutableSharedFlow()
private val sharedMessageFlow: MutableSharedFlow<DeviceMessage> = MutableSharedFlow()
override val propertyFlow: SharedFlow<Pair<String, MetaItem>> get() = _propertyFlow
public override val messageFlow: SharedFlow<DeviceMessage> get() = sharedMessageFlow
@Suppress("UNCHECKED_CAST")
internal val self: D
@ -60,13 +57,13 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
private val stateLock = Mutex()
private suspend fun updateLogical(propertyName: String, value: MetaItem?) {
private suspend fun updateLogical(propertyName: String, value: Meta?) {
if (value != logicalState[propertyName]) {
stateLock.withLock {
logicalState[propertyName] = value
}
if (value != null) {
_propertyFlow.emit(propertyName to value)
sharedMessageFlow.emit(PropertyChangedMessage(propertyName, value))
}
}
}
@ -75,14 +72,14 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
* Force read physical value and push an update if it is changed. It does not matter if logical state is present.
* The logical state is updated after read
*/
override suspend fun readItem(propertyName: String): MetaItem {
override suspend fun readProperty(propertyName: String): Meta {
val newValue = properties[propertyName]?.readItem(self)
?: error("A property with name $propertyName is not registered in $this")
updateLogical(propertyName, newValue)
return newValue
}
override fun getItem(propertyName: String): MetaItem? = logicalState[propertyName]
override fun getProperty(propertyName: String): Meta? = logicalState[propertyName]
override suspend fun invalidate(propertyName: String) {
stateLock.withLock {
@ -90,7 +87,7 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
}
}
override suspend fun writeItem(propertyName: String, value: MetaItem): Unit {
override suspend fun writeItem(propertyName: String, value: Meta): Unit {
//If there is a physical property with given name, invalidate logical property and write physical one
(properties[propertyName] as? WritableDevicePropertySpec<D, out Any>)?.let {
it.writeItem(self, value)
@ -100,7 +97,7 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
}
}
override suspend fun execute(action: String, argument: MetaItem?): MetaItem? =
override suspend fun execute(action: String, argument: Meta?): Meta? =
actions[action]?.executeItem(self, argument)
@ -114,7 +111,7 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
if (oldValue != newValue) {
launch {
invalidate(property.name)
_propertyFlow.emit(property.name to converter.objectToMetaItem(newValue))
sharedMessageFlow.emit(PropertyChangedMessage(property.name, converter.objectToMeta(newValue)))
}
}
}
@ -124,11 +121,11 @@ public open class DeviceBySpec<D : DeviceBySpec<D>>(
*/
public suspend fun <T : Any> DevicePropertySpec<D, T>.read(): T {
val res = read(self)
updateLogical(name, converter.objectToMetaItem(res))
updateLogical(name, converter.objectToMeta(res))
return res
}
public fun <T : Any> DevicePropertySpec<D, T>.get(): T? = getItem(name)?.let(converter::itemToObject)
public fun <T : Any> DevicePropertySpec<D, T>.get(): T? = getProperty(name)?.let(converter::metaToObject)
/**
* Write typed property state and invalidate logical state

View File

@ -3,10 +3,10 @@ package ru.mipt.npm.controls.properties
import ru.mipt.npm.controls.api.ActionDescriptor
import ru.mipt.npm.controls.api.Device
import ru.mipt.npm.controls.api.PropertyDescriptor
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.transformations.MetaConverter
import space.kscience.dataforge.meta.transformations.nullableItemToObject
import space.kscience.dataforge.meta.transformations.nullableObjectToMetaItem
import space.kscience.dataforge.meta.transformations.nullableMetaToObject
import space.kscience.dataforge.meta.transformations.nullableObjectToMeta
/**
@ -40,8 +40,8 @@ public interface DevicePropertySpec<in D : Device, T : Any> {
}
@OptIn(InternalDeviceAPI::class)
public suspend fun <D : Device, T : Any> DevicePropertySpec<D, T>.readItem(device: D): MetaItem =
converter.objectToMetaItem(read(device))
public suspend fun <D : Device, T : Any> DevicePropertySpec<D, T>.readItem(device: D): Meta =
converter.objectToMeta(read(device))
public interface WritableDevicePropertySpec<in D : Device, T : Any> : DevicePropertySpec<D, T> {
@ -53,8 +53,8 @@ public interface WritableDevicePropertySpec<in D : Device, T : Any> : DeviceProp
}
@OptIn(InternalDeviceAPI::class)
public suspend fun <D : Device, T : Any> WritableDevicePropertySpec<D, T>.writeItem(device: D, item: MetaItem) {
write(device, converter.itemToObject(item))
public suspend fun <D : Device, T : Any> WritableDevicePropertySpec<D, T>.writeItem(device: D, item: Meta) {
write(device, converter.metaToObject(item) ?: error("Meta $item could not be read with $converter"))
}
public interface DeviceActionSpec<in D : Device, I : Any, O : Any> {
@ -80,9 +80,9 @@ public interface DeviceActionSpec<in D : Device, I : Any, O : Any> {
public suspend fun <D : Device, I : Any, O : Any> DeviceActionSpec<D, I, O>.executeItem(
device: D,
item: MetaItem?
): MetaItem? {
val arg = inputConverter.nullableItemToObject(item)
item: Meta?
): Meta? {
val arg = inputConverter.nullableMetaToObject(item)
val res = execute(device, arg)
return outputConverter.nullableObjectToMetaItem(res)
return outputConverter.nullableObjectToMeta(res)
}

View File

@ -2,8 +2,6 @@ package ru.mipt.npm.controls.properties
import ru.mipt.npm.controls.api.PropertyDescriptor
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.TypedMetaItem
import space.kscience.dataforge.meta.transformations.MetaConverter
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
@ -39,13 +37,6 @@ public fun <D : DeviceBySpec<D>> DeviceSpec<D>.stringProperty(
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, DevicePropertySpec<D, String>>> =
property(MetaConverter.string, name, descriptorBuilder, read)
public fun <D : DeviceBySpec<D>> DeviceSpec<D>.itemProperty(
name: String? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
read: suspend D.() -> MetaItem
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, DevicePropertySpec<D, MetaItem>>> =
property(MetaConverter.item, name, descriptorBuilder, read)
public fun <D : DeviceBySpec<D>> DeviceSpec<D>.metaProperty(
name: String? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
@ -88,14 +79,6 @@ public fun <D : DeviceBySpec<D>> DeviceSpec<D>.stringProperty(
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, WritableDevicePropertySpec<D, String>>> =
property(MetaConverter.string, name, descriptorBuilder, read, write)
public fun <D : DeviceBySpec<D>> DeviceSpec<D>.itemProperty(
name: String? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
read: suspend D.() -> MetaItem,
write: suspend D.(MetaItem) -> Unit
): PropertyDelegateProvider<DeviceSpec<D>, ReadOnlyProperty<DeviceSpec<D>, WritableDevicePropertySpec<D, TypedMetaItem<*>>>> =
property(MetaConverter.item, name, descriptorBuilder, read, write)
public fun <D : DeviceBySpec<D>> DeviceSpec<D>.metaProperty(
name: String? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},

View File

@ -2,7 +2,7 @@ package ru.mipt.npm.controls.controllers
import kotlinx.coroutines.runBlocking
import ru.mipt.npm.controls.base.*
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.transformations.MetaConverter
import kotlin.properties.ReadOnlyProperty
import kotlin.properties.ReadWriteProperty
@ -12,7 +12,7 @@ import kotlin.time.Duration
/**
* Blocking read of the value
*/
public operator fun ReadOnlyDeviceProperty.getValue(thisRef: Any?, property: KProperty<*>): MetaItem =
public operator fun ReadOnlyDeviceProperty.getValue(thisRef: Any?, property: KProperty<*>): Meta =
runBlocking(scope.coroutineContext) {
read()
}
@ -22,7 +22,7 @@ public operator fun <T: Any> TypedReadOnlyDeviceProperty<T>.getValue(thisRef: An
readTyped()
}
public operator fun DeviceProperty.setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem) {
public operator fun DeviceProperty.setValue(thisRef: Any?, property: KProperty<*>, value: Meta) {
this.value = value
}
@ -36,7 +36,8 @@ public fun <T : Any> ReadOnlyDeviceProperty.convert(
): ReadOnlyProperty<Any?, T> {
return ReadOnlyProperty { _, _ ->
runBlocking(scope.coroutineContext) {
read(forceRead).let { metaConverter.itemToObject(it) }
val meta = read(forceRead)
metaConverter.metaToObject(meta)?: error("Meta $meta could not be converted by $metaConverter")
}
}
}
@ -47,11 +48,12 @@ public fun <T : Any> DeviceProperty.convert(
): ReadWriteProperty<Any?, T> {
return object : ReadWriteProperty<Any?, T> {
override fun getValue(thisRef: Any?, property: KProperty<*>): T = runBlocking(scope.coroutineContext) {
read(forceRead).let { metaConverter.itemToObject(it) }
val meta = read(forceRead)
metaConverter.metaToObject(meta)?: error("Meta $meta could not be converted by $metaConverter")
}
override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
this@convert.setValue(thisRef, property, value.let { metaConverter.objectToMetaItem(it) })
this@convert.setValue(thisRef, property, value.let { metaConverter.objectToMeta(it) })
}
}
}

View File

@ -7,7 +7,8 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.respondMessage
import ru.mipt.npm.controls.controllers.hubMessageFlow
import ru.mipt.npm.controls.controllers.respondHubMessage
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import space.kscience.dataforge.context.error
@ -25,25 +26,28 @@ internal fun generateId(request: MagixMessage<*>): String = if (request.id != nu
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
public fun DeviceManager.launchDfMagix(
public fun DeviceManager.connectToMagix(
endpoint: MagixEndpoint<DeviceMessage>,
endpointID: String = DATAFORGE_MAGIX_FORMAT,
): Job = context.launch {
endpoint.subscribe().onEach { request ->
val responsePayload = respondMessage(request.payload)
val response = MagixMessage(
format = DATAFORGE_MAGIX_FORMAT,
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = responsePayload
)
endpoint.broadcast(response)
val responsePayload = respondHubMessage(request.payload)
if (responsePayload != null) {
val response = MagixMessage(
format = DATAFORGE_MAGIX_FORMAT,
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = responsePayload
)
endpoint.broadcast(response)
}
}.catch { error ->
logger.error(error) { "Error while responding to message" }
}.launchIn(this)
controller.messageOutput().onEach { payload ->
hubMessageFlow(this).onEach { payload ->
endpoint.broadcast(
MagixMessage(
format = DATAFORGE_MAGIX_FORMAT,

View File

@ -2,7 +2,7 @@ package ru.mipt.npm.controls.client
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
/*
@ -31,7 +31,7 @@ public data class EqData(
@SerialName("type_id")
val typeId: Int,
val type: String? = null,
val value: MetaItem? = null,
val value: Meta? = null,
@SerialName("event_id")
val eventId: Int? = null,
val error: Int? = null,

View File

@ -12,7 +12,7 @@ import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
public const val TANGO_MAGIX_FORMAT: String = "tango"
@ -54,11 +54,11 @@ public data class TangoPayload(
val host: String,
val device: String,
val name: String,
val value: MetaItem? = null,
val value: Meta? = null,
val quality: TangoQuality = TangoQuality.VALID,
val argin: MetaItem? = null,
val argout: MetaItem? = null,
val data: MetaItem? = null,
val argin: Meta? = null,
val argout: Meta? = null,
val data: Meta? = null,
val errors: List<String>? = null
)

View File

@ -29,13 +29,15 @@ import ru.mipt.npm.controls.api.PropertyGetMessage
import ru.mipt.npm.controls.api.PropertySetMessage
import ru.mipt.npm.controls.api.getOrNull
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.respondMessage
import ru.mipt.npm.controls.controllers.respondHubMessage
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.server.GenericMagixMessage
import ru.mipt.npm.magix.server.launchMagixServerRawRSocket
import ru.mipt.npm.magix.server.magixModule
import space.kscience.dataforge.meta.toJson
import space.kscience.dataforge.meta.toMetaItem
import space.kscience.dataforge.meta.toMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
/**
* Create and start a web server for several devices
@ -70,7 +72,7 @@ public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
}
public const val WEB_SERVER_TARGET: String = "@webServer"
public val WEB_SERVER_TARGET: Name = "@webServer".asName()
public fun Application.deviceManagerModule(
manager: DeviceManager,
@ -158,7 +160,7 @@ public fun Application.deviceManagerModule(
val request: DeviceMessage = MagixEndpoint.magixJson.decodeFromString(DeviceMessage.serializer(), body)
val response = manager.respondMessage(request)
val response = manager.respondHubMessage(request)
call.respondMessage(response)
}
@ -171,11 +173,11 @@ public fun Application.deviceManagerModule(
val property: String by call.parameters
val request = PropertyGetMessage(
sourceDevice = WEB_SERVER_TARGET,
targetDevice = target,
targetDevice = Name.parse(target),
property = property,
)
val response = manager.respondMessage(request)
val response = manager.respondHubMessage(request)
call.respondMessage(response)
}
post("set") {
@ -186,12 +188,12 @@ public fun Application.deviceManagerModule(
val request = PropertySetMessage(
sourceDevice = WEB_SERVER_TARGET,
targetDevice = target,
targetDevice = Name.parse(target),
property = property,
value = json.toMetaItem()
value = json.toMeta()
)
val response = manager.respondMessage(request)
val response = manager.respondHubMessage(request)
call.respondMessage(response)
}
}

View File

@ -12,15 +12,21 @@ repositories{
maven("https://kotlin.bintray.com/kotlinx")
}
val ktorVersion: String by rootProject.extra
val rsocketVersion: String by rootProject.extra
dependencies{
implementation(projects.controlsCore)
//implementation(projects.controlsServer)
implementation(projects.magix.magixServer)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("no.tornado:tornadofx:1.7.20")
implementation("space.kscience:plotlykt-server:0.4.2")
implementation("space.kscience:plotlykt-server:0.5.0-dev-1")
implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6")
implementation("ch.qos.logback:logback-classic:1.2.3")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {

View File

@ -7,11 +7,12 @@ import javafx.scene.layout.Priority
import javafx.stage.Stage
import kotlinx.coroutines.launch
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.client.launchDfMagix
import ru.mipt.npm.controls.client.connectToMagix
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.install
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
import ru.mipt.npm.magix.server.startMagixServer
import space.kscience.dataforge.context.*
import tornadofx.*
@ -34,16 +35,18 @@ class DemoController : Controller(), ContextAware {
context.launch {
device = deviceManager.install("demo", DemoDevice)
//starting magix event loop
magixServer = startMagixServer()
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
//Launch device client and connect it to the server
deviceManager.launchDfMagix(MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()))
visualizer = startDemoDeviceServer()
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
deviceManager.connectToMagix(deviceEndpoint)
val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost", DeviceMessage.serializer())
visualizer = visualEndpoint.startDemoDeviceServer()
}
}
fun shutdown() {
logger.info { "Shutting down..." }
visualizer?.stop(1000,5000)
visualizer?.stop(1000, 5000)
logger.info { "Visualization server stopped" }
magixServer?.stop(1000, 5000)
logger.info { "Magix server stopped" }
@ -104,10 +107,10 @@ class DemoControllerView : View(title = " Demo controller remote") {
button("Show plots") {
useMaxWidth = true
action {
controller.magixServer?.run {
controller.visualizer?.run {
val host = "localhost"//environment.connectors.first().host
val port = environment.connectors.first().port
val uri = URI("http", null, host, port, "/plots", null, null)
val uri = URI("http", null, host, port, "/", null, null)
Desktop.getDesktop().browse(uri)
}
}

View File

@ -47,7 +47,7 @@ class DemoDevice : DeviceBySpec<DemoDevice>(DemoDevice) {
@OptIn(ExperimentalTime::class)
override fun DemoDevice.onStartup() {
doRecurring(Duration.milliseconds(50)){
doRecurring(Duration.milliseconds(10)){
sin.read()
cos.read()
}

View File

@ -1,8 +1,12 @@
package ru.mipt.npm.controls.demo
import io.ktor.application.install
import io.ktor.features.CORS
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.websocket.WebSockets
import io.rsocket.kotlin.transport.ktor.server.RSocketSupport
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.html.div
@ -10,8 +14,7 @@ import kotlinx.html.link
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double
import space.kscience.plotly.layout
import space.kscience.plotly.models.Trace
@ -51,85 +54,92 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
}
suspend fun startDemoDeviceServer(magixHost: String = "localhost"): ApplicationEngine = embeddedServer(CIO, 8080) {
val sinFlow = MutableSharedFlow<MetaItem?>()// = device.sin.flow()
val cosFlow = MutableSharedFlow<MetaItem?>()// = device.cos.flow()
suspend fun MagixEndpoint<DeviceMessage>.startDemoDeviceServer(): ApplicationEngine =
embeddedServer(CIO, 9090) {
install(WebSockets)
install(RSocketSupport)
launch {
val endpoint = MagixEndpoint.rSocketWithWebSockets(magixHost, DeviceMessage.serializer())
endpoint.subscribe().collect { magix ->
(magix.payload as? PropertyChangedMessage)?.let { message ->
when (message.property) {
"sin" -> sinFlow.emit(message.value)
"cos" -> cosFlow.emit(message.value)
install(CORS) {
anyHost()
}
val sinFlow = MutableSharedFlow<Meta?>()// = device.sin.flow()
val cosFlow = MutableSharedFlow<Meta?>()// = device.cos.flow()
launch {
subscribe().collect { magix ->
(magix.payload as? PropertyChangedMessage)?.let { message ->
when (message.property) {
"sin" -> sinFlow.emit(message.value)
"cos" -> cosFlow.emit(message.value)
}
}
}
}
}
plotlyModule("plots").apply {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 50
}.page { container ->
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
link {
rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
attributes["crossorigin"] = "anonymous"
}
div("row") {
div("col-6") {
plot(renderer = container) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
plotlyModule().apply {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 50
}.page { container ->
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
link {
rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
attributes["crossorigin"] = "anonymous"
}
div("row") {
div("col-6") {
plot(renderer = container) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
div("col-6") {
plot(renderer = container) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
}
div("col-6") {
plot(renderer = container) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
div("row") {
div("col-12") {
plot(renderer = container) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
}
div("row") {
div("col-12") {
plot(renderer = container) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
}
}
}.apply { start() }

View File

@ -1,5 +1,6 @@
plugins {
id("ru.mipt.npm.gradle.jvm")
application
}
@ -12,4 +13,8 @@ dependencies{
kotlin{
explicitApi = null
}
application{
mainClass.set("ZmqKt")
}

View File

@ -9,6 +9,8 @@ import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
@ -36,7 +38,7 @@ public class RSocketMagixEndpoint<T>(
val flow = rSocket.requestStream(payload)
return flow.map {
MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText())
}.flowOn(coroutineContext)
}.flowOn(coroutineContext[CoroutineDispatcher]?:Dispatchers.Unconfined)
}
override suspend fun broadcast(message: MagixMessage<T>) {

View File

@ -2,9 +2,9 @@ package ru.mipt.npm.magix.rsocket
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.SocketOptions
import io.ktor.network.sockets.aSocket
import io.ktor.util.InternalAPI
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.clientTransport
import io.rsocket.kotlin.transport.ktor.TcpClientTransport
import kotlinx.coroutines.Dispatchers
import kotlinx.serialization.KSerializer
import ru.mipt.npm.magix.api.MagixEndpoint
@ -14,6 +14,7 @@ import kotlin.coroutines.coroutineContext
/**
* Create a plain TCP based [RSocketMagixEndpoint] connected to [host] and [port]
*/
@OptIn(InternalAPI::class)
public suspend fun <T> MagixEndpoint.Companion.rSocketWithTcp(
host: String,
payloadSerializer: KSerializer<T>,
@ -21,7 +22,12 @@ public suspend fun <T> MagixEndpoint.Companion.rSocketWithTcp(
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint<T> {
val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig)
val transport = TcpClientTransport(
ActorSelectorManager(Dispatchers.IO),
hostname = host,
port = port,
configure = tcpConfig
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext)

View File

@ -2,12 +2,12 @@ package ru.mipt.npm.magix.server
import io.ktor.application.Application
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.util.InternalAPI
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.serverTransport
import io.rsocket.kotlin.transport.ktor.TcpServerTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
@ -20,11 +20,12 @@ import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
/**
* Raw TCP magix server
*/
@OptIn(InternalAPI::class)
public fun CoroutineScope.launchMagixServerRawRSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT
): Job {
val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort)
val tcpTransport = TcpServerTransport(ActorSelectorManager(Dispatchers.IO), port = rawSocketPort)
val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow))
coroutineContext[Job]?.invokeOnCompletion {
rSocketJob.cancel()

View File

@ -14,6 +14,7 @@ import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class ZmqMagixEndpoint<T>(
private val host: String,
@ -53,7 +54,9 @@ public class ZmqMagixEndpoint<T>(
}
}
}
}.filter(filter).flowOn(coroutineContext) //should be flown on IO because of blocking calls
}.filter(filter).flowOn(
coroutineContext[CoroutineDispatcher] ?: Dispatchers.IO
) //should be flown on IO because of blocking calls
}
private val publishSocket by lazy {
@ -70,4 +73,17 @@ public class ZmqMagixEndpoint<T>(
override fun close() {
zmqContext.close()
}
}
}
public suspend fun <T> MagixEndpoint.Companion.zmq(
host: String,
payloadSerializer: KSerializer<T>,
pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT,
): ZmqMagixEndpoint<T> = ZmqMagixEndpoint(
host,
payloadSerializer,
pubPort,
pullPort,
coroutineContext = coroutineContext
)

View File

@ -2,19 +2,24 @@
package ru.mipt.npm.devices.pimotionmaster
import kotlinx.coroutines.*
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import ru.mipt.npm.controls.api.DeviceHub
import ru.mipt.npm.controls.api.PropertyDescriptor
import ru.mipt.npm.controls.base.*
import ru.mipt.npm.controls.controllers.duration
import ru.mipt.npm.controls.ports.*
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.values.asValue
import kotlin.collections.component1
@ -23,7 +28,6 @@ import kotlin.time.Duration
class PiMotionMasterDevice(
context: Context,
override val deviceName: String = "PiMotionMaster",
private val portFactory: PortFactory = KtorTcpPort,
) : DeviceBase(context), DeviceHub {
@ -48,7 +52,7 @@ class PiMotionMasterDevice(
}
//Update port
//address = portSpec.node
port = portFactory(portSpec.node!!, context)
port = portFactory(portSpec ?: Meta.EMPTY, context)
connected.updateLogical(true)
// connector.open()
//Initialize axes
@ -61,7 +65,7 @@ class PiMotionMasterDevice(
//re-define axes if needed
axes = ids.associateWith { Axis(it) }
}
ids.map { it.asValue() }.asValue().asMetaItem()
Meta(ids.map { it.asValue() }.asValue())
initialize()
failIfError()
}
@ -317,10 +321,10 @@ class PiMotionMasterDevice(
}
val move by acting {
val target = it.double ?: it.node["target"].double ?: error("Unacceptable target value $it")
val target = it.double ?: it?.get("target").double ?: error("Unacceptable target value $it")
closedLoop.write(true)
//optionally set velocity
it.node["velocity"].double?.let { v ->
it?.get("velocity").double?.let { v ->
velocity.write(v)
}
targetPosition.write(target)
@ -332,7 +336,7 @@ class PiMotionMasterDevice(
}
suspend fun move(target: Double) {
move(target.asMetaItem())
move(target.asMeta())
}
}

View File

@ -3,7 +3,7 @@ rootProject.name = "controls-kt"
enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")
pluginManagement {
val toolsVersion = "0.10.0"
val toolsVersion = "0.10.2"
repositories {
maven("https://repo.kotlin.link")