Merge branch 'specialized-messages' into dev

# Conflicts:
#	build.gradle.kts
#	dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt
#	dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt
#	magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt
This commit is contained in:
Alexander Nozik 2021-01-10 18:45:19 +03:00
commit 2b4503c2fa
29 changed files with 498 additions and 459 deletions

View File

@ -4,20 +4,19 @@ plugins {
kotlin("js") apply false
}
val dataforgeVersion: String by extra("0.2.0-dev-4")
val ktorVersion: String by extra("1.4.1")
val rsocketVersion by extra("0.11.1")
val dataforgeVersion: String by extra("0.2.1-dev-2")
val ktorVersion: String by extra("1.5.0")
val rsocketVersion by extra("0.12.0")
allprojects {
repositories {
mavenLocal()
maven("https://dl.bintray.com/pdvrieze/maven")
maven("http://maven.jzy3d.org/releases")
//maven("https://dl.bintray.com/pdvrieze/maven")
//maven("http://maven.jzy3d.org/releases")
maven("https://kotlin.bintray.com/js-externals")
maven("https://maven.pkg.github.com/altavir/kotlin-logging/")
maven("https://dl.bintray.com/rsocket-admin/RSocket")
maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
// maven("https://oss.jfrog.org/oss-snapshot-local")
//maven("https://dl.bintray.com/rsocket-admin/RSocket")
//maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
}
group = "hep.dataforge"
@ -25,7 +24,7 @@ allprojects {
}
ksciencePublish {
githubProject = "dataforge-control"
githubProject = "controls.kt"
bintrayRepo = "dataforge"
}

View File

@ -2,13 +2,12 @@ package hep.dataforge.control.api
import hep.dataforge.context.ContextAware
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
import hep.dataforge.io.Envelope
import hep.dataforge.io.EnvelopeBuilder
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.provider.Type
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.io.Closeable
/**
@ -28,27 +27,15 @@ public interface Device : Closeable, ContextAware {
public val actionDescriptors: Collection<ActionDescriptor>
/**
* The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes
* The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes.
*/
public val scope: CoroutineScope
/**
* Register a new property change listener for this device.
* [owner] is provided optionally in order for listener to be
* easily removable
*/
public fun registerListener(listener: DeviceListener, owner: Any? = listener)
/**
* Remove all listeners belonging to the specified owner
*/
public fun removeListeners(owner: Any?)
/**
* Get the value of the property or throw error if property in not defined.
* Suspend if property value is not available
*/
public suspend fun getProperty(propertyName: String): MetaItem<*>
public suspend fun getProperty(propertyName: String): MetaItem<*>?
/**
* Invalidate property and force recalculate
@ -61,11 +48,16 @@ public interface Device : Closeable, ContextAware {
*/
public suspend fun setProperty(propertyName: String, value: MetaItem<*>)
/**
* The [SharedFlow] of property changes
*/
public val propertyFlow: SharedFlow<Pair<String, MetaItem<*>>>
/**
* Send an action request and suspend caller while request is being processed.
* Could return null if request does not return a meaningful answer.
*/
public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>?
public suspend fun execute(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
override fun close() {
scope.cancel("The device is closed")
@ -76,14 +68,10 @@ public interface Device : Closeable, ContextAware {
}
}
public interface ResponderDevice{
/**
*
* A request with binary data or for binary response (or both). This request does not cover basic functionality like
* [setProperty], [getProperty] or [execute] and not defined for a generic device.
*
*/
public suspend fun respondWithData(request: Envelope): EnvelopeBuilder
public suspend fun Device.getState(): Meta = Meta{
for(descriptor in propertyDescriptors) {
descriptor.name put getProperty(descriptor.name)
}
}
public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) })
//public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) })

View File

@ -1,14 +0,0 @@
package hep.dataforge.control.api
import hep.dataforge.meta.MetaItem
/**
* PropertyChangeListener Interface
* [value] is a new value that property has after a change; null is for invalid state.
*/
public interface DeviceListener {
public fun propertyChanged(propertyName: String, value: MetaItem<*>?)
public fun actionExecuted(action: String, argument: MetaItem<*>?, result: MetaItem<*>?) {}
//TODO add general message listener method
}

View File

@ -3,18 +3,25 @@ package hep.dataforge.control.base
import hep.dataforge.context.Context
import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
//TODO move to DataForge-core
@DFExperimental
public data class LogEntry(val content: String, val priority: Int = 0)
/**
* Baseline implementation of [Device] interface
*/
@Suppress("EXPERIMENTAL_API_USAGE")
public abstract class DeviceBase(override val context: Context) : Device {
private val _properties = HashMap<String, ReadOnlyDeviceProperty>()
@ -22,25 +29,21 @@ public abstract class DeviceBase(override val context: Context) : Device {
private val _actions = HashMap<String, DeviceAction>()
public val actions: Map<String, DeviceAction> get() = _actions
private val listeners = ArrayList<Pair<Any?, DeviceListener>>(4)
private val sharedPropertyFlow = MutableSharedFlow<Pair<String, MetaItem<*>>>()
override fun registerListener(listener: DeviceListener, owner: Any?) {
listeners.add(owner to listener)
}
override val propertyFlow: SharedFlow<Pair<String, MetaItem<*>>> get() = sharedPropertyFlow
override fun removeListeners(owner: Any?) {
listeners.removeAll { it.first == owner }
}
private val sharedLogFlow = MutableSharedFlow<LogEntry>()
internal fun notifyListeners(block: DeviceListener.() -> Unit) {
listeners.forEach { it.second.block() }
}
/**
* The [SharedFlow] of log messages
*/
@DFExperimental
public val logFlow: SharedFlow<LogEntry>
get() = sharedLogFlow
public fun notifyPropertyChanged(propertyName: String) {
scope.launch {
val value = getProperty(propertyName)
notifyListeners { propertyChanged(propertyName, value) }
}
protected suspend fun log(message: String, priority: Int = 0) {
sharedLogFlow.emit(LogEntry(message, priority))
}
override val propertyDescriptors: Collection<PropertyDescriptor>
@ -72,8 +75,8 @@ public abstract class DeviceBase(override val context: Context) : Device {
)
}
override suspend fun execute(command: String, argument: MetaItem<*>?): MetaItem<*>? =
(_actions[command] ?: error("Request with name $command not defined")).invoke(argument)
override suspend fun execute(action: String, argument: MetaItem<*>?): MetaItem<*>? =
(_actions[action] ?: error("Request with name $action not defined")).invoke(argument)
@OptIn(ExperimentalCoroutinesApi::class)
private open inner class BasicReadOnlyDeviceProperty(
@ -94,8 +97,8 @@ public abstract class DeviceBase(override val context: Context) : Device {
override fun updateLogical(item: MetaItem<*>) {
state.value = item
notifyListeners {
propertyChanged(name, item)
scope.launch {
sharedPropertyFlow.emit(Pair(name, item))
}
}
@ -121,7 +124,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
/**
* Create a bound read-only property with given [getter]
*/
public fun newReadOnlyProperty(
public fun createReadOnlyProperty(
name: String,
default: MetaItem<*>?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
@ -178,7 +181,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
/**
* Create a bound mutable property with given [getter] and [setter]
*/
public fun newMutableProperty(
internal fun createMutableProperty(
name: String,
default: MetaItem<*>?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
@ -206,18 +209,14 @@ public abstract class DeviceBase(override val context: Context) : Device {
) : DeviceAction {
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? =
withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
block(arg).also {
notifyListeners {
actionExecuted(name, arg, it)
}
}
block(arg)
}
}
/**
* Create a new bound action
*/
public fun newAction(
internal fun createAction(
name: String,
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> MetaItem<*>?,

View File

@ -24,7 +24,7 @@ private class ActionProvider<D : DeviceBase>(
) : PropertyDelegateProvider<D, ActionDelegate> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ActionDelegate {
val name = property.name
owner.newAction(name, descriptorBuilder, block)
owner.createAction(name, descriptorBuilder, block)
return owner.provideAction()
}
}

View File

@ -36,7 +36,7 @@ private class ReadOnlyDevicePropertyProvider<D : DeviceBase>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ReadOnlyPropertyDelegate {
val name = property.name
owner.newReadOnlyProperty(name, default, descriptorBuilder, getter)
owner.createReadOnlyProperty(name, default, descriptorBuilder, getter)
return owner.provideProperty(name)
}
}
@ -51,7 +51,7 @@ private class TypedReadOnlyDevicePropertyProvider<D : DeviceBase, T : Any>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedReadOnlyPropertyDelegate<T> {
val name = property.name
owner.newReadOnlyProperty(name, default, descriptorBuilder, getter)
owner.createReadOnlyProperty(name, default, descriptorBuilder, getter)
return owner.provideProperty(name, converter)
}
}
@ -178,7 +178,7 @@ private class DevicePropertyProvider<D : DeviceBase>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): PropertyDelegate {
val name = property.name
owner.newMutableProperty(name, default, descriptorBuilder, getter, setter)
owner.createMutableProperty(name, default, descriptorBuilder, getter, setter)
return owner.provideMutableProperty(name)
}
}
@ -194,7 +194,7 @@ private class TypedDevicePropertyProvider<D : DeviceBase, T : Any>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedPropertyDelegate<T> {
val name = property.name
owner.newMutableProperty(name, default, descriptorBuilder, getter, setter)
owner.createMutableProperty(name, default, descriptorBuilder, getter, setter)
return owner.provideMutableProperty(name, converter)
}
}

View File

@ -1,63 +1,42 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.*
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.get
import hep.dataforge.control.messages.*
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.io.Binary
import kotlinx.coroutines.flow.map
/**
* The [DeviceController] wraps device operations in [DeviceMessage]
*/
@OptIn(DFExperimental::class)
public class DeviceController(
public val device: Device,
public val deviceTarget: String,
public val scope: CoroutineScope = device.scope,
) : Responder, Consumer, DeviceListener {
public val deviceName: String,
) {
init {
device.registerListener(this, this)
private val propertyChanges = device.propertyFlow.map { (propertyName: String, value: MetaItem<*>) ->
PropertyChangedMessage(
sourceDevice = deviceName,
key = propertyName,
value = value,
)
}
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
/**
* The flow of outgoing messages
*/
public val messages: Flow<DeviceMessage> get() = propertyChanges
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage =
respondMessage(device, deviceTarget, message)
respondMessage(device, deviceName, message)
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request)
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
scope.launch {
val change = DeviceMessage(
sourceName = deviceTarget,
action = PROPERTY_CHANGED_ACTION,
key = propertyName,
value = value,
)
val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY)
outputChannel.send(envelope)
}
}
public fun receiving(): Flow<Envelope> = outputChannel.consumeAsFlow()
@DFExperimental
override fun consume(message: Envelope) {
// Fire the respond procedure and forget about the result
scope.launch {
respond(message)
}
}
public companion object {
public const val GET_PROPERTY_ACTION: String = "read"
@ -66,86 +45,99 @@ public class DeviceController(
public const val PROPERTY_LIST_ACTION: String = "propertyList"
public const val ACTION_LIST_ACTION: String = "actionList"
internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
val target = request.meta["target"].string
return try {
if (request.data == null) {
respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope()
} else if (target != null && target != deviceTarget) {
error("Wrong target name $deviceTarget expected but $target found")
} else {
if (device is ResponderDevice) {
val response = device.respondWithData(request).apply {
meta {
"target" put request.meta["source"].string
"source" put deviceTarget
}
}
response.seal()
} else error("Device does not support binary response")
}
} catch (ex: Exception) {
DeviceMessage.fail(ex).toEnvelope()
}
}
// internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
// val target = request.meta["target"].string
// return try {
// if (device is Responder) {
// device.respond(request)
// } else if (request.data == null) {
// respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope()
// } else if (target != null && target != deviceTarget) {
// error("Wrong target name $deviceTarget expected but $target found")
// } else error("Device does not support binary response")
// } catch (ex: Exception) {
// val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string
// DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope()
// }
// }
internal suspend fun respondMessage(
device: Device,
deviceTarget: String,
request: DeviceMessage,
): DeviceMessage = try {
val requestKey = request.key
val requestValue = request.value
var key: String? = null
var value: MetaItem<*>? = null
when (val action = request.action) {
GET_PROPERTY_ACTION -> {
key = requestKey
value = device.getProperty(requestKey ?: error("Key field is not defined in request"))
when (request) {
is PropertyGetMessage -> {
PropertyChangedMessage(
key = request.property,
value = device.getProperty(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
SET_PROPERTY_ACTION -> {
require(requestKey != null) { "Key field is not defined in request" }
if (requestValue == null) {
device.invalidateProperty(requestKey)
} else {
device.setProperty(requestKey, requestValue)
}
key = requestKey
value = device.getProperty(requestKey)
}
EXECUTE_ACTION -> {
require(requestKey != null) { "Key field is not defined in request" }
key = requestKey
value = device.execute(requestKey, requestValue)
is PropertySetMessage -> {
if (request.value == null) {
device.invalidateProperty(request.property)
} else {
device.setProperty(request.property, request.value)
}
PropertyChangedMessage(
key = request.property,
value = device.getProperty(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
PROPERTY_LIST_ACTION -> {
value = Meta {
device.propertyDescriptors.map { descriptor ->
descriptor.name put descriptor.config
is ActionExecuteMessage -> {
ActionResultMessage(
action = request.action,
result = device.execute(request.action, request.argument),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is GetDescriptionMessage -> {
val descriptionMeta = Meta {
"properties" put {
device.propertyDescriptors.map { descriptor ->
descriptor.name put descriptor.config
}
}
}.asMetaItem()
}
ACTION_LIST_ACTION -> {
value = Meta {
device.actionDescriptors.map { descriptor ->
descriptor.name put descriptor.config
"actions" put {
device.actionDescriptors.map { descriptor ->
descriptor.name put descriptor.config
}
}
}.asMetaItem()
}
DescriptionMessage(
description = descriptionMeta,
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
else -> {
error("Unrecognized action $action")
is DescriptionMessage,
is PropertyChangedMessage,
is ActionResultMessage,
is BinaryNotificationMessage,
is DeviceErrorMessage,
is EmptyDeviceMessage,
is DeviceLogMessage,
-> {
//Those messages are ignored
EmptyDeviceMessage(
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice,
comment = "The message is ignored"
)
}
}
DeviceMessage(
targetName = request.sourceName,
sourceName = deviceTarget,
action = "response.${request.action}",
key = key,
value = value
)
} catch (ex: Exception) {
DeviceMessage.fail(ex, request.action).respondsTo(request)
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice)
}
}
}
@ -153,10 +145,10 @@ public class DeviceController(
public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.targetName?.toName() ?: Name.EMPTY
val targetName = request.targetDevice?.toName() ?: Name.EMPTY
val device = this[targetName] ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {
DeviceMessage.fail(ex, request.action).respondsTo(request)
DeviceMessage.error(ex, sourceDevice = request.targetDevice, targetDevice = request.sourceDevice)
}
}

View File

@ -20,7 +20,7 @@ public class DeviceManager : AbstractPlugin(), DeviceHub {
override val devices: Map<NameToken, Device> get() = top
public val controller: HubController by lazy {
HubController(this, context)
HubController(this)
}
public fun registerDevice(name: NameToken, device: Device) {

View File

@ -1,61 +0,0 @@
package hep.dataforge.control.controllers
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement
@Serializable
public data class DeviceMessage(
public val action: String,
public val status: String = OK_STATUS,
public val sourceName: String? = null,
public val targetName: String? = null,
public val comment: String? = null,
public val key: String? = null,
public val value: MetaItem<*>? = null,
) {
public companion object {
public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName()
public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName()
public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName()
public val MESSAGE_KEY_KEY: Name = DeviceMessage::key.name.asName()
public val MESSAGE_VALUE_KEY: Name = DeviceMessage::value.name.asName()
public const val OK_STATUS: String = "OK"
public const val FAIL_STATUS: String = "FAIL"
public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged"
private fun Throwable.toMeta(): Meta = Meta {
"type" put this::class.simpleName
"message" put message
"trace" put stackTraceToString()
}
public fun fail(
cause: Throwable,
action: String = "undefined",
): DeviceMessage = DeviceMessage(
action = action,
status = FAIL_STATUS,
value = cause.toMeta().asMetaItem()
)
public fun fromMeta(meta: Meta): DeviceMessage = Json.decodeFromJsonElement(meta.toJson())
}
}
public fun DeviceMessage.ok(): DeviceMessage =
copy(status = DeviceMessage.OK_STATUS)
public fun DeviceMessage.respondsTo(request: DeviceMessage): DeviceMessage =
copy(sourceName = request.targetName, targetName = request.sourceName)
public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!!
public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null)

View File

@ -1,89 +1,79 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.get
import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
@OptIn(DFExperimental::class)
public class HubController(
public val hub: DeviceHub,
public val scope: CoroutineScope,
) : Consumer, Responder {
) {
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
private val messageOutbox = MutableSharedFlow<DeviceMessage>()
// private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED)
private val envelopeOutbox = MutableSharedFlow<Envelope>()
public fun messageOutput(): Flow<DeviceMessage> = messageOutbox.consumeAsFlow()
public val messageOutput: SharedFlow<DeviceMessage> get() = messageOutbox
// public fun envelopeOutput(): Flow<Envelope> = envelopeOutbox.consumeAsFlow()
public val envelopeOutput: SharedFlow<Envelope> get() = envelopeOutbox
// private val packJob = scope.launch {
// while (isActive) {
// val message = messageOutbox.receive()
// envelopeOutbox.send(message.toEnvelope())
// }
// }
private val packJob = messageOutbox.onEach { message ->
envelopeOutbox.emit(message.toEnvelope())
}.launchIn(scope)
private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (name, device) ->
object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
scope.launch {
val change = DeviceMessage(
sourceName = name.toString(),
action = DeviceMessage.PROPERTY_CHANGED_ACTION,
key = propertyName,
value = value
)
messageOutbox.emit(change)
}
}
}.also {
device.registerListener(it)
}
}
// private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (deviceNameToken, device) ->
// object : DeviceListener {
// override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
// if (value == null) return
// scope.launch {
// val change = PropertyChangedMessage(
// sourceDevice = deviceNameToken.toString(),
// key = propertyName,
// value = value
// )
// messageOutbox.send(change)
// }
// }
// }.also {
// device.registerListener(it)
// }
// }
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.targetName?.toName() ?: Name.EMPTY
val targetName = message.targetDevice?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) {
DeviceMessage.fail(ex, message.action).respondsTo(message)
}
override suspend fun respond(request: Envelope): Envelope = try {
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
if (request.data == null) {
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
.toEnvelope()
} else {
DeviceController.respond(device, targetName.toString(), request)
}
} catch (ex: Exception) {
DeviceMessage.fail(ex).toEnvelope()
}
override fun consume(message: Envelope) {
// Fire the respond procedure and forget about the result
scope.launch {
respond(message)
}
DeviceMessage.error(ex, sourceDevice = null, targetDevice = message.sourceDevice)
}
//
// override suspend fun respond(request: Envelope): Envelope = try {
// val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
// val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
// if (request.data == null) {
// DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
// .toEnvelope()
// } else {
// DeviceController.respond(device, targetName.toString(), request)
// }
// } catch (ex: Exception) {
// DeviceMessage.error(ex, sourceDevice = null).toEnvelope()
// }
//
// override fun consume(message: Envelope) {
// // Fire the respond procedure and forget about the result
// scope.launch {
// respond(message)
// }
// }
}

View File

@ -1,27 +0,0 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun Device.flowValues(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
val listener = object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value != null) {
launch {
send(propertyName to value)
}
}
}
}
registerListener(listener)
awaitClose {
removeListeners(listener)
}
}

View File

@ -0,0 +1,159 @@
package hep.dataforge.control.messages
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement
@Serializable
public sealed class DeviceMessage {
public abstract val sourceDevice: String?
public abstract val targetDevice: String?
public abstract val comment: String?
public companion object {
public fun error(
cause: Throwable,
sourceDevice: String?,
targetDevice: String? = null,
): DeviceErrorMessage = DeviceErrorMessage(
errorMessage = cause.message,
errorType = cause::class.simpleName,
errorStackTrace = cause.stackTraceToString()
)
public fun fromMeta(meta: Meta): DeviceMessage = Json.decodeFromJsonElement(meta.toJson())
}
}
@Serializable
@SerialName("property.changed")
public data class PropertyChangedMessage(
public val key: String,
public val value: MetaItem<*>?,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
@Serializable
@SerialName("property.set")
public data class PropertySetMessage(
public val property: String,
public val value: MetaItem<*>?,
override val sourceDevice: String? = null,
override val targetDevice: String,
override val comment: String? = null,
) : DeviceMessage()
@Serializable
@SerialName("property.get")
public data class PropertyGetMessage(
public val property: String,
override val sourceDevice: String? = null,
override val targetDevice: String,
override val comment: String? = null,
) : DeviceMessage()
/**
* Request device description
*/
@Serializable
@SerialName("description.get")
public data class GetDescriptionMessage(
override val sourceDevice: String? = null,
override val targetDevice: String,
override val comment: String? = null,
) : DeviceMessage()
/**
* The full device description message
*/
@Serializable
@SerialName("description")
public data class DescriptionMessage(
val description: Meta,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
@Serializable
@SerialName("action.execute")
public data class ActionExecuteMessage(
public val action: String,
public val argument: MetaItem<*>?,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
@Serializable
@SerialName("action.result")
public data class ActionResultMessage(
public val action: String,
public val result: MetaItem<*>?,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* Notifies listeners that a new binary with given [binaryID] is available. The binary itself could not be provided via [DeviceMessage] API.
*/
@Serializable
@SerialName("binary.notification")
public data class BinaryNotificationMessage(
val binaryID: String,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* The message states that the message is received, but no meaningful response is produced.
* This message could be used for a heartbeat.
*/
@Serializable
@SerialName("empty")
public data class EmptyDeviceMessage(
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* Information log message
*/
@Serializable
@SerialName("log")
public data class DeviceLogMessage(
val message: String,
val data: MetaItem<*>? = null,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* The evaluation of the message produced a service error
*/
@Serializable
@SerialName("error")
public data class DeviceErrorMessage(
public val errorMessage: String?,
public val errorType: String? = null,
public val errorStackTrace: String? = null,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!!
public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null)

View File

@ -3,10 +3,6 @@ plugins {
id("ru.mipt.npm.publish")
}
kscience {
useSerialization()
}
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra

View File

@ -1,7 +1,7 @@
package hep.dataforge.control.server
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.toMeta
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.control.messages.toMeta
import hep.dataforge.io.*
import hep.dataforge.meta.MetaSerializer
import io.ktor.application.ApplicationCall
@ -30,15 +30,6 @@ internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -
respondText(json.toString(), contentType = ContentType.Application.Json)
}
public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json)
}
//public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
// respondMessage(DeviceMessage(builder))
//}
//
//public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
// respondMessage(DeviceMessage.fail(null, block = builder))
//}
}

View File

@ -4,11 +4,11 @@ package hep.dataforge.control.server
import hep.dataforge.control.api.get
import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceController.Companion.SET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.control.messages.PropertyGetMessage
import hep.dataforge.control.messages.PropertySetMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.toMetaItem
@ -163,24 +163,24 @@ public fun Application.deviceModule(
}
}
}
//Check if application supports websockets and if it does add a push channel
if (this.application.featureOrNull(WebSockets) != null) {
webSocket("ws") {
//subscribe on device
val target: String? by call.request.queryParameters
try {
application.log.debug("Opened server socket for ${call.request.queryParameters}")
manager.controller.envelopeOutput.collect {
outgoing.send(it.toFrame())
}
} catch (ex: Exception) {
application.log.debug("Closed server socket for ${call.request.queryParameters}")
}
}
}
// //Check if application supports websockets and if it does add a push channel
// if (this.application.featureOrNull(WebSockets) != null) {
// webSocket("ws") {
// //subscribe on device
// val target: String? by call.request.queryParameters
//
// try {
// application.log.debug("Opened server socket for ${call.request.queryParameters}")
//
// manager.controller.envelopeOutput().collect {
// outgoing.send(it.toFrame())
// }
//
// } catch (ex: Exception) {
// application.log.debug("Closed server socket for ${call.request.queryParameters}")
// }
// }
// }
post("message") {
val body = call.receiveText()
@ -201,11 +201,10 @@ public fun Application.deviceModule(
get("get") {
val target: String by call.parameters
val property: String by call.parameters
val request = DeviceMessage(
action = GET_PROPERTY_ACTION,
sourceName = WEB_SERVER_TARGET,
targetName = target,
key = property,
val request = PropertyGetMessage(
sourceDevice = WEB_SERVER_TARGET,
targetDevice = target,
property = property,
)
val response = manager.respondMessage(request)
@ -217,11 +216,10 @@ public fun Application.deviceModule(
val body = call.receiveText()
val json = Json.parseToJsonElement(body)
val request = DeviceMessage(
action = SET_PROPERTY_ACTION,
sourceName = WEB_SERVER_TARGET,
targetName = target,
key = property,
val request = PropertySetMessage(
sourceDevice = WEB_SERVER_TARGET,
targetDevice = target,
property = property,
value = json.toMetaItem()
)

View File

@ -2,13 +2,8 @@ plugins {
id("ru.mipt.npm.mpp")
}
val ktorVersion: String by rootProject.extra
kscience{
useCoroutines()
}
kotlin {
sourceSets {
commonMain {

View File

@ -5,6 +5,12 @@ plugins {
val ktorVersion: String by rootProject.extra
kscience{
useSerialization {
json()
}
}
kotlin {
sourceSets {
commonMain {

View File

@ -1,8 +1,8 @@
package hep.dataforge.control.client
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixMessage
import kotlinx.coroutines.Job

View File

@ -22,7 +22,9 @@ dependencies{
implementation(project(":dataforge-magix-client"))
implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8"))
implementation("kscience.plotlykt:plotlykt-server:0.3.0-dev-2")
implementation("kscience.plotlykt:plotlykt-server:0.3.0")
implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
@ -37,5 +39,5 @@ javafx{
}
application{
mainClassName = "hep.dataforge.control.demo.DemoControllerViewKt"
mainClass.set("hep.dataforge.control.demo.DemoControllerViewKt")
}

View File

@ -0,0 +1,10 @@
package hep.dataforge.control.demo
import com.github.ricky12awesome.jss.encodeToSchema
import com.github.ricky12awesome.jss.globalJson
import hep.dataforge.control.messages.DeviceMessage
fun main() {
val schema = globalJson.encodeToSchema(DeviceMessage.serializer(), generateDefinitions = false)
println(schema)
}

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -4,10 +4,10 @@ plugins {
}
kscience {
useCoroutines()
useSerialization{
json()
}
useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}
val dataforgeVersion: String by rootProject.extra

View File

@ -6,8 +6,8 @@ import kotlinx.serialization.Serializable
@Serializable
public data class MagixMessageFilter(
val format: List<String>? = null,
val origin: List<String>? = null,
val format: List<String?>? = null,
val origin: List<String?>? = null,
val target: List<String?>? = null,
val action: List<String?>? = null,
) {

View File

@ -4,36 +4,57 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
public interface MagixProcessor {
public fun interface MagixProcessor {
public fun process(endpoint: MagixEndpoint): Job
}
/**
* A converter from one (or several) format to another. It captures all events with the given filter then transforms it
* with given [transformer] and sends back to the loop with given [outputFormat].
*
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/
public class MagixConverter(
private val scope: CoroutineScope,
private val filter: MagixMessageFilter,
private val outputFormat: String,
private val newOrigin: String? = null,
private val transformer: suspend (JsonElement) -> JsonElement,
) : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = scope.launch {
endpoint.subscribe(filter).onEach { message ->
val newPayload = transformer(message.payload)
val transformed = message.copy(
payload = newPayload,
format = outputFormat,
origin = newOrigin ?: message.origin
)
endpoint.broadcast(transformed)
}.launchIn(this)
//TODO add catch logic here
public companion object {
/**
* A converter from one (or several) format to another. It captures all events with the given filter then transforms it
* with given [transformer] and sends back to the loop with given [outputFormat].
*
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/
public fun <T : Any, R : Any> convert(
scope: CoroutineScope,
filter: MagixMessageFilter,
outputFormat: String,
inputSerializer: KSerializer<T>,
outputSerializer: KSerializer<R>,
newOrigin: String? = null,
transformer: suspend (T) -> R,
): MagixProcessor = MagixProcessor { endpoint ->
endpoint.subscribe(inputSerializer, filter).onEach { message ->
val newPayload = transformer(message.payload)
val transformed: MagixMessage<R> = MagixMessage(
outputFormat,
newOrigin ?: message.origin,
newPayload,
message.target,
message.id,
message.parentId,
message.user
)
endpoint.broadcast(outputSerializer, transformed)
}.launchIn(scope)
}
}
}
public fun convert(
scope: CoroutineScope,
filter: MagixMessageFilter,
outputFormat: String,
newOrigin: String? = null,
transformer: suspend (JsonElement) -> JsonElement,
): MagixProcessor = convert(
scope,
filter,
outputFormat,
JsonElement.serializer(),
JsonElement.serializer(),
newOrigin,
transformer
)
}

View File

@ -52,7 +52,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMa
magixFlow.emit(message)
}
// bi-directional connection
requestChannel { input: Flow<Payload> ->
requestChannel { _: Payload, input: Flow<Payload> ->
input.onEach {
magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText()))
}.launchIn(this@magixAcceptor)

View File

@ -7,7 +7,6 @@ kscience {
useSerialization{
json()
}
useCoroutines("1.4.1", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}
val dataforgeVersion: String by rootProject.extra

View File

@ -13,15 +13,16 @@ import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint(
override val scope: CoroutineScope,
private val coroutineContext: CoroutineContext,
private val rSocket: RSocket,
) : MagixEndpoint {
@ -30,19 +31,15 @@ public class RSocketMagixEndpoint(
filter: MagixMessageFilter,
): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = buildPayload {
data(MagixEndpoint.magixJson.encodeToString(filter))
}
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
val flow = rSocket.requestStream(payload)
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) }
}
override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
scope.launch {
withContext(coroutineContext) {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = buildPayload {
data(MagixEndpoint.magixJson.encodeToString(serializer, message))
}
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) }
rSocket.fireAndForget(payload)
}
}
@ -57,7 +54,6 @@ public class RSocketMagixEndpoint(
@OptIn(KtorExperimentalAPI::class)
public suspend fun withWebSockets(
scope: CoroutineScope,
host: String,
port: Int,
path: String = "/rsocket",
@ -77,8 +73,7 @@ public class RSocketMagixEndpoint(
client.close()
}
return RSocketMagixEndpoint(scope, rSocket)
return RSocketMagixEndpoint(coroutineContext, rSocket)
}
}
}
}

View File

@ -1,5 +1,3 @@
import ru.mipt.npm.gradle.useFx
plugins {
id("ru.mipt.npm.jvm")
id("ru.mipt.npm.publish")
@ -9,11 +7,14 @@ plugins {
//TODO to be moved to a separate project
application{
mainClassName = "ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt"
mainClass.set("ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt")
}
kotlin{
explicitApi = null
}
kscience{
useFx(ru.mipt.npm.gradle.FXModule.CONTROLS, configuration = ru.mipt.npm.gradle.DependencyConfiguration.IMPLEMENTATION)
}

View File

@ -1,6 +1,6 @@
pluginManagement {
val kotlinVersion = "1.4.20-M2"
val toolsVersion = "0.6.4-dev-1.4.20-M2"
val kotlinVersion = "1.4.21"
val toolsVersion = "0.7.1"
repositories {
mavenLocal()
@ -24,20 +24,20 @@ pluginManagement {
}
}
rootProject.name = "dataforge-control"
rootProject.name = "controls.kt"
include(
":dataforge-device-core",
":dataforge-device-tcp",
":dataforge-device-serial",
":dataforge-device-server",
":dataforge-magix-client",
":motors",
":demo",
":magix",
":magix:magix-api",
":magix:magix-server",
":magix:magix-service"
":magix:magix-service",
":dataforge-magix-client",
":motors"
)
//includeBuild("../dataforge-core")