Message class hierarchy.

This commit is contained in:
Alexander Nozik 2020-12-02 12:35:16 +03:00
parent 92ab801967
commit 17beb29217
18 changed files with 239 additions and 167 deletions

View File

@ -4,8 +4,8 @@ plugins {
kotlin("js") apply false
}
val dataforgeVersion: String by extra("0.2.0-dev-4")
val ktorVersion: String by extra("1.4.1")
val dataforgeVersion: String by extra("0.2.0")
val ktorVersion: String by extra("1.4.3")
val rsocketVersion by extra("0.11.1")
allprojects {

View File

@ -121,7 +121,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
/**
* Create a bound read-only property with given [getter]
*/
public fun newReadOnlyProperty(
public fun createReadOnlyProperty(
name: String,
default: MetaItem<*>?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
@ -178,7 +178,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
/**
* Create a bound mutable property with given [getter] and [setter]
*/
public fun newMutableProperty(
internal fun createMutableProperty(
name: String,
default: MetaItem<*>?,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
@ -217,7 +217,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
/**
* Create a new bound action
*/
public fun newAction(
internal fun createAction(
name: String,
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> MetaItem<*>?,

View File

@ -24,7 +24,7 @@ private class ActionProvider<D : DeviceBase>(
) : PropertyDelegateProvider<D, ActionDelegate> {
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ActionDelegate {
val name = property.name
owner.newAction(name, descriptorBuilder, block)
owner.createAction(name, descriptorBuilder, block)
return owner.provideAction()
}
}

View File

@ -36,7 +36,7 @@ private class ReadOnlyDevicePropertyProvider<D : DeviceBase>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ReadOnlyPropertyDelegate {
val name = property.name
owner.newReadOnlyProperty(name, default, descriptorBuilder, getter)
owner.createReadOnlyProperty(name, default, descriptorBuilder, getter)
return owner.provideProperty(name)
}
}
@ -51,7 +51,7 @@ private class TypedReadOnlyDevicePropertyProvider<D : DeviceBase, T : Any>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedReadOnlyPropertyDelegate<T> {
val name = property.name
owner.newReadOnlyProperty(name, default, descriptorBuilder, getter)
owner.createReadOnlyProperty(name, default, descriptorBuilder, getter)
return owner.provideProperty(name, converter)
}
}
@ -178,7 +178,7 @@ private class DevicePropertyProvider<D : DeviceBase>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): PropertyDelegate {
val name = property.name
owner.newMutableProperty(name, default, descriptorBuilder, getter, setter)
owner.createMutableProperty(name, default, descriptorBuilder, getter, setter)
return owner.provideMutableProperty(name)
}
}
@ -194,7 +194,7 @@ private class TypedDevicePropertyProvider<D : DeviceBase, T : Any>(
override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedPropertyDelegate<T> {
val name = property.name
owner.newMutableProperty(name, default, descriptorBuilder, getter, setter)
owner.createMutableProperty(name, default, descriptorBuilder, getter, setter)
return owner.provideMutableProperty(name, converter)
}
}

View File

@ -1,11 +1,7 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.*
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.control.messages.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.control.messages.respondsTo
import hep.dataforge.control.messages.toEnvelope
import hep.dataforge.control.messages.toMeta
import hep.dataforge.control.messages.*
import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
@ -23,7 +19,7 @@ import kotlinx.io.Binary
@OptIn(DFExperimental::class)
public class DeviceController(
public val device: Device,
public val deviceTarget: String,
public val deviceName: String,
public val scope: CoroutineScope = device.scope,
) : Responder, Consumer, DeviceListener {
@ -34,16 +30,15 @@ public class DeviceController(
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage =
respondMessage(device, deviceTarget, message)
respondMessage(device, deviceName, message)
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request)
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceName, request)
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
scope.launch {
val change = DeviceMessage(
sourceName = deviceTarget,
action = PROPERTY_CHANGED_ACTION,
val change = PropertyChangedMessage(
sourceDevice = deviceName,
key = propertyName,
value = value,
)
@ -89,7 +84,8 @@ public class DeviceController(
} else error("Device does not support binary response")
}
} catch (ex: Exception) {
DeviceMessage.error(ex).toEnvelope()
val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope()
}
}
@ -98,58 +94,68 @@ public class DeviceController(
deviceTarget: String,
request: DeviceMessage,
): DeviceMessage = try {
val requestKey = request.key
val requestValue = request.value
var key: String? = null
var value: MetaItem<*>? = null
when (val action = request.action) {
GET_PROPERTY_ACTION -> {
key = requestKey
value = device.getProperty(requestKey ?: error("Key field is not defined in request"))
when (request) {
is PropertyGetMessage -> {
PropertyChangedMessage(
key = request.property,
value = device.getProperty(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
SET_PROPERTY_ACTION -> {
require(requestKey != null) { "Key field is not defined in request" }
if (requestValue == null) {
device.invalidateProperty(requestKey)
is PropertySetMessage -> {
if (request.value == null) {
device.invalidateProperty(request.property)
} else {
device.setProperty(requestKey, requestValue)
device.setProperty(request.property, request.value)
}
key = requestKey
value = device.getProperty(requestKey)
PropertyChangedMessage(
key = request.property,
value = device.getProperty(request.property),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
EXECUTE_ACTION -> {
require(requestKey != null) { "Key field is not defined in request" }
key = requestKey
value = device.execute(requestKey, requestValue)
is ActionExecuteMessage -> {
ActionResultMessage(
action = request.action,
result = device.execute(request.action, request.argument),
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
is GetDescriptionMessage -> {
val descriptionMeta = Meta {
"properties" put {
device.propertyDescriptors.map { descriptor ->
descriptor.name put descriptor.config
}
}
"actions" put {
device.actionDescriptors.map { descriptor ->
descriptor.name put descriptor.config
}
}
}
DescriptionMessage(
description = descriptionMeta,
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice
)
}
PROPERTY_LIST_ACTION -> {
value = Meta {
device.propertyDescriptors.map { descriptor ->
descriptor.name put descriptor.config
}
}.asMetaItem()
}
ACTION_LIST_ACTION -> {
value = Meta {
device.actionDescriptors.map { descriptor ->
descriptor.name put descriptor.config
}
}.asMetaItem()
}
else -> {
error("Unrecognized action $action")
is DescriptionMessage, is PropertyChangedMessage, is ActionResultMessage, is BinaryNotificationMessage, is DeviceErrorMessage, is EmptyDeviceMessage -> {
//Those messages are ignored
EmptyDeviceMessage(
sourceDevice = deviceTarget,
targetDevice = request.sourceDevice,
comment = "The message is ignored"
)
}
}
DeviceMessage(
targetName = request.sourceName,
sourceName = deviceTarget,
action = "response.${request.action}",
key = key,
value = value
)
} catch (ex: Exception) {
DeviceMessage.error(ex, request.action).respondsTo(request)
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice)
}
}
}
@ -157,10 +163,10 @@ public class DeviceController(
public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.targetName?.toName() ?: Name.EMPTY
val targetName = request.targetDevice?.toName() ?: Name.EMPTY
val device = this[targetName] ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {
DeviceMessage.error(ex, request.action).respondsTo(request)
DeviceMessage.error(ex, sourceDevice = request.targetDevice, targetDevice = request.sourceDevice)
}
}

View File

@ -1,8 +1,10 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.*
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.get
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.control.messages.respondsTo
import hep.dataforge.control.messages.PropertyChangedMessage
import hep.dataforge.control.messages.toEnvelope
import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope
@ -48,13 +50,11 @@ public class HubController(
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
scope.launch {
val change = DeviceMessage(
sourceName = name.toString(),
action = DeviceMessage.PROPERTY_CHANGED_ACTION,
val change = PropertyChangedMessage(
sourceDevice = name.toString(),
key = propertyName,
value = value
)
messageOutbox.send(change)
}
}
@ -64,23 +64,24 @@ public class HubController(
}
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.targetName?.toName() ?: Name.EMPTY
val targetName = message.targetDevice?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) {
DeviceMessage.error(ex, message.action).respondsTo(message)
DeviceMessage.error(ex, sourceDevice = null, targetDevice = message.sourceDevice)
}
override suspend fun respond(request: Envelope): Envelope = try {
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
if (request.data == null) {
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)).toEnvelope()
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
.toEnvelope()
} else {
DeviceController.respond(device, targetName.toString(), request)
}
} catch (ex: Exception) {
DeviceMessage.error(ex).toEnvelope()
DeviceMessage.error(ex, sourceDevice = null).toEnvelope()
}
override fun consume(message: Envelope) {

View File

@ -9,8 +9,11 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
/**
* Flow changes of all properties of a given device ignoring invalidation events
*/
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun Device.flowValues(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
public suspend fun Device.flowPropertyChanges(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
val listener = object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value != null) {

View File

@ -12,24 +12,19 @@ import kotlinx.serialization.json.encodeToJsonElement
@Serializable
public sealed class DeviceMessage{
public abstract val sourceName: String?
public abstract val targetName: String?
public abstract val sourceDevice: String?
public abstract val targetDevice: String?
public abstract val comment: String?
public companion object {
public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName()
public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName()
public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName()
public val MESSAGE_KEY_KEY: Name = DeviceMessage::key.name.asName()
public val MESSAGE_VALUE_KEY: Name = DeviceMessage::value.name.asName()
public const val OK_STATUS: String = "OK"
public const val FAIL_STATUS: String = "FAIL"
public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged"
public val SOURCE_KEY: Name = DeviceMessage::sourceDevice.name.asName()
public val TARGET_KEY: Name = DeviceMessage::targetDevice.name.asName()
public fun error(
cause: Throwable,
sourceDevice: String?,
targetDevice: String? = null,
): DeviceErrorMessage = DeviceErrorMessage(
errorMessage = cause.message,
errorType = cause::class.simpleName,
@ -45,27 +40,50 @@ public sealed class DeviceMessage{
public data class PropertyChangedMessage(
public val key: String,
public val value: MetaItem<*>?,
override val sourceName: String? = null,
override val targetName: String? = null,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
@Serializable
@SerialName("property.set")
public data class PropertySetMessage(
public val key: String,
public val value: MetaItem<*>,
override val sourceName: String? = null,
override val targetName: String? = null,
public val property: String,
public val value: MetaItem<*>?,
override val sourceDevice: String? = null,
override val targetDevice: String,
override val comment: String? = null,
) : DeviceMessage()
@Serializable
@SerialName("property.read")
public data class PropertyReadMessage(
public val key: String,
override val sourceName: String? = null,
override val targetName: String? = null,
@SerialName("property.get")
public data class PropertyGetMessage(
public val property: String,
override val sourceDevice: String? = null,
override val targetDevice: String,
override val comment: String? = null,
) : DeviceMessage()
/**
* Request device description
*/
@Serializable
@SerialName("description.get")
public data class GetDescriptionMessage(
override val sourceDevice: String? = null,
override val targetDevice: String,
override val comment: String? = null,
) : DeviceMessage()
/**
* The full device description message
*/
@Serializable
@SerialName("description")
public data class DescriptionMessage(
val description: Meta,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
@ -74,8 +92,8 @@ public data class PropertyReadMessage(
public data class ActionExecuteMessage(
public val action: String,
public val argument: MetaItem<*>?,
override val sourceName: String? = null,
override val targetName: String? = null,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
@ -84,19 +102,46 @@ public data class ActionExecuteMessage(
public data class ActionResultMessage(
public val action: String,
public val result: MetaItem<*>?,
override val sourceName: String? = null,
override val targetName: String? = null,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* Notifies listeners that a new binary with given [binaryID] is available. The binary itself could not be provided via [DeviceMessage] API.
*/
@Serializable
@SerialName("binary.notification")
public data class BinaryNotificationMessage(
val binaryID: String,
override val sourceDevice: String,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* The message states that the message is received, but no meaningful response is produced.
* This message could be used for a heartbeat.
*/
@Serializable
@SerialName("empty")
public data class EmptyDeviceMessage(
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* The evaluation of the message produced a service error
*/
@Serializable
@SerialName("error")
public data class DeviceErrorMessage(
public val errorMessage: String?,
public val errorType: String? = null,
public val errorStackTrace: String? = null,
override val sourceName: String? = null,
override val targetName: String? = null,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()

View File

@ -30,15 +30,6 @@ internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -
respondText(json.toString(), contentType = ContentType.Application.Json)
}
public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json)
}
//public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
// respondMessage(DeviceMessage(builder))
//}
//
//public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
// respondMessage(DeviceMessage.fail(null, block = builder))
//}

View File

@ -4,11 +4,11 @@ package hep.dataforge.control.server
import hep.dataforge.control.api.get
import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceController.Companion.SET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.control.messages.PropertyGetMessage
import hep.dataforge.control.messages.PropertySetMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.toMetaItem
@ -201,11 +201,10 @@ public fun Application.deviceModule(
get("get") {
val target: String by call.parameters
val property: String by call.parameters
val request = DeviceMessage(
action = GET_PROPERTY_ACTION,
sourceName = WEB_SERVER_TARGET,
targetName = target,
key = property,
val request = PropertyGetMessage(
sourceDevice = WEB_SERVER_TARGET,
targetDevice = target,
property = property,
)
val response = manager.respondMessage(request)
@ -217,11 +216,10 @@ public fun Application.deviceModule(
val body = call.receiveText()
val json = Json.parseToJsonElement(body)
val request = DeviceMessage(
action = SET_PROPERTY_ACTION,
sourceName = WEB_SERVER_TARGET,
targetName = target,
key = property,
val request = PropertySetMessage(
sourceDevice = WEB_SERVER_TARGET,
targetDevice = target,
property = property,
value = json.toMetaItem()
)

View File

@ -5,6 +5,12 @@ plugins {
val ktorVersion: String by rootProject.extra
kscience{
useSerialization {
json()
}
}
kotlin {
sourceSets {
commonMain {

View File

@ -22,7 +22,7 @@ dependencies{
implementation(project(":dataforge-magix-client"))
implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8"))
implementation("kscience.plotlykt:plotlykt-server:0.3.0-dev-2")
implementation("kscience.plotlykt:plotlykt-server:0.3.0")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -6,8 +6,8 @@ import kotlinx.serialization.Serializable
@Serializable
public data class MagixMessageFilter(
val format: List<String>? = null,
val origin: List<String>? = null,
val format: List<String?>? = null,
val origin: List<String?>? = null,
val target: List<String?>? = null,
val action: List<String?>? = null,
) {

View File

@ -4,36 +4,57 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
public interface MagixProcessor {
public fun interface MagixProcessor {
public fun process(endpoint: MagixEndpoint): Job
}
/**
* A converter from one (or several) format to another. It captures all events with the given filter then transforms it
* with given [transformer] and sends back to the loop with given [outputFormat].
*
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/
public class MagixConverter(
private val scope: CoroutineScope,
private val filter: MagixMessageFilter,
private val outputFormat: String,
private val newOrigin: String? = null,
private val transformer: suspend (JsonElement) -> JsonElement,
) : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = scope.launch {
endpoint.subscribe(filter).onEach { message ->
val newPayload = transformer(message.payload)
val transformed = message.copy(
payload = newPayload,
format = outputFormat,
origin = newOrigin ?: message.origin
)
endpoint.broadcast(transformed)
}.launchIn(this)
//TODO add catch logic here
public companion object {
/**
* A converter from one (or several) format to another. It captures all events with the given filter then transforms it
* with given [transformer] and sends back to the loop with given [outputFormat].
*
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/
public fun <T : Any, R : Any> convert(
scope: CoroutineScope,
filter: MagixMessageFilter,
outputFormat: String,
inputSerializer: KSerializer<T>,
outputSerializer: KSerializer<R>,
newOrigin: String? = null,
transformer: suspend (T) -> R,
): MagixProcessor = MagixProcessor { endpoint ->
endpoint.subscribe(inputSerializer, filter).onEach { message ->
val newPayload = transformer(message.payload)
val transformed: MagixMessage<R> = MagixMessage(
outputFormat,
newOrigin ?: message.origin,
newPayload,
message.target,
message.id,
message.parentId,
message.user
)
endpoint.broadcast(outputSerializer, transformed)
}.launchIn(scope)
}
}
public fun convert(
scope: CoroutineScope,
filter: MagixMessageFilter,
outputFormat: String,
newOrigin: String? = null,
transformer: suspend (JsonElement) -> JsonElement,
): MagixProcessor = convert(
scope,
filter,
outputFormat,
JsonElement.serializer(),
JsonElement.serializer(),
newOrigin,
transformer
)
}

View File

@ -22,8 +22,8 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint(
val coroutineContext: CoroutineContext,
public val rSocket: RSocket,
private val coroutineContext: CoroutineContext,
private val rSocket: RSocket,
) : MagixEndpoint {
override fun <T> subscribe(

View File

@ -1,5 +1,3 @@
import ru.mipt.npm.gradle.useFx
plugins {
id("ru.mipt.npm.jvm")
id("ru.mipt.npm.publish")
@ -9,11 +7,14 @@ plugins {
//TODO to be moved to a separate project
application{
mainClassName = "ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt"
mainClass.set("ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt")
}
kotlin{
explicitApi = null
}
kscience{
useFx(ru.mipt.npm.gradle.FXModule.CONTROLS, configuration = ru.mipt.npm.gradle.DependencyConfiguration.IMPLEMENTATION)
}

View File

@ -1,6 +1,6 @@
pluginManagement {
val kotlinVersion = "1.4.20-M2"
val toolsVersion = "0.6.4-dev-1.4.20-M2"
val kotlinVersion = "1.4.20"
val toolsVersion = "0.7.0"
repositories {
mavenLocal()