Replace Scheme by data class in DeviceMessage

This commit is contained in:
Alexander Nozik 2020-11-05 11:29:40 +03:00
parent f3cfe9c6db
commit 78ee05371b
13 changed files with 211 additions and 270 deletions

View File

@ -8,7 +8,9 @@ val ktorVersion: String by rootProject.extra
kscience { kscience {
useCoroutines() useCoroutines()
useSerialization() useSerialization{
json()
}
} }
kotlin { kotlin {

View File

@ -37,19 +37,19 @@ public class DeviceController(
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return if (value == null) return
scope.launch { scope.launch {
val change = DeviceMessage.ok { val change = DeviceMessage(
this.sourceName = deviceTarget sourceName = deviceTarget,
this.action = PROPERTY_CHANGED_ACTION action = PROPERTY_CHANGED_ACTION,
this.key = propertyName key = propertyName,
this.value = value value = value,
} )
val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY)
outputChannel.send(envelope) outputChannel.send(envelope)
} }
} }
public fun recieving(): Flow<Envelope> = outputChannel.consumeAsFlow() public fun receiving(): Flow<Envelope> = outputChannel.consumeAsFlow()
@DFExperimental @DFExperimental
override fun consume(message: Envelope) { override fun consume(message: Envelope) {
@ -70,7 +70,7 @@ public class DeviceController(
val target = request.meta["target"].string val target = request.meta["target"].string
return try { return try {
if (request.data == null) { if (request.data == null) {
respondMessage(device, deviceTarget, DeviceMessage.wrap(request.meta)).wrap() respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope()
} else if (target != null && target != deviceTarget) { } else if (target != null && target != deviceTarget) {
error("Wrong target name $deviceTarget expected but $target found") error("Wrong target name $deviceTarget expected but $target found")
} else { } else {
@ -85,7 +85,7 @@ public class DeviceController(
} else error("Device does not support binary response") } else error("Device does not support binary response")
} }
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail(cause = ex).wrap() DeviceMessage.fail(ex).toEnvelope()
} }
} }
@ -93,58 +93,59 @@ public class DeviceController(
device: Device, device: Device,
deviceTarget: String, deviceTarget: String,
request: DeviceMessage, request: DeviceMessage,
): DeviceMessage { ): DeviceMessage = try {
return try { val requestKey = request.key
DeviceMessage.ok { val requestValue = request.value
targetName = request.sourceName var key: String? = null
sourceName = deviceTarget var value: MetaItem<*>? = null
action = "response.${request.action}" when (val action = request.action) {
val requestKey = request.key GET_PROPERTY_ACTION -> {
val requestValue = request.value key = requestKey
value = device.getProperty(requestKey ?: error("Key field is not defined in request"))
when (val action = request.action) { }
GET_PROPERTY_ACTION -> { SET_PROPERTY_ACTION -> {
key = requestKey require(requestKey != null) { "Key field is not defined in request" }
value = device.getProperty(requestKey ?: error("Key field is not defined in request")) if (requestValue == null) {
} device.invalidateProperty(requestKey)
SET_PROPERTY_ACTION -> { } else {
require(requestKey != null) { "Key field is not defined in request" } device.setProperty(requestKey, requestValue)
if (requestValue == null) { }
device.invalidateProperty(requestKey) key = requestKey
} else { value = device.getProperty(requestKey)
device.setProperty(requestKey, requestValue) }
} EXECUTE_ACTION -> {
key = requestKey require(requestKey != null) { "Key field is not defined in request" }
value = device.getProperty(requestKey) key = requestKey
} value = device.execute(requestKey, requestValue)
EXECUTE_ACTION -> {
require(requestKey != null) { "Key field is not defined in request" } }
key = requestKey PROPERTY_LIST_ACTION -> {
value = device.execute(requestKey, requestValue) value = Meta {
device.propertyDescriptors.map { descriptor ->
} descriptor.name put descriptor.config
PROPERTY_LIST_ACTION -> { }
value = Meta { }.asMetaItem()
device.propertyDescriptors.map { descriptor -> }
descriptor.name put descriptor.config ACTION_LIST_ACTION -> {
} value = Meta {
}.asMetaItem() device.actionDescriptors.map { descriptor ->
} descriptor.name put descriptor.config
ACTION_LIST_ACTION -> { }
value = Meta { }.asMetaItem()
device.actionDescriptors.map { descriptor -> }
descriptor.name put descriptor.config else -> {
} error("Unrecognized action $action")
}.asMetaItem()
}
else -> {
error("Unrecognized action $action")
}
}
} }
} catch (ex: Exception) {
DeviceMessage.fail(request, cause = ex)
} }
DeviceMessage(
targetName = request.sourceName,
sourceName = deviceTarget,
action = "response.${request.action}",
key = key,
value = value
)
} catch (ex: Exception) {
DeviceMessage.fail(ex, request.action).respondsTo(request)
} }
} }
} }
@ -156,6 +157,6 @@ public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessa
val device = this[targetName] ?: error("The device with name $targetName not found in $this") val device = this[targetName] ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request) DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail(request, cause = ex) DeviceMessage.fail(ex, request.action).respondsTo(request)
} }
} }

View File

@ -4,21 +4,22 @@ import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.asName import hep.dataforge.names.asName
import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable
import kotlinx.serialization.descriptors.SerialDescriptor import kotlinx.serialization.json.Json
import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.json.encodeToJsonElement
public class DeviceMessage : Scheme() { @Serializable
public var action: String by string { error("Action not defined") } public data class DeviceMessage(
public var status: String by string(default = OK_STATUS) public val action: String,
public var sourceName: String? by string() public val status: String = OK_STATUS,
public var targetName: String? by string() public val sourceName: String? = null,
public var comment: String? by string() public val targetName: String? = null,
public var key: String? by string() public val comment: String? = null,
public var value: MetaItem<*>? by item() public val key: String? = null,
public val value: MetaItem<*>? = null,
public companion object : SchemeSpec<DeviceMessage>(::DeviceMessage), KSerializer<DeviceMessage> { ) {
public companion object {
public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName() public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName()
public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName() public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName()
public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName() public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName()
@ -29,42 +30,32 @@ public class DeviceMessage : Scheme() {
public const val FAIL_STATUS: String = "FAIL" public const val FAIL_STATUS: String = "FAIL"
public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged" public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged"
public inline fun ok( private fun Throwable.toMeta(): Meta = Meta {
request: DeviceMessage? = null, "type" put this::class.simpleName
block: DeviceMessage.() -> Unit = {}, "message" put message
): DeviceMessage = DeviceMessage { "trace" put stackTraceToString()
targetName = request?.sourceName
}.apply(block)
public inline fun fail(
request: DeviceMessage? = null,
cause: Throwable? = null,
block: DeviceMessage.() -> Unit = {},
): DeviceMessage = DeviceMessage {
targetName = request?.sourceName
status = FAIL_STATUS
if (cause != null) {
configure {
set("error.type", cause::class.simpleName)
set("error.message", cause.message)
//set("error.trace", ex.stackTraceToString())
}
comment = cause.message
}
}.apply(block)
override val descriptor: SerialDescriptor = MetaSerializer.descriptor
override fun deserialize(decoder: Decoder): DeviceMessage {
val meta = MetaSerializer.deserialize(decoder)
return wrap(meta)
} }
override fun serialize(encoder: Encoder, value: DeviceMessage) { public fun fail(
MetaSerializer.serialize(encoder, value.toMeta()) cause: Throwable,
} action: String = "undefined",
): DeviceMessage = DeviceMessage(
action = action,
status = FAIL_STATUS,
value = cause.toMeta().asMetaItem()
)
public fun fromMeta(meta: Meta): DeviceMessage = Json.decodeFromJsonElement(meta.toJson())
} }
} }
public fun DeviceMessage.wrap(): SimpleEnvelope = SimpleEnvelope(this.config, null)
public fun DeviceMessage.ok(): DeviceMessage =
copy(status = DeviceMessage.OK_STATUS)
public fun DeviceMessage.respondsTo(request: DeviceMessage): DeviceMessage =
copy(sourceName = request.targetName, targetName = request.sourceName)
public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!!
public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null)

View File

@ -6,7 +6,10 @@ import hep.dataforge.control.api.get
import hep.dataforge.io.Consumer import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder import hep.dataforge.io.Responder
import hep.dataforge.meta.* import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.NameToken import hep.dataforge.names.NameToken
import hep.dataforge.names.toName import hep.dataforge.names.toName
@ -35,7 +38,7 @@ public class HubController(
private val packJob = scope.launch { private val packJob = scope.launch {
while (isActive) { while (isActive) {
val message = messageOutbox.receive() val message = messageOutbox.receive()
envelopeOutbox.send(message.wrap()) envelopeOutbox.send(message.toEnvelope())
} }
} }
@ -44,12 +47,12 @@ public class HubController(
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return if (value == null) return
scope.launch { scope.launch {
val change = DeviceMessage.ok { val change = DeviceMessage(
sourceName = name.toString() sourceName = name.toString(),
action = DeviceMessage.PROPERTY_CHANGED_ACTION action = DeviceMessage.PROPERTY_CHANGED_ACTION,
this.key = propertyName key = propertyName,
this.value = value value = value
} )
messageOutbox.send(change) messageOutbox.send(change)
} }
@ -64,23 +67,19 @@ public class HubController(
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message) DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail { DeviceMessage.fail(ex, message.action).respondsTo(message)
comment = ex.message
}
} }
override suspend fun respond(request: Envelope): Envelope = try { override suspend fun respond(request: Envelope): Envelope = try {
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
if (request.data == null) { if (request.data == null) {
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.wrap(request.meta)).wrap() DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)).toEnvelope()
} else { } else {
DeviceController.respond(device, targetName.toString(), request) DeviceController.respond(device, targetName.toString(), request)
} }
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail { DeviceMessage.fail(ex).toEnvelope()
comment = ex.message
}.wrap()
} }
override fun consume(message: Envelope) { override fun consume(message: Envelope) {

View File

@ -1,6 +1,7 @@
package hep.dataforge.control.server package hep.dataforge.control.server
import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.toMeta
import hep.dataforge.io.* import hep.dataforge.io.*
import hep.dataforge.meta.MetaSerializer import hep.dataforge.meta.MetaSerializer
import io.ktor.application.ApplicationCall import io.ktor.application.ApplicationCall
@ -34,10 +35,10 @@ public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json) respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json)
} }
public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { //public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage(builder)) // respondMessage(DeviceMessage(builder))
} //}
//
public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { //public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage.fail(null, block = builder)) // respondMessage(DeviceMessage.fail(null, block = builder))
} //}

View File

@ -12,7 +12,6 @@ import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.meta.toJson import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta import hep.dataforge.meta.toMeta
import hep.dataforge.meta.toMetaItem import hep.dataforge.meta.toMetaItem
import hep.dataforge.meta.wrap
import io.ktor.application.* import io.ktor.application.*
import io.ktor.features.CORS import io.ktor.features.CORS
import io.ktor.features.StatusPages import io.ktor.features.StatusPages
@ -189,7 +188,7 @@ public fun Application.deviceModule(
?: throw IllegalArgumentException("The body is not a json object") ?: throw IllegalArgumentException("The body is not a json object")
val meta = json.toMeta() val meta = json.toMeta()
val request = DeviceMessage.wrap(meta) val request = DeviceMessage.fromMeta(meta)
val response = manager.respondMessage(request) val response = manager.respondMessage(request)
call.respondMessage(response) call.respondMessage(response)
@ -202,12 +201,12 @@ public fun Application.deviceModule(
get("get") { get("get") {
val target: String by call.parameters val target: String by call.parameters
val property: String by call.parameters val property: String by call.parameters
val request = DeviceMessage { val request = DeviceMessage(
action = GET_PROPERTY_ACTION action = GET_PROPERTY_ACTION,
sourceName = WEB_SERVER_TARGET sourceName = WEB_SERVER_TARGET,
this.targetName = target targetName = target,
key = property key = property,
} )
val response = manager.respondMessage(request) val response = manager.respondMessage(request)
call.respondMessage(response) call.respondMessage(response)
@ -218,14 +217,13 @@ public fun Application.deviceModule(
val body = call.receiveText() val body = call.receiveText()
val json = Json.parseToJsonElement(body) val json = Json.parseToJsonElement(body)
val request = DeviceMessage { val request = DeviceMessage(
action = SET_PROPERTY_ACTION action = SET_PROPERTY_ACTION,
sourceName = WEB_SERVER_TARGET sourceName = WEB_SERVER_TARGET,
this.targetName = target targetName = target,
key = property key = property,
value = json.toMetaItem() value = json.toMetaItem()
)
}
val response = manager.respondMessage(request) val response = manager.respondMessage(request)
call.respondMessage(response) call.respondMessage(response)

View File

@ -5,28 +5,12 @@ plugins {
val ktorVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra
repositories{
maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
}
kotlin { kotlin {
sourceSets { sourceSets {
commonMain { commonMain {
dependencies { dependencies {
implementation(project(":magix:magix-service"))
implementation(project(":dataforge-device-core")) implementation(project(":dataforge-device-core"))
implementation(project(":dataforge-device-tcp"))
implementation("io.ktor:ktor-client-core:$ktorVersion")
implementation("ru.mipt.npm:ktor-client-sse:0.1.0")
}
}
jvmMain {
dependencies {
}
}
jsMain {
dependencies {
} }
} }
} }

View File

@ -1,104 +0,0 @@
package hep.dataforge.control.client
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap
import io.ktor.client.HttpClient
import io.ktor.client.request.post
import io.ktor.http.ContentType
import io.ktor.http.Url
import io.ktor.http.contentType
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.serialization.json.*
import ru.mipt.npm.ktor.sse.readSse
import kotlin.coroutines.CoroutineContext
/*
{
"id":"string|number[optional, but desired]",
"parentId": "string|number[optional]",
"target":"string[optional]",
"origin":"string[required]",
"user":"string[optional]",
"action":"string[optional, default='heartbeat']",
"payload":"object[optional]"
}
*/
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
public class MagixClient(
private val manager: DeviceManager,
private val postUrl: Url,
private val sseUrl: Url,
//private val inbox: Flow<JsonObject>
) : CoroutineScope {
override val coroutineContext: CoroutineContext =
manager.context.coroutineContext + Job(manager.context.coroutineContext[Job])
private val client = HttpClient()
protected fun generateId(message: DeviceMessage, requestId: String?): String = if (requestId != null) {
"$requestId.response"
} else {
"df[${message.hashCode()}"
}
private fun send(json: JsonObject) {
launch {
client.post<Unit>(postUrl) {
this.contentType(ContentType.Application.Json)
body = json.toString()
}
}
}
private fun wrapMessage(message: DeviceMessage, requestId: String? = null): JsonObject = buildJsonObject {
put("id", generateId(message, requestId))
if (requestId != null) {
put("parentId", requestId)
}
put("target", "magix")
put("origin", "df")
put("payload", message.config.toJson())
}
private val listenJob = launch {
manager.controller.messageOutput().collect { message ->
val json = wrapMessage(message)
send(json)
}
}
private val respondJob = launch {
client.readSse(sseUrl.toString()) {
val json = Json.parseToJsonElement(it.data) as JsonObject
val requestId = json["id"]?.jsonPrimitive?.content
val payload = json["payload"]?.jsonObject
//TODO analyze action
if (payload != null) {
val meta = payload.toMeta()
val request = DeviceMessage.wrap(meta)
val response = manager.respondMessage(request)
send(wrapMessage(response, requestId))
} else {
TODO("process heartbeat and other system messages")
}
}
}
}

View File

@ -0,0 +1,14 @@
package hep.dataforge.control.client
public data class TangoPayload(
val host: String,
val device: String,
val name: String,
val value: Any? = null,
val timestamp: Long? = null,
val quality: String = "VALID",
val event: String? = null,
val input: Any? = null,
val output: Any? = null,
val errors: Iterable<Any?>?,
)

View File

@ -0,0 +1,54 @@
package hep.dataforge.control.client
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixMessage
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
public const val DATAFORGE_FORMAT: String = "dataforge"
private fun generateId(request: MagixMessage<DeviceMessage>): String = if (request.id != null) {
"${request.id}.response"
} else {
"df[${request.payload.hashCode()}"
}
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
public fun DeviceManager.launchMagixClient(
endpoint: MagixEndpoint,
endpointID: String = "dataforge",
): Job = context.launch {
endpoint.subscribe(DeviceMessage.serializer()).onEach { request ->
//TODO analyze action
val responsePayload = respondMessage(request.payload)
val response = MagixMessage(
format = DATAFORGE_FORMAT,
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = responsePayload
)
endpoint.broadcast(DeviceMessage.serializer(), response)
}.launchIn(endpoint.scope)
controller.messageOutput().onEach { payload ->
MagixMessage(
format = DATAFORGE_FORMAT,
id = "df[${payload.hashCode()}]",
origin = endpointID,
payload = payload
)
}.launchIn(endpoint.scope)
}

View File

@ -15,7 +15,7 @@ public interface MagixEndpoint {
/** /**
* Subscribe to a [Flow] of messages using specific [payloadSerializer] * Subscribe to a [Flow] of messages using specific [payloadSerializer]
*/ */
public suspend fun <T> subscribe( public fun <T> subscribe(
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter = MagixMessageFilter.ALL, filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<T>> ): Flow<MagixMessage<T>>
@ -36,7 +36,7 @@ public interface MagixEndpoint {
} }
} }
public suspend fun MagixEndpoint.subscribe( public fun MagixEndpoint.subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL, filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer()) ): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer())

View File

@ -24,7 +24,7 @@ public class RSocketMagixEndpoint(
public val rSocket: RSocket, public val rSocket: RSocket,
) : MagixEndpoint { ) : MagixEndpoint {
override suspend fun <T> subscribe( override fun <T> subscribe(
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter, filter: MagixMessageFilter,
): Flow<MagixMessage<T>> { ): Flow<MagixMessage<T>> {

View File

@ -142,6 +142,7 @@ class PiMotionMasterDevice(
/** /**
* Send a synchronous request and receive a list of lines as a response * Send a synchronous request and receive a list of lines as a response
*/ */
@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun request(command: String, vararg arguments: String): List<String> = mutex.withLock { private suspend fun request(command: String, vararg arguments: String): List<String> = mutex.withLock {
try { try {
withTimeout(timeoutValue) { withTimeout(timeoutValue) {