Device plugin refactoring and waltz client.
This commit is contained in:
parent
06f52a73bc
commit
9c5b6db9d1
@ -7,17 +7,32 @@ val ktorVersion: String by extra("1.3.2")
|
||||
|
||||
|
||||
kotlin {
|
||||
js {
|
||||
browser {
|
||||
dceTask {
|
||||
keep("ktor-ktor-io.\$\$importsForInline\$\$.ktor-ktor-io.io.ktor.utils.io")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
commonMain{
|
||||
commonMain {
|
||||
dependencies {
|
||||
implementation(project(":dataforge-device-core"))
|
||||
implementation("io.ktor:ktor-client-core:$ktorVersion")
|
||||
}
|
||||
}
|
||||
jvmMain{
|
||||
jvmMain {
|
||||
dependencies {
|
||||
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
||||
}
|
||||
}
|
||||
jsMain {
|
||||
dependencies {
|
||||
implementation("io.ktor:ktor-client-js:$ktorVersion")
|
||||
implementation(npm("text-encoding", "0.7.0"))
|
||||
implementation(npm("abort-controller", "3.0.0"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,84 @@
|
||||
package hep.dataforge.control.client
|
||||
|
||||
import hep.dataforge.control.api.respondMessage
|
||||
import hep.dataforge.control.controllers.DeviceManager
|
||||
import hep.dataforge.control.controllers.DeviceMessage
|
||||
import hep.dataforge.meta.toJson
|
||||
import io.ktor.client.HttpClient
|
||||
import io.ktor.client.request.post
|
||||
import io.ktor.http.ContentType
|
||||
import io.ktor.http.Url
|
||||
import io.ktor.http.contentType
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.json
|
||||
|
||||
/*
|
||||
{
|
||||
"id":"string|number[optional, but desired]",
|
||||
"parentId": "string|number[optional]",
|
||||
"target":"string[optional]",
|
||||
"origin":"string[required]",
|
||||
"user":"string[optional]",
|
||||
"action":"string[optional, default='heartbeat']",
|
||||
"payload":"object[optional]"
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* A stateful unique id generator
|
||||
*/
|
||||
interface IdGenerator{
|
||||
operator fun invoke(message: DeviceMessage): String
|
||||
}
|
||||
|
||||
object MagixClient {
|
||||
/**
|
||||
* Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1)
|
||||
*/
|
||||
fun DeviceMessage.toWaltz(id: String, parentId: String? = null): JsonObject = json {
|
||||
"id" to id
|
||||
if (parentId != null) {
|
||||
"parentId" to parentId
|
||||
}
|
||||
"target" to "magix"
|
||||
"origin" to "df"
|
||||
"payload" to config.toJson()
|
||||
}
|
||||
|
||||
fun buildCallback(url: Url, idGenerator: IdGenerator): suspend (DeviceMessage) -> Unit {
|
||||
val client = HttpClient()
|
||||
return { message ->
|
||||
client.post(url) {
|
||||
val messageId = idGenerator(message)
|
||||
val waltzMessage = message.toWaltz(messageId)
|
||||
this.contentType(ContentType.Application.Json)
|
||||
body = waltzMessage.toString()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Event loop for magix input and output flows
|
||||
*/
|
||||
fun DeviceManager.startMagix(
|
||||
inbox: Flow<DeviceMessage>, // Inbox flow like SSE
|
||||
outbox: suspend (DeviceMessage) -> Unit // outbox callback
|
||||
): Job = context.launch {
|
||||
launch {
|
||||
controller.messageOutput().collect { message ->
|
||||
outbox.invoke(message)
|
||||
}
|
||||
}
|
||||
launch {
|
||||
inbox.collect { message ->
|
||||
val response = respondMessage(message)
|
||||
outbox.invoke(response)
|
||||
}
|
||||
}
|
||||
}
|
@ -1,72 +0,0 @@
|
||||
package hep.dataforge.control.client
|
||||
|
||||
import hep.dataforge.control.api.getDevice
|
||||
import hep.dataforge.control.controllers.DeviceManager
|
||||
import hep.dataforge.control.controllers.DeviceMessage
|
||||
import hep.dataforge.control.controllers.MessageController
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.toJson
|
||||
import hep.dataforge.meta.toMeta
|
||||
import hep.dataforge.meta.wrap
|
||||
import io.ktor.client.HttpClient
|
||||
import io.ktor.client.request.post
|
||||
import io.ktor.http.ContentType
|
||||
import io.ktor.http.Url
|
||||
import io.ktor.http.contentType
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.json
|
||||
|
||||
/*
|
||||
{
|
||||
"id":"string|number[optional, but desired]",
|
||||
"parentId": "string|number[optional]",
|
||||
"target":"string[optional]",
|
||||
"origin":"string[required]",
|
||||
"user":"string[optional]",
|
||||
"action":"string[optional, default='heartbeat']",
|
||||
"payload":"object[optional]"
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1)
|
||||
*/
|
||||
fun DeviceMessage.toWaltz(id: String, parentId: String): JsonObject = json {
|
||||
"id" to id
|
||||
"parentId" to parentId
|
||||
"target" to "magix"
|
||||
"origin" to "df"
|
||||
"payload" to config.toJson()
|
||||
}
|
||||
|
||||
fun DeviceMessage.fromWaltz(json: JsonObject): DeviceMessage =
|
||||
DeviceMessage.wrap(json["payload"]?.jsonObject?.toMeta() ?: Meta.EMPTY)
|
||||
|
||||
fun DeviceManager.startWaltzClient(
|
||||
waltzUrl: Url,
|
||||
deviceNames: Collection<String> = devices.keys.map { it.toString() }
|
||||
): Job {
|
||||
|
||||
val controllers = deviceNames.map { name ->
|
||||
val device = getDevice(name)
|
||||
MessageController(device, name, context)
|
||||
}
|
||||
|
||||
val client = HttpClient()
|
||||
|
||||
val outputFlow = controllers.asFlow().flatMapMerge {
|
||||
it.output()
|
||||
}.filter { it.data == null }.map { DeviceMessage.wrap(it.meta) }
|
||||
|
||||
return context.launch {
|
||||
outputFlow.collect { message ->
|
||||
client.post(waltzUrl){
|
||||
this.contentType(ContentType.Application.Json)
|
||||
body = message.config.toJson().toString()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,8 +1,12 @@
|
||||
package hep.dataforge.control.api
|
||||
|
||||
import hep.dataforge.control.api.Device.Companion.ACTION_LIST_ACTION
|
||||
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
|
||||
import hep.dataforge.control.api.Device.Companion.EXECUTE_ACTION
|
||||
import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION
|
||||
import hep.dataforge.control.api.Device.Companion.PROPERTY_LIST_ACTION
|
||||
import hep.dataforge.control.api.Device.Companion.SET_PROPERTY_ACTION
|
||||
import hep.dataforge.control.controllers.DeviceMessage
|
||||
import hep.dataforge.control.controllers.MessageController
|
||||
import hep.dataforge.control.controllers.MessageData
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.Responder
|
||||
@ -84,6 +88,11 @@ interface Device: Closeable, Responder {
|
||||
|
||||
companion object{
|
||||
const val DEVICE_TARGET = "device"
|
||||
const val GET_PROPERTY_ACTION = "read"
|
||||
const val SET_PROPERTY_ACTION = "write"
|
||||
const val EXECUTE_ACTION = "execute"
|
||||
const val PROPERTY_LIST_ACTION = "propertyList"
|
||||
const val ACTION_LIST_ACTION = "actionList"
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,7 +100,7 @@ suspend fun Device.respondMessage(
|
||||
request: DeviceMessage
|
||||
): DeviceMessage {
|
||||
val result: List<MessageData> = when (val action = request.type) {
|
||||
MessageController.GET_PROPERTY_ACTION -> {
|
||||
GET_PROPERTY_ACTION -> {
|
||||
request.data.map { property ->
|
||||
MessageData {
|
||||
name = property.name
|
||||
@ -99,7 +108,7 @@ suspend fun Device.respondMessage(
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageController.SET_PROPERTY_ACTION -> {
|
||||
SET_PROPERTY_ACTION -> {
|
||||
request.data.map { property ->
|
||||
val propertyName: String = property.name
|
||||
val propertyValue = property.value
|
||||
@ -114,7 +123,7 @@ suspend fun Device.respondMessage(
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageController.EXECUTE_ACTION -> {
|
||||
EXECUTE_ACTION -> {
|
||||
request.data.map { payload ->
|
||||
MessageData {
|
||||
name = payload.name
|
||||
@ -122,7 +131,7 @@ suspend fun Device.respondMessage(
|
||||
}
|
||||
}
|
||||
}
|
||||
MessageController.PROPERTY_LIST_ACTION -> {
|
||||
PROPERTY_LIST_ACTION -> {
|
||||
propertyDescriptors.map { descriptor ->
|
||||
MessageData {
|
||||
name = descriptor.name
|
||||
@ -131,7 +140,7 @@ suspend fun Device.respondMessage(
|
||||
}
|
||||
}
|
||||
|
||||
MessageController.ACTION_LIST_ACTION -> {
|
||||
ACTION_LIST_ACTION -> {
|
||||
actionDescriptors.map { descriptor ->
|
||||
MessageData {
|
||||
name = descriptor.name
|
||||
|
@ -1,9 +1,11 @@
|
||||
package hep.dataforge.control.api
|
||||
|
||||
import hep.dataforge.control.controllers.DeviceMessage
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.string
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.NameToken
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.toName
|
||||
import hep.dataforge.provider.Provider
|
||||
|
||||
@ -11,13 +13,15 @@ import hep.dataforge.provider.Provider
|
||||
* A hub that could locate multiple devices and redirect actions to them
|
||||
*/
|
||||
interface DeviceHub : Provider {
|
||||
val devices: Map<NameToken, Device>
|
||||
val devices: Map<Name, Device>
|
||||
|
||||
override val defaultTarget: String get() = Device.DEVICE_TARGET
|
||||
|
||||
override val defaultChainTarget: String get() = Device.DEVICE_TARGET
|
||||
|
||||
override fun provideTop(target: String): Map<Name, Any> {
|
||||
if (target == Device.DEVICE_TARGET) {
|
||||
return devices.mapKeys { it.key.asName() }
|
||||
return devices
|
||||
} else {
|
||||
throw IllegalArgumentException("Target $target is not supported for $this")
|
||||
}
|
||||
@ -28,31 +32,30 @@ interface DeviceHub : Provider {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the device by its full name if it is present. Hubs are resolved recursively.
|
||||
*/
|
||||
fun DeviceHub.getDevice(name: Name): Device = when (name.length) {
|
||||
0 -> (this as? Device) ?: error("The DeviceHub is resolved by name but it is not a Device")
|
||||
1 -> {
|
||||
val token = name.first()!!
|
||||
devices[token] ?: error("Device with name $token not found in the hub $this")
|
||||
}
|
||||
else -> {
|
||||
val hub = getDevice(name.cutLast()) as? DeviceHub
|
||||
?: error("The device with name ${name.cutLast()} does not exist or is not a hub")
|
||||
hub.getDevice(name.last()!!.asName())
|
||||
}
|
||||
operator fun DeviceHub.get(deviceName: Name) =
|
||||
devices[deviceName] ?: error("Device with name $deviceName not found in $this")
|
||||
|
||||
operator fun DeviceHub.get(deviceName: String) = get(deviceName.toName())
|
||||
|
||||
suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem<*> =
|
||||
this[deviceName].getProperty(propertyName)
|
||||
|
||||
suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value: MetaItem<*>) {
|
||||
this[deviceName].setProperty(propertyName, value)
|
||||
}
|
||||
|
||||
suspend fun DeviceHub.exec(deviceName: Name, command: String, argument: MetaItem<*>?): MetaItem<*>? =
|
||||
this[deviceName].exec(command, argument)
|
||||
|
||||
fun DeviceHub.getDevice(deviceName: String) = getDevice(deviceName.toName())
|
||||
suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
|
||||
val device = this[request.target?.toName() ?: Name.EMPTY]
|
||||
|
||||
suspend fun DeviceHub.getProperty(deviceName: String, propertyName: String): MetaItem<*> =
|
||||
getDevice(deviceName).getProperty(propertyName)
|
||||
|
||||
suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, value: MetaItem<*>) {
|
||||
getDevice(deviceName).setProperty(propertyName, value)
|
||||
return device.respondMessage(request)
|
||||
}
|
||||
|
||||
suspend fun DeviceHub.exec(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? =
|
||||
getDevice(deviceName).exec(command, argument)
|
||||
suspend fun DeviceHub.respond(request: Envelope): Envelope {
|
||||
val target = request.meta[DeviceMessage.TARGET_KEY].string
|
||||
val device = this[target?.toName() ?: Name.EMPTY]
|
||||
|
||||
return device.respond(request)
|
||||
}
|
@ -4,6 +4,7 @@ import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.DeviceListener
|
||||
import hep.dataforge.control.api.respondMessage
|
||||
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
|
||||
import hep.dataforge.io.Consumer
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.Responder
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
@ -14,14 +15,7 @@ import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.io.Binary
|
||||
|
||||
/**
|
||||
* A consumer of envelopes
|
||||
*/
|
||||
interface Consumer {
|
||||
fun consume(message: Envelope): Unit
|
||||
}
|
||||
|
||||
class MessageController(
|
||||
class DeviceController(
|
||||
val device: Device,
|
||||
val deviceTarget: String,
|
||||
val scope: CoroutineScope = device.scope
|
||||
@ -95,10 +89,6 @@ class MessageController(
|
||||
|
||||
|
||||
companion object {
|
||||
const val GET_PROPERTY_ACTION = "read"
|
||||
const val SET_PROPERTY_ACTION = "write"
|
||||
const val EXECUTE_ACTION = "execute"
|
||||
const val PROPERTY_LIST_ACTION = "propertyList"
|
||||
const val ACTION_LIST_ACTION = "actionList"
|
||||
|
||||
}
|
||||
}
|
@ -8,7 +8,6 @@ import hep.dataforge.control.api.Device
|
||||
import hep.dataforge.control.api.DeviceHub
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.NameToken
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class DeviceManager : AbstractPlugin(), DeviceHub {
|
||||
@ -17,12 +16,15 @@ class DeviceManager : AbstractPlugin(), DeviceHub {
|
||||
/**
|
||||
* Actual list of connected devices
|
||||
*/
|
||||
private val top = HashMap<NameToken, Device>()
|
||||
override val devices: Map<NameToken, Device> get() = top
|
||||
private val top = HashMap<Name, Device>()
|
||||
override val devices: Map<Name, Device> get() = top
|
||||
|
||||
fun registerDevice(name: String, device: Device, index: String? = null) {
|
||||
val token = NameToken(name, index)
|
||||
top[token] = device
|
||||
val controller by lazy {
|
||||
HubController(this, context)
|
||||
}
|
||||
|
||||
fun registerDevice(name: Name, device: Device) {
|
||||
top[name] = device
|
||||
}
|
||||
|
||||
override fun provideTop(target: String): Map<Name, Any> = super<DeviceHub>.provideTop(target)
|
||||
@ -36,4 +38,5 @@ class DeviceManager : AbstractPlugin(), DeviceHub {
|
||||
}
|
||||
|
||||
|
||||
val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager)
|
||||
val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager)
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
package hep.dataforge.control.controllers
|
||||
|
||||
import hep.dataforge.control.controllers.DeviceMessage.Companion.DATA_VALUE_KEY
|
||||
import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.asName
|
||||
@ -10,9 +10,9 @@ import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.SerialDescriptor
|
||||
|
||||
class DeviceMessage : Scheme() {
|
||||
var source by string()
|
||||
var target by string()
|
||||
var type by string(default = MessageController.GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY)
|
||||
var source by string(key = SOURCE_KEY)
|
||||
var target by string(key = TARGET_KEY)
|
||||
var type by string(default = GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY)
|
||||
var comment by string()
|
||||
var status by string(RESPONSE_OK_STATUS)
|
||||
var data: List<MessageData>
|
||||
@ -28,9 +28,11 @@ class DeviceMessage : Scheme() {
|
||||
spec.invoke(block).also { config.append(MESSAGE_DATA_KEY, it) }
|
||||
|
||||
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage), KSerializer<DeviceMessage> {
|
||||
val MESSAGE_TYPE_KEY = "action".asName()
|
||||
val SOURCE_KEY = "source".asName()
|
||||
val TARGET_KEY = "target".asName()
|
||||
val MESSAGE_TYPE_KEY = "type".asName()
|
||||
val MESSAGE_DATA_KEY = "data".asName()
|
||||
val DATA_VALUE_KEY = "value".asName()
|
||||
|
||||
const val RESPONSE_OK_STATUS = "response.OK"
|
||||
const val RESPONSE_FAIL_STATUS = "response.FAIL"
|
||||
const val PROPERTY_CHANGED_ACTION = "event.propertyChange"
|
||||
@ -67,7 +69,9 @@ class MessageData : Scheme() {
|
||||
var name by string { error("Property name could not be empty") }
|
||||
var value by item(key = DATA_VALUE_KEY)
|
||||
|
||||
companion object : SchemeSpec<MessageData>(::MessageData)
|
||||
companion object : SchemeSpec<MessageData>(::MessageData) {
|
||||
val DATA_VALUE_KEY = "value".asName()
|
||||
}
|
||||
}
|
||||
|
||||
@DFBuilder
|
||||
|
@ -0,0 +1,103 @@
|
||||
package hep.dataforge.control.controllers
|
||||
|
||||
import hep.dataforge.control.api.DeviceHub
|
||||
import hep.dataforge.control.api.DeviceListener
|
||||
import hep.dataforge.control.api.get
|
||||
import hep.dataforge.control.api.respondMessage
|
||||
import hep.dataforge.io.Consumer
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.Responder
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.toName
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
class HubController(
|
||||
val hub: DeviceHub,
|
||||
val scope: CoroutineScope
|
||||
) : Consumer, Responder {
|
||||
|
||||
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
|
||||
|
||||
private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED)
|
||||
|
||||
fun messageOutput() = messageOutbox.consumeAsFlow()
|
||||
|
||||
fun envelopeOutput() = envelopeOutbox.consumeAsFlow()
|
||||
|
||||
private val packJob = scope.launch {
|
||||
while (isActive) {
|
||||
val message = messageOutbox.receive()
|
||||
envelopeOutbox.send(message.wrap())
|
||||
}
|
||||
}
|
||||
|
||||
private val listeners: Map<Name, DeviceListener> = hub.devices.mapValues { (name, device) ->
|
||||
object : DeviceListener {
|
||||
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
|
||||
if (value == null) return
|
||||
scope.launch {
|
||||
val change = DeviceMessage.ok {
|
||||
source = name.toString()
|
||||
type = DeviceMessage.PROPERTY_CHANGED_ACTION
|
||||
data {
|
||||
this.name = propertyName
|
||||
this.value = value
|
||||
}
|
||||
}
|
||||
|
||||
messageOutbox.send(change)
|
||||
}
|
||||
}
|
||||
}.also {
|
||||
device.registerListener(it)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun respondMessage(message: DeviceMessage): DeviceMessage {
|
||||
return try {
|
||||
val targetName = message.target?.toName() ?: Name.EMPTY
|
||||
val device = hub[targetName]
|
||||
device.respondMessage(message).apply {
|
||||
target = message.source
|
||||
source = targetName.toString()
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
DeviceMessage.fail {
|
||||
comment = ex.message
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun respond(request: Envelope): Envelope {
|
||||
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
|
||||
return try {
|
||||
val device = hub[targetName]
|
||||
if (request.data == null) {
|
||||
respondMessage(DeviceMessage.wrap(request.meta)).wrap()
|
||||
} else {
|
||||
val response = device.respond(request)
|
||||
return SimpleEnvelope(response.meta.edit {
|
||||
DeviceMessage.TARGET_KEY put request.meta[DeviceMessage.SOURCE_KEY].string
|
||||
DeviceMessage.SOURCE_KEY put targetName.toString()
|
||||
}, response.data)
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
DeviceMessage.fail {
|
||||
comment = ex.message
|
||||
}.wrap()
|
||||
}
|
||||
}
|
||||
|
||||
override fun consume(message: Envelope) {
|
||||
// Fire the respond procedure and forget about the result
|
||||
scope.launch {
|
||||
respond(message)
|
||||
}
|
||||
}
|
||||
}
|
@ -2,12 +2,12 @@
|
||||
|
||||
package hep.dataforge.control.server
|
||||
|
||||
import hep.dataforge.control.api.getDevice
|
||||
import hep.dataforge.control.api.Device.Companion.GET_PROPERTY_ACTION
|
||||
import hep.dataforge.control.api.Device.Companion.SET_PROPERTY_ACTION
|
||||
import hep.dataforge.control.api.get
|
||||
import hep.dataforge.control.api.respondMessage
|
||||
import hep.dataforge.control.controllers.DeviceManager
|
||||
import hep.dataforge.control.controllers.DeviceMessage
|
||||
import hep.dataforge.control.controllers.MessageController
|
||||
import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION
|
||||
import hep.dataforge.control.controllers.MessageController.Companion.SET_PROPERTY_ACTION
|
||||
import hep.dataforge.control.controllers.data
|
||||
import hep.dataforge.meta.toJson
|
||||
import hep.dataforge.meta.toMeta
|
||||
@ -32,9 +32,7 @@ import io.ktor.websocket.webSocket
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.flow.flatMapMerge
|
||||
import kotlinx.html.*
|
||||
import kotlinx.serialization.UnstableDefault
|
||||
import kotlinx.serialization.json.Json
|
||||
@ -79,68 +77,22 @@ fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
|
||||
|
||||
const val WEB_SERVER_TARGET = "@webServer"
|
||||
|
||||
private suspend fun ApplicationCall.message(target: MessageController) {
|
||||
val body = receiveText()
|
||||
val json = Json.parseJson(body) as? JsonObject
|
||||
?: throw IllegalArgumentException("The body is not a json object")
|
||||
val meta = json.toMeta()
|
||||
|
||||
val request = DeviceMessage.wrap(meta)
|
||||
|
||||
val response = target.respondMessage(request)
|
||||
respondMessage(response)
|
||||
}
|
||||
|
||||
private suspend fun ApplicationCall.getProperty(target: MessageController) {
|
||||
val property: String by parameters
|
||||
val request = DeviceMessage {
|
||||
type = GET_PROPERTY_ACTION
|
||||
source = WEB_SERVER_TARGET
|
||||
this.target = target.deviceTarget
|
||||
data {
|
||||
name = property
|
||||
}
|
||||
}
|
||||
|
||||
val response = target.respondMessage(request)
|
||||
respondMessage(response)
|
||||
}
|
||||
|
||||
private suspend fun ApplicationCall.setProperty(target: MessageController) {
|
||||
val property: String by parameters
|
||||
val body = receiveText()
|
||||
val json = Json.parseJson(body)
|
||||
|
||||
val request = DeviceMessage {
|
||||
type = SET_PROPERTY_ACTION
|
||||
source = WEB_SERVER_TARGET
|
||||
this.target = target.deviceTarget
|
||||
data {
|
||||
name = property
|
||||
value = json.toMetaItem()
|
||||
}
|
||||
}
|
||||
|
||||
val response = target.respondMessage(request)
|
||||
respondMessage(response)
|
||||
}
|
||||
|
||||
@OptIn(KtorExperimentalAPI::class)
|
||||
fun Application.deviceModule(
|
||||
manager: DeviceManager,
|
||||
deviceNames: Collection<String> = manager.devices.keys.map { it.toString() },
|
||||
route: String = "/"
|
||||
) {
|
||||
val controllers = deviceNames.associateWith { name ->
|
||||
val device = manager.getDevice(name)
|
||||
MessageController(device, name, manager.context)
|
||||
}
|
||||
|
||||
fun generateFlow(target: String?) = if (target == null) {
|
||||
controllers.values.asFlow().flatMapMerge { it.output() }
|
||||
} else {
|
||||
controllers[target]?.output() ?: error("The device with target $target not found")
|
||||
}
|
||||
// val controllers = deviceNames.associateWith { name ->
|
||||
// val device = manager.devices[name.toName()]
|
||||
// DeviceController(device, name, manager.context)
|
||||
// }
|
||||
//
|
||||
// fun generateFlow(target: String?) = if (target == null) {
|
||||
// controllers.values.asFlow().flatMapMerge { it.output() }
|
||||
// } else {
|
||||
// controllers[target]?.output() ?: error("The device with target $target not found")
|
||||
// }
|
||||
|
||||
if (featureOrNull(WebSockets) == null) {
|
||||
install(WebSockets)
|
||||
@ -164,7 +116,7 @@ fun Application.deviceModule(
|
||||
+"Device server dashboard"
|
||||
}
|
||||
deviceNames.forEach { deviceName ->
|
||||
val device = controllers[deviceName]!!.device
|
||||
val device = manager[deviceName]
|
||||
div {
|
||||
id = deviceName
|
||||
h2 { +deviceName }
|
||||
@ -198,9 +150,8 @@ fun Application.deviceModule(
|
||||
|
||||
get("list") {
|
||||
call.respondJson {
|
||||
controllers.values.forEach { controller ->
|
||||
"target" to controller.deviceTarget
|
||||
val device = controller.device
|
||||
manager.devices.forEach { (name, device) ->
|
||||
"target" to name.toString()
|
||||
"properties" to jsonArray {
|
||||
device.propertyDescriptors.forEach { descriptor ->
|
||||
+descriptor.config.toJson()
|
||||
@ -223,7 +174,7 @@ fun Application.deviceModule(
|
||||
try {
|
||||
application.log.debug("Opened server socket for ${call.request.queryParameters}")
|
||||
|
||||
generateFlow(target).collect {
|
||||
manager.controller.envelopeOutput().collect {
|
||||
outgoing.send(it.toFrame())
|
||||
}
|
||||
|
||||
@ -235,9 +186,16 @@ fun Application.deviceModule(
|
||||
|
||||
post("message") {
|
||||
val target: String by call.request.queryParameters
|
||||
val controller =
|
||||
controllers[target] ?: throw IllegalArgumentException("Target $target not found in $controllers")
|
||||
call.message(controller)
|
||||
val device = manager[target]
|
||||
val body = call.receiveText()
|
||||
val json = Json.parseJson(body) as? JsonObject
|
||||
?: throw IllegalArgumentException("The body is not a json object")
|
||||
val meta = json.toMeta()
|
||||
|
||||
val request = DeviceMessage.wrap(meta)
|
||||
|
||||
val response = device.respondMessage(request)
|
||||
call.respondMessage(response)
|
||||
}
|
||||
|
||||
route("{target}") {
|
||||
@ -246,18 +204,40 @@ fun Application.deviceModule(
|
||||
route("{property}") {
|
||||
get("get") {
|
||||
val target: String by call.parameters
|
||||
val controller = controllers[target]
|
||||
?: throw IllegalArgumentException("Target $target not found in $controllers")
|
||||
val device = manager[target]
|
||||
val property: String by call.parameters
|
||||
val request = DeviceMessage {
|
||||
type = GET_PROPERTY_ACTION
|
||||
source = WEB_SERVER_TARGET
|
||||
this.target = target
|
||||
data {
|
||||
name = property
|
||||
}
|
||||
}
|
||||
|
||||
call.getProperty(controller)
|
||||
val response = device.respondMessage(request)
|
||||
call.respondMessage(response)
|
||||
}
|
||||
post("set") {
|
||||
val target: String by call.parameters
|
||||
val controller =
|
||||
controllers[target]
|
||||
?: throw IllegalArgumentException("Target $target not found in $controllers")
|
||||
val device = manager[target]
|
||||
|
||||
call.setProperty(controller)
|
||||
val property: String by call.parameters
|
||||
val body = call.receiveText()
|
||||
val json = Json.parseJson(body)
|
||||
|
||||
val request = DeviceMessage {
|
||||
type = SET_PROPERTY_ACTION
|
||||
source = WEB_SERVER_TARGET
|
||||
this.target = target
|
||||
data {
|
||||
name = property
|
||||
value = json.toMetaItem()
|
||||
}
|
||||
}
|
||||
|
||||
val response = device.respondMessage(request)
|
||||
call.respondMessage(response)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ repositories{
|
||||
dependencies{
|
||||
implementation(project(":dataforge-device-core"))
|
||||
implementation(project(":dataforge-device-server"))
|
||||
implementation(project(":dataforge-device-client"))
|
||||
implementation("no.tornado:tornadofx:1.7.20")
|
||||
implementation(kotlin("stdlib-jdk8"))
|
||||
implementation("scientifik:plotlykt-server:$plotlyVersion")
|
||||
|
@ -6,6 +6,7 @@ import hep.dataforge.control.server.startDeviceServer
|
||||
import hep.dataforge.control.server.whenStarted
|
||||
import hep.dataforge.meta.double
|
||||
import hep.dataforge.meta.invoke
|
||||
import hep.dataforge.names.asName
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.launch
|
||||
@ -49,7 +50,7 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
|
||||
|
||||
|
||||
fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngine {
|
||||
context.devices.registerDevice("demo", device)
|
||||
context.devices.registerDevice("demo".asName(), device)
|
||||
val server = context.startDeviceServer(context.devices)
|
||||
server.whenStarted {
|
||||
plotlyModule("plots").apply {
|
||||
|
Loading…
Reference in New Issue
Block a user