From aa58674a23ae23296b3fc7c5018427a9ee2e258a Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Thu, 30 Jul 2020 16:15:29 +0300 Subject: [PATCH] Magix client refactoring --- README.md | 5 +- .../dataforge/control/client/MagixClient.kt | 105 ++++++++++-------- 2 files changed, 63 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 6218ae3..8e9fb54 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,7 @@ # DataForge-control -DataForge-control is a data acquisition framework (work in progress). It is -based on DataForge, a software framework for automated data processing. +DataForge-control is a data acquisition framework (work in progress). It is based on DataForge, a software framework for automated data processing. This repository contains a prototype of API and simple implementation of a slow control system, including a demo. @@ -37,7 +36,7 @@ Generally, a Device has Properties that can be read and written. Also, some Acti can optionally be applied on a device (may or may not affect properties). - `base` - contains baseline `Device` implementation -[`DeviceBase`](dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt) +[`DeviceBase`](dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt) and property implementation, including property asynchronous flows. - `controllers` - implements Message Controller that can be attached to the event bus, Message diff --git a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt index fc28e60..d2e82d2 100644 --- a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt +++ b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt @@ -4,17 +4,21 @@ import hep.dataforge.control.api.respondMessage import hep.dataforge.control.controllers.DeviceManager import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.meta.toJson +import hep.dataforge.meta.toMeta +import hep.dataforge.meta.wrap import io.ktor.client.HttpClient import io.ktor.client.request.post import io.ktor.http.ContentType import io.ktor.http.Url import io.ktor.http.contentType +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.json +import kotlin.coroutines.CoroutineContext /* { @@ -28,57 +32,70 @@ import kotlinx.serialization.json.json } */ -/** - * A stateful unique id generator - */ -interface IdGenerator{ - operator fun invoke(message: DeviceMessage): String -} -object MagixClient { - /** - * Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1) - */ - fun DeviceMessage.toWaltz(id: String, parentId: String? = null): JsonObject = json { - "id" to id - if (parentId != null) { - "parentId" to parentId - } - "target" to "magix" - "origin" to "df" - "payload" to config.toJson() +/** + * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) + */ +class MagixClient( + val manager: DeviceManager, + val postUrl: Url, + val inbox: Flow +): CoroutineScope { + + override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job]) + + private val client = HttpClient() + + protected fun generateId(message: DeviceMessage, requestId: String?): String = if(requestId != null){ + "$requestId.response" + } else{ + "df[${message.hashCode()}" } - fun buildCallback(url: Url, idGenerator: IdGenerator): suspend (DeviceMessage) -> Unit { - val client = HttpClient() - return { message -> - client.post(url) { - val messageId = idGenerator(message) - val waltzMessage = message.toWaltz(messageId) + private fun send(json: JsonObject) { + launch { + client.post(postUrl) { this.contentType(ContentType.Application.Json) - body = waltzMessage.toString() + body = json.toString() } } } + private fun wrapMessage(message: DeviceMessage, requestId: String? = null): JsonObject = json { + "id" to generateId(message, requestId) + if (requestId != null) { + "parentId" to requestId + } + "target" to "magix" + "origin" to "df" + "payload" to message.config.toJson() + } + + + private val listenJob = launch { + manager.controller.messageOutput().collect { message -> + val json = wrapMessage(message) + send(json) + } + } + + private val respondJob = launch { + inbox.collect { json -> + val requestId = json["id"]?.primitive?.content + val payload = json["payload"]?.jsonObject + //TODO analyze action + + if(payload != null){ + val meta = payload.toMeta() + val request = DeviceMessage.wrap(meta) + val response = manager.respondMessage(request) + send(wrapMessage(response,requestId)) + } else { + TODO("process heartbeat and other system messages") + } + } + } } -/** - * Event loop for magix input and output flows - */ -fun DeviceManager.startMagix( - inbox: Flow, // Inbox flow like SSE - outbox: suspend (DeviceMessage) -> Unit // outbox callback -): Job = context.launch { - launch { - controller.messageOutput().collect { message -> - outbox.invoke(message) - } - } - launch { - inbox.collect { message -> - val response = respondMessage(message) - outbox.invoke(response) - } - } -} + +