Magix client refactoring

This commit is contained in:
Alexander Nozik 2020-07-30 16:15:29 +03:00
parent 9c5b6db9d1
commit aa58674a23
2 changed files with 63 additions and 47 deletions

View File

@ -2,8 +2,7 @@
# DataForge-control
DataForge-control is a data acquisition framework (work in progress). It is
based on DataForge, a software framework for automated data processing.
DataForge-control is a data acquisition framework (work in progress). It is based on DataForge, a software framework for automated data processing.
This repository contains a prototype of API and simple implementation
of a slow control system, including a demo.
@ -37,7 +36,7 @@ Generally, a Device has Properties that can be read and written. Also, some Acti
can optionally be applied on a device (may or may not affect properties).
- `base` - contains baseline `Device` implementation
[`DeviceBase`](dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt)
[`DeviceBase`](dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt)
and property implementation, including property asynchronous flows.
- `controllers` - implements Message Controller that can be attached to the event bus, Message

View File

@ -4,17 +4,21 @@ import hep.dataforge.control.api.respondMessage
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap
import io.ktor.client.HttpClient
import io.ktor.client.request.post
import io.ktor.http.ContentType
import io.ktor.http.Url
import io.ktor.http.contentType
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.json
import kotlin.coroutines.CoroutineContext
/*
{
@ -28,57 +32,70 @@ import kotlinx.serialization.json.json
}
*/
/**
* A stateful unique id generator
*/
interface IdGenerator{
operator fun invoke(message: DeviceMessage): String
}
object MagixClient {
/**
* Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1)
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
fun DeviceMessage.toWaltz(id: String, parentId: String? = null): JsonObject = json {
"id" to id
if (parentId != null) {
"parentId" to parentId
class MagixClient(
val manager: DeviceManager,
val postUrl: Url,
val inbox: Flow<JsonObject>
): CoroutineScope {
override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job])
private val client = HttpClient()
protected fun generateId(message: DeviceMessage, requestId: String?): String = if(requestId != null){
"$requestId.response"
} else{
"df[${message.hashCode()}"
}
private fun send(json: JsonObject) {
launch {
client.post<Unit>(postUrl) {
this.contentType(ContentType.Application.Json)
body = json.toString()
}
}
}
private fun wrapMessage(message: DeviceMessage, requestId: String? = null): JsonObject = json {
"id" to generateId(message, requestId)
if (requestId != null) {
"parentId" to requestId
}
"target" to "magix"
"origin" to "df"
"payload" to config.toJson()
"payload" to message.config.toJson()
}
fun buildCallback(url: Url, idGenerator: IdGenerator): suspend (DeviceMessage) -> Unit {
val client = HttpClient()
return { message ->
client.post(url) {
val messageId = idGenerator(message)
val waltzMessage = message.toWaltz(messageId)
this.contentType(ContentType.Application.Json)
body = waltzMessage.toString()
}
private val listenJob = launch {
manager.controller.messageOutput().collect { message ->
val json = wrapMessage(message)
send(json)
}
}
}
private val respondJob = launch {
inbox.collect { json ->
val requestId = json["id"]?.primitive?.content
val payload = json["payload"]?.jsonObject
//TODO analyze action
/**
* Event loop for magix input and output flows
*/
fun DeviceManager.startMagix(
inbox: Flow<DeviceMessage>, // Inbox flow like SSE
outbox: suspend (DeviceMessage) -> Unit // outbox callback
): Job = context.launch {
launch {
controller.messageOutput().collect { message ->
outbox.invoke(message)
if(payload != null){
val meta = payload.toMeta()
val request = DeviceMessage.wrap(meta)
val response = manager.respondMessage(request)
send(wrapMessage(response,requestId))
} else {
TODO("process heartbeat and other system messages")
}
}
launch {
inbox.collect { message ->
val response = respondMessage(message)
outbox.invoke(response)
}
}
}