diff --git a/build.gradle.kts b/build.gradle.kts index 9f13b1a..01e4311 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,4 @@ -val dataforgeVersion by extra("0.1.8") +val dataforgeVersion by extra("0.1.9-dev") allprojects { diff --git a/dataforge-device-client/build.gradle.kts b/dataforge-device-client/build.gradle.kts index 1404ffe..0453db3 100644 --- a/dataforge-device-client/build.gradle.kts +++ b/dataforge-device-client/build.gradle.kts @@ -11,6 +11,11 @@ kotlin { commonMain{ dependencies { implementation(project(":dataforge-device-core")) + implementation("io.ktor:ktor-client-core:$ktorVersion") + } + } + jvmMain{ + dependencies { implementation("io.ktor:ktor-client-cio:$ktorVersion") } } diff --git a/dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt b/dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt new file mode 100644 index 0000000..0d49bb3 --- /dev/null +++ b/dataforge-device-client/src/jvmMain/kotlin/hep/dataforge/control/client/waltzClient.kt @@ -0,0 +1,72 @@ +package hep.dataforge.control.client + +import hep.dataforge.control.api.getDevice +import hep.dataforge.control.controllers.DeviceManager +import hep.dataforge.control.controllers.DeviceMessage +import hep.dataforge.control.controllers.MessageController +import hep.dataforge.meta.Meta +import hep.dataforge.meta.toJson +import hep.dataforge.meta.toMeta +import hep.dataforge.meta.wrap +import io.ktor.client.HttpClient +import io.ktor.client.request.post +import io.ktor.http.ContentType +import io.ktor.http.Url +import io.ktor.http.contentType +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.json + +/* +{ + "id":"string|number[optional, but desired]", + "parentId": "string|number[optional]", + "target":"string[optional]", + "origin":"string[required]", + "user":"string[optional]", + "action":"string[optional, default='heartbeat']", + "payload":"object[optional]" +} + */ + +/** + * Convert a [DeviceMessage] to [Waltz format](https://github.com/waltz-controls/rfc/tree/master/1) + */ +fun DeviceMessage.toWaltz(id: String, parentId: String): JsonObject = json { + "id" to id + "parentId" to parentId + "target" to "magix" + "origin" to "df" + "payload" to config.toJson() +} + +fun DeviceMessage.fromWaltz(json: JsonObject): DeviceMessage = + DeviceMessage.wrap(json["payload"]?.jsonObject?.toMeta() ?: Meta.EMPTY) + +fun DeviceManager.startWaltzClient( + waltzUrl: Url, + deviceNames: Collection = devices.keys.map { it.toString() } +): Job { + + val controllers = deviceNames.map { name -> + val device = getDevice(name) + MessageController(device, name, context) + } + + val client = HttpClient() + + val outputFlow = controllers.asFlow().flatMapMerge { + it.output() + }.filter { it.data == null }.map { DeviceMessage.wrap(it.meta) } + + return context.launch { + outputFlow.collect { message -> + client.post(waltzUrl){ + this.contentType(ContentType.Application.Json) + body = message.config.toJson().toString() + } + } + } +} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index d6c4cee..45c715c 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -1,5 +1,6 @@ package hep.dataforge.control.api +import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.MessageController import hep.dataforge.control.controllers.MessageData @@ -9,6 +10,7 @@ import hep.dataforge.io.SimpleEnvelope import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaItem import hep.dataforge.meta.wrap +import hep.dataforge.provider.Type import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel import kotlinx.io.Binary @@ -17,6 +19,7 @@ import kotlinx.io.Closeable /** * General interface describing a managed Device */ +@Type(DEVICE_TARGET) interface Device: Closeable, Responder { /** * List of supported property descriptors @@ -80,7 +83,7 @@ interface Device: Closeable, Responder { } companion object{ - + const val DEVICE_TARGET = "device" } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt index 3ad2014..d2c7862 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceHub.kt @@ -1,23 +1,58 @@ package hep.dataforge.control.api import hep.dataforge.meta.MetaItem +import hep.dataforge.names.Name +import hep.dataforge.names.NameToken +import hep.dataforge.names.asName +import hep.dataforge.names.toName +import hep.dataforge.provider.Provider /** * A hub that could locate multiple devices and redirect actions to them */ -interface DeviceHub { - fun getDevice(deviceName: String): Device? +interface DeviceHub : Provider { + val devices: Map + + override val defaultTarget: String get() = Device.DEVICE_TARGET + + override fun provideTop(target: String): Map { + if (target == Device.DEVICE_TARGET) { + return devices.mapKeys { it.key.asName() } + } else { + throw IllegalArgumentException("Target $target is not supported for $this") + } + } + + companion object { + + } } +/** + * Resolve the device by its full name if it is present. Hubs are resolved recursively. + */ +fun DeviceHub.getDevice(name: Name): Device = when (name.length) { + 0 -> (this as? Device) ?: error("The DeviceHub is resolved by name but it is not a Device") + 1 -> { + val token = name.first()!! + devices[token] ?: error("Device with name $token not found in the hub $this") + } + else -> { + val hub = getDevice(name.cutLast()) as? DeviceHub + ?: error("The device with name ${name.cutLast()} does not exist or is not a hub") + hub.getDevice(name.last()!!.asName()) + } +} + + +fun DeviceHub.getDevice(deviceName: String) = getDevice(deviceName.toName()) + suspend fun DeviceHub.getProperty(deviceName: String, propertyName: String): MetaItem<*> = - (getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub")) - .getProperty(propertyName) + getDevice(deviceName).getProperty(propertyName) suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, value: MetaItem<*>) { - (getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub")) - .setProperty(propertyName, value) + getDevice(deviceName).setProperty(propertyName, value) } suspend fun DeviceHub.exec(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? = - (getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub")) - .exec(command, argument) \ No newline at end of file + getDevice(deviceName).exec(command, argument) \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt new file mode 100644 index 0000000..3527689 --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt @@ -0,0 +1,39 @@ +package hep.dataforge.control.controllers + +import hep.dataforge.context.AbstractPlugin +import hep.dataforge.context.Context +import hep.dataforge.context.PluginFactory +import hep.dataforge.context.PluginTag +import hep.dataforge.control.api.Device +import hep.dataforge.control.api.DeviceHub +import hep.dataforge.meta.Meta +import hep.dataforge.names.Name +import hep.dataforge.names.NameToken +import kotlin.reflect.KClass + +class DeviceManager : AbstractPlugin(), DeviceHub { + override val tag: PluginTag get() = Companion.tag + + /** + * Actual list of connected devices + */ + private val top = HashMap() + override val devices: Map get() = top + + fun registerDevice(name: String, device: Device, index: String? = null) { + val token = NameToken(name, index) + top[token] = device + } + + override fun provideTop(target: String): Map = super.provideTop(target) + + companion object : PluginFactory { + override val tag: PluginTag = PluginTag("devices", group = PluginTag.DATAFORGE_GROUP) + override val type: KClass = DeviceManager::class + + override fun invoke(meta: Meta, context: Context): DeviceManager = DeviceManager() + } +} + + +val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager) \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageFlow.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/MessageFlow.kt deleted file mode 100644 index e69de29..0000000 diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index bd9ee5f..a9a4a33 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -2,7 +2,8 @@ package hep.dataforge.control.server -import hep.dataforge.control.api.Device +import hep.dataforge.control.api.getDevice +import hep.dataforge.control.controllers.DeviceManager import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.MessageController import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION @@ -34,10 +35,7 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flatMapMerge -import kotlinx.html.body -import kotlinx.html.h1 -import kotlinx.html.head -import kotlinx.html.title +import kotlinx.html.* import kotlinx.serialization.UnstableDefault import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject @@ -47,15 +45,11 @@ import kotlinx.serialization.json.jsonArray * Create and start a web server for several devices */ fun CoroutineScope.startDeviceServer( - devices: Map, + manager: DeviceManager, port: Int = 8111, host: String = "localhost" ): ApplicationEngine { - val controllers = devices.mapValues { - MessageController(it.value, it.key, this) - } - return this.embeddedServer(CIO, port, host) { install(WebSockets) install(CORS) { @@ -69,7 +63,7 @@ fun CoroutineScope.startDeviceServer( call.respond(HttpStatusCode.BadRequest, cause.message ?: "") } } - deviceModule(controllers) + deviceModule(manager) routing { get("/") { call.respondRedirect("/dashboard") @@ -78,7 +72,7 @@ fun CoroutineScope.startDeviceServer( }.start() } -fun ApplicationEngine.whenStarted(callback: Application.() -> Unit){ +fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) { environment.monitor.subscribe(ApplicationStarted, callback) } @@ -132,20 +126,32 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) { } @OptIn(KtorExperimentalAPI::class) -fun Application.deviceModule(targets: Map, route: String = "/") { - if(featureOrNull(WebSockets) == null) { +fun Application.deviceModule( + manager: DeviceManager, + deviceNames: Collection = manager.devices.keys.map { it.toString() }, + route: String = "/" +) { + val controllers = deviceNames.associateWith { name -> + val device = manager.getDevice(name) + MessageController(device, name, manager.context) + } + + fun generateFlow(target: String?) = if (target == null) { + controllers.values.asFlow().flatMapMerge { it.output() } + } else { + controllers[target]?.output() ?: error("The device with target $target not found") + } + + if (featureOrNull(WebSockets) == null) { install(WebSockets) } - if(featureOrNull(CORS)==null){ + + if (featureOrNull(CORS) == null) { install(CORS) { anyHost() } } - fun generateFlow(target: String?) = if (target == null) { - targets.values.asFlow().flatMapMerge { it.output() } - } else { - targets[target]?.output() ?: error("The device with target $target not found") - } + routing { route(route) { get("dashboard") { @@ -155,7 +161,36 @@ fun Application.deviceModule(targets: Map, route: Str } body { h1 { - +"Under construction" + +"Device server dashboard" + } + deviceNames.forEach { deviceName -> + val device = controllers[deviceName]!!.device + div { + id = deviceName + h2 { +deviceName } + h3 { +"Properties" } + ul { + device.propertyDescriptors.forEach { property -> + li { + a(href = "../$deviceName/${property.name}/get") { +"${property.name}: " } + code { + +property.config.toJson().toString() + } + } + } + } + h3 { +"Actions" } + ul { + device.actionDescriptors.forEach { action -> + li { + +("${action.name}: ") + code { + +action.config.toJson().toString() + } + } + } + } + } } } } @@ -163,7 +198,7 @@ fun Application.deviceModule(targets: Map, route: Str get("list") { call.respondJson { - targets.values.forEach { controller -> + controllers.values.forEach { controller -> "target" to controller.deviceTarget val device = controller.device "properties" to jsonArray { @@ -201,7 +236,7 @@ fun Application.deviceModule(targets: Map, route: Str post("message") { val target: String by call.request.queryParameters val controller = - targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") + controllers[target] ?: throw IllegalArgumentException("Target $target not found in $controllers") call.message(controller) } @@ -211,15 +246,16 @@ fun Application.deviceModule(targets: Map, route: Str route("{property}") { get("get") { val target: String by call.parameters - val controller = targets[target] - ?: throw IllegalArgumentException("Target $target not found in $targets") + val controller = controllers[target] + ?: throw IllegalArgumentException("Target $target not found in $controllers") call.getProperty(controller) } post("set") { val target: String by call.parameters val controller = - targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets") + controllers[target] + ?: throw IllegalArgumentException("Target $target not found in $controllers") call.setProperty(controller) } diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt index ba3b733..03b1d73 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoControllerView.kt @@ -1,29 +1,28 @@ package hep.dataforge.control.demo +import hep.dataforge.context.ContextAware +import hep.dataforge.context.Global import io.ktor.server.engine.ApplicationEngine import javafx.scene.Parent import javafx.scene.control.Slider import javafx.scene.layout.Priority import javafx.stage.Stage -import kotlinx.coroutines.* -import org.slf4j.LoggerFactory +import kotlinx.coroutines.launch import tornadofx.* import java.awt.Desktop import java.net.URI -import kotlin.coroutines.CoroutineContext -val logger = LoggerFactory.getLogger("Demo") - -class DemoController : Controller(), CoroutineScope { +class DemoController : Controller(), ContextAware { var device: DemoDevice? = null var server: ApplicationEngine? = null - override val coroutineContext: CoroutineContext = GlobalScope.newCoroutineContext(Dispatchers.Default) + Job() + override val context = Global.context("demoDevice") fun init() { - launch { - device = DemoDevice(this) - server = device?.let { this.startDemoDeviceServer(it) } + context.launch { + val demo = DemoDevice(context) + device = demo + server = startDemoDeviceServer(context, demo) } } @@ -33,7 +32,7 @@ class DemoController : Controller(), CoroutineScope { logger.info("Visualization server stopped") device?.close() logger.info("Device server stopped") - cancel("Application context closed") + context.close() } } diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt index 52b4c56..544c065 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt @@ -1,10 +1,12 @@ package hep.dataforge.control.demo +import hep.dataforge.context.Context +import hep.dataforge.context.Factory import hep.dataforge.control.base.* import hep.dataforge.control.controllers.double +import hep.dataforge.meta.Meta import hep.dataforge.values.asValue import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.asCoroutineDispatcher import java.time.Instant @@ -15,7 +17,7 @@ import kotlin.time.ExperimentalTime import kotlin.time.seconds @OptIn(ExperimentalTime::class) -class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() { +class DemoDevice(parentScope: CoroutineScope) : DeviceBase() { private val executor = Executors.newSingleThreadExecutor() @@ -30,21 +32,21 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() { var sinScaleValue by sinScale.double() val sin by readingNumber { val time = Instant.now() - sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue + sin(time.toEpochMilli().toDouble() / timeScaleValue) * sinScaleValue } val cosScale by writingVirtual(1.0.asValue()) var cosScaleValue by cosScale.double() val cos by readingNumber { val time = Instant.now() - cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue + cos(time.toEpochMilli().toDouble() / timeScaleValue) * cosScaleValue } val coordinates by readingMeta { val time = Instant.now() "time" put time.toEpochMilli() - "x" put sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue - "y" put cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue + "x" put sin(time.toEpochMilli().toDouble() / timeScaleValue) * sinScaleValue + "y" put cos(time.toEpochMilli().toDouble() / timeScaleValue) * cosScaleValue } @@ -64,4 +66,8 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() { super.close() executor.shutdown() } + + companion object : Factory { + override fun invoke(meta: Meta, context: Context): DemoDevice = DemoDevice(context) + } } \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt index 7a69455..74e0684 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt @@ -1,11 +1,12 @@ package hep.dataforge.control.demo +import hep.dataforge.context.Context +import hep.dataforge.control.controllers.devices import hep.dataforge.control.server.startDeviceServer import hep.dataforge.control.server.whenStarted import hep.dataforge.meta.double import hep.dataforge.meta.invoke import io.ktor.server.engine.ApplicationEngine -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.html.div @@ -47,8 +48,9 @@ suspend fun Trace.updateXYFrom(flow: Flow>>) { } -fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine { - val server = startDeviceServer(mapOf("demo" to device)) +fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngine { + context.devices.registerDevice("demo", device) + val server = context.startDeviceServer(context.devices) server.whenStarted { plotlyModule("plots").apply { updateMode = PlotlyUpdateMode.PUSH @@ -74,7 +76,7 @@ fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine yaxis.title = "sin" } trace { - launch { + context.launch { val flow: Flow> = sinFlow.mapNotNull { it.double }.windowed(100) updateFrom(Trace.Y_AXIS, flow) } @@ -89,7 +91,7 @@ fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine yaxis.title = "cos" } trace { - launch { + context.launch { val flow: Flow> = cosFlow.mapNotNull { it.double }.windowed(100) updateFrom(Trace.Y_AXIS, flow) } @@ -107,7 +109,7 @@ fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine } trace { name = "non-synchronized" - launch { + context.launch { val flow: Flow>> = sinCosFlow.windowed(30) updateXYFrom(flow) }