Dev #6

altavir merged 75 commits from dev into master 2021-10-23 11:02:48 +03:00
11 changed files with 256 additions and 59 deletions
Showing only changes of commit 06f52a73bc - Show all commits

@ -1,4 +1,4 @@
val dataforgeVersion by extra("0.1.8")
val dataforgeVersion by extra("0.1.9-dev")
allprojects {

@ -11,6 +11,11 @@ kotlin {
dependencies {
dependencies {

@ -0,0 +1,72 @@
package hep.dataforge.control.client
import hep.dataforge.control.api.getDevice
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.MessageController
import hep.dataforge.meta.Meta
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap
import io.ktor.client.HttpClient
import io.ktor.http.ContentType
import io.ktor.http.Url
import io.ktor.http.contentType
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.json
"id":"string|number[optional, but desired]",
"parentId": "string|number[optional]",
"action":"string[optional, default='heartbeat']",
* Convert a [DeviceMessage] to [Waltz format](
fun DeviceMessage.toWaltz(id: String, parentId: String): JsonObject = json {
"id" to id
"parentId" to parentId
"target" to "magix"
"origin" to "df"
"payload" to config.toJson()
fun DeviceMessage.fromWaltz(json: JsonObject): DeviceMessage =
DeviceMessage.wrap(json["payload"]?.jsonObject?.toMeta() ?: Meta.EMPTY)
fun DeviceManager.startWaltzClient(
waltzUrl: Url,
deviceNames: Collection<String> = { it.toString() }
): Job {
val controllers = { name ->
val device = getDevice(name)
MessageController(device, name, context)
val client = HttpClient()
val outputFlow = controllers.asFlow().flatMapMerge {
}.filter { == null }.map { DeviceMessage.wrap(it.meta) }
return context.launch {
outputFlow.collect { message ->{
body = message.config.toJson().toString()

@ -1,5 +1,6 @@
package hep.dataforge.control.api
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.MessageController
import hep.dataforge.control.controllers.MessageData
@ -9,6 +10,7 @@ import
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.wrap
import hep.dataforge.provider.Type
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
@ -17,6 +19,7 @@ import
* General interface describing a managed Device
interface Device: Closeable, Responder {
* List of supported property descriptors
@ -80,7 +83,7 @@ interface Device: Closeable, Responder {
companion object{
const val DEVICE_TARGET = "device"

@ -1,23 +1,58 @@
package hep.dataforge.control.api
import hep.dataforge.meta.MetaItem
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import hep.dataforge.names.asName
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
* A hub that could locate multiple devices and redirect actions to them
interface DeviceHub {
fun getDevice(deviceName: String): Device?
interface DeviceHub : Provider {
val devices: Map<NameToken, Device>
override val defaultTarget: String get() = Device.DEVICE_TARGET
override fun provideTop(target: String): Map<Name, Any> {
if (target == Device.DEVICE_TARGET) {
return devices.mapKeys { it.key.asName() }
} else {
throw IllegalArgumentException("Target $target is not supported for $this")
companion object {
* Resolve the device by its full name if it is present. Hubs are resolved recursively.
fun DeviceHub.getDevice(name: Name): Device = when (name.length) {
0 -> (this as? Device) ?: error("The DeviceHub is resolved by name but it is not a Device")
1 -> {
val token = name.first()!!
devices[token] ?: error("Device with name $token not found in the hub $this")
else -> {
val hub = getDevice(name.cutLast()) as? DeviceHub
?: error("The device with name ${name.cutLast()} does not exist or is not a hub")
fun DeviceHub.getDevice(deviceName: String) = getDevice(deviceName.toName())
suspend fun DeviceHub.getProperty(deviceName: String, propertyName: String): MetaItem<*> =
(getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub"))
suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, value: MetaItem<*>) {
(getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub"))
.setProperty(propertyName, value)
getDevice(deviceName).setProperty(propertyName, value)
suspend fun DeviceHub.exec(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? =
(getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub"))
.exec(command, argument)
getDevice(deviceName).exec(command, argument)

@ -0,0 +1,39 @@
package hep.dataforge.control.controllers
import hep.dataforge.context.AbstractPlugin
import hep.dataforge.context.Context
import hep.dataforge.context.PluginFactory
import hep.dataforge.context.PluginTag
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import kotlin.reflect.KClass
class DeviceManager : AbstractPlugin(), DeviceHub {
override val tag: PluginTag get() = Companion.tag
* Actual list of connected devices
private val top = HashMap<NameToken, Device>()
override val devices: Map<NameToken, Device> get() = top
fun registerDevice(name: String, device: Device, index: String? = null) {
val token = NameToken(name, index)
top[token] = device
override fun provideTop(target: String): Map<Name, Any> = super<DeviceHub>.provideTop(target)
companion object : PluginFactory<DeviceManager> {
override val tag: PluginTag = PluginTag("devices", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out DeviceManager> = DeviceManager::class
override fun invoke(meta: Meta, context: Context): DeviceManager = DeviceManager()
val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager)

@ -2,7 +2,8 @@
package hep.dataforge.control.server
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.getDevice
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.MessageController
import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION
@ -34,10 +35,7 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.html.body
import kotlinx.html.h1
import kotlinx.html.head
import kotlinx.html.title
import kotlinx.html.*
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
@ -47,15 +45,11 @@ import kotlinx.serialization.json.jsonArray
* Create and start a web server for several devices
fun CoroutineScope.startDeviceServer(
devices: Map<String, Device>,
manager: DeviceManager,
port: Int = 8111,
host: String = "localhost"
): ApplicationEngine {
val controllers = devices.mapValues {
MessageController(it.value, it.key, this)
return this.embeddedServer(CIO, port, host) {
install(CORS) {
@ -69,7 +63,7 @@ fun CoroutineScope.startDeviceServer(
call.respond(HttpStatusCode.BadRequest, cause.message ?: "")
routing {
get("/") {
@ -78,7 +72,7 @@ fun CoroutineScope.startDeviceServer(
fun ApplicationEngine.whenStarted(callback: Application.() -> Unit){
fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
environment.monitor.subscribe(ApplicationStarted, callback)
@ -132,20 +126,32 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) {
fun Application.deviceModule(targets: Map<String, MessageController>, route: String = "/") {
if(featureOrNull(WebSockets) == null) {
fun Application.deviceModule(
manager: DeviceManager,
deviceNames: Collection<String> = { it.toString() },
route: String = "/"
) {
val controllers = deviceNames.associateWith { name ->
val device = manager.getDevice(name)
MessageController(device, name, manager.context)
fun generateFlow(target: String?) = if (target == null) {
controllers.values.asFlow().flatMapMerge { it.output() }
} else {
controllers[target]?.output() ?: error("The device with target $target not found")
if (featureOrNull(WebSockets) == null) {
if (featureOrNull(CORS) == null) {
install(CORS) {
fun generateFlow(target: String?) = if (target == null) {
targets.values.asFlow().flatMapMerge { it.output() }
} else {
targets[target]?.output() ?: error("The device with target $target not found")
routing {
route(route) {
get("dashboard") {
@ -155,7 +161,36 @@ fun Application.deviceModule(targets: Map<String, MessageController>, route: Str
body {
h1 {
+"Under construction"
+"Device server dashboard"
deviceNames.forEach { deviceName ->
val device = controllers[deviceName]!!.device
div {
id = deviceName
h2 { +deviceName }
h3 { +"Properties" }
ul {
device.propertyDescriptors.forEach { property ->
li {
a(href = "../$deviceName/${}/get") { +"${}: " }
code {
h3 { +"Actions" }
ul {
device.actionDescriptors.forEach { action ->
li {
+("${}: ")
code {
@ -163,7 +198,7 @@ fun Application.deviceModule(targets: Map<String, MessageController>, route: Str
get("list") {
call.respondJson {
targets.values.forEach { controller ->
controllers.values.forEach { controller ->
"target" to controller.deviceTarget
val device = controller.device
"properties" to jsonArray {
@ -201,7 +236,7 @@ fun Application.deviceModule(targets: Map<String, MessageController>, route: Str
post("message") {
val target: String by call.request.queryParameters
val controller =
targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets")
controllers[target] ?: throw IllegalArgumentException("Target $target not found in $controllers")
@ -211,15 +246,16 @@ fun Application.deviceModule(targets: Map<String, MessageController>, route: Str
route("{property}") {
get("get") {
val target: String by call.parameters
val controller = targets[target]
?: throw IllegalArgumentException("Target $target not found in $targets")
val controller = controllers[target]
?: throw IllegalArgumentException("Target $target not found in $controllers")
post("set") {
val target: String by call.parameters
val controller =
targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets")
?: throw IllegalArgumentException("Target $target not found in $controllers")

@ -1,29 +1,28 @@
package hep.dataforge.control.demo
import hep.dataforge.context.ContextAware
import hep.dataforge.context.Global
import io.ktor.server.engine.ApplicationEngine
import javafx.scene.Parent
import javafx.scene.control.Slider
import javafx.scene.layout.Priority
import javafx.stage.Stage
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import kotlinx.coroutines.launch
import tornadofx.*
import java.awt.Desktop
import kotlin.coroutines.CoroutineContext
val logger = LoggerFactory.getLogger("Demo")
class DemoController : Controller(), CoroutineScope {
class DemoController : Controller(), ContextAware {
var device: DemoDevice? = null
var server: ApplicationEngine? = null
override val coroutineContext: CoroutineContext = GlobalScope.newCoroutineContext(Dispatchers.Default) + Job()
override val context = Global.context("demoDevice")
fun init() {
launch {
device = DemoDevice(this)
server = device?.let { this.startDemoDeviceServer(it) }
context.launch {
val demo = DemoDevice(context)
device = demo
server = startDemoDeviceServer(context, demo)
@ -33,7 +32,7 @@ class DemoController : Controller(), CoroutineScope {"Visualization server stopped")
device?.close()"Device server stopped")
cancel("Application context closed")

@ -1,10 +1,12 @@
package hep.dataforge.control.demo
import hep.dataforge.context.Context
import hep.dataforge.context.Factory
import hep.dataforge.control.base.*
import hep.dataforge.control.controllers.double
import hep.dataforge.meta.Meta
import hep.dataforge.values.asValue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import java.time.Instant
@ -15,7 +17,7 @@ import kotlin.time.ExperimentalTime
import kotlin.time.seconds
class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() {
class DemoDevice(parentScope: CoroutineScope) : DeviceBase() {
private val executor = Executors.newSingleThreadExecutor()
@ -30,21 +32,21 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() {
var sinScaleValue by sinScale.double()
val sin by readingNumber {
val time =
sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue
sin(time.toEpochMilli().toDouble() / timeScaleValue) * sinScaleValue
val cosScale by writingVirtual(1.0.asValue())
var cosScaleValue by cosScale.double()
val cos by readingNumber {
val time =
cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue
cos(time.toEpochMilli().toDouble() / timeScaleValue) * cosScaleValue
val coordinates by readingMeta {
val time =
"time" put time.toEpochMilli()
"x" put sin(time.toEpochMilli().toDouble() / timeScaleValue)*sinScaleValue
"y" put cos(time.toEpochMilli().toDouble() / timeScaleValue)*cosScaleValue
"x" put sin(time.toEpochMilli().toDouble() / timeScaleValue) * sinScaleValue
"y" put cos(time.toEpochMilli().toDouble() / timeScaleValue) * cosScaleValue
@ -64,4 +66,8 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() {
companion object : Factory<DemoDevice> {
override fun invoke(meta: Meta, context: Context): DemoDevice = DemoDevice(context)

@ -1,11 +1,12 @@
package hep.dataforge.control.demo
import hep.dataforge.context.Context
import hep.dataforge.control.controllers.devices
import hep.dataforge.control.server.startDeviceServer
import hep.dataforge.control.server.whenStarted
import hep.dataforge.meta.double
import hep.dataforge.meta.invoke
import io.ktor.server.engine.ApplicationEngine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.html.div
@ -47,8 +48,9 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine {
val server = startDeviceServer(mapOf("demo" to device))
fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngine {
context.devices.registerDevice("demo", device)
val server = context.startDeviceServer(context.devices)
server.whenStarted {
plotlyModule("plots").apply {
updateMode = PlotlyUpdateMode.PUSH
@ -74,7 +76,7 @@ fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine
yaxis.title = "sin"
trace {
launch {
context.launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
@ -89,7 +91,7 @@ fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine
yaxis.title = "cos"
trace {
launch {
context.launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
@ -107,7 +109,7 @@ fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine
trace {
name = "non-synchronized"
launch {
context.launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)