Device web-server (untested)

This commit is contained in:
Alexander Nozik 2020-06-30 22:03:56 +03:00
parent 8e261a5ff5
commit 8db5934021
20 changed files with 485 additions and 211 deletions

View File

@ -1,4 +1,5 @@
import scientifik.useCoroutines
import scientifik.useSerialization
plugins {
id("scientifik.mpp")
@ -9,6 +10,7 @@ plugins {
val dataforgeVersion: String by rootProject.extra
useCoroutines(version = "1.3.7")
useSerialization()
kotlin {
sourceSets {

View File

@ -1,14 +0,0 @@
package hep.dataforge.control.api
import hep.dataforge.meta.Scheme
import hep.dataforge.meta.SchemeSpec
/**
* A descriptor for property
*/
class ActionDescriptor : Scheme() {
//var name by string { error("Property name is mandatory") }
//var descriptor by spec(ItemDescriptor)
companion object : SchemeSpec<ActionDescriptor>(::ActionDescriptor)
}

View File

@ -27,7 +27,7 @@ interface Device: Closeable {
* [owner] is provided optionally in order for listener to be
* easily removable
*/
fun registerListener(listener: PropertyChangeListener, owner: Any? = listener)
fun registerListener(listener: DeviceListener, owner: Any? = listener)
/**
* Remove all listeners belonging to specified owner
@ -61,9 +61,8 @@ interface Device: Closeable {
}
companion object {
const val GET_PROPERTY_ACTION = "@getProperty"
const val SET_PROPERTY_ACTION = "@setProperty"
const val CALL_ACTION ="@call"
const val GET_PROPERTY_ACTION = "@get"
const val SET_PROPERTY_ACTION = "@set"
}
}

View File

@ -2,6 +2,7 @@ package hep.dataforge.control.api
import hep.dataforge.meta.MetaItem
interface PropertyChangeListener {
interface DeviceListener {
fun propertyChanged(propertyName: String, value: MetaItem<*>?)
//TODO add general message listener method
}

View File

@ -1,14 +0,0 @@
package hep.dataforge.control.api
import hep.dataforge.meta.Scheme
import hep.dataforge.meta.SchemeSpec
/**
* A descriptor for property
*/
class PropertyDescriptor : Scheme() {
//var name by string { error("Property name is mandatory") }
//var descriptor by spec(ItemDescriptor)
companion object : SchemeSpec<PropertyDescriptor>(::PropertyDescriptor)
}

View File

@ -0,0 +1,20 @@
package hep.dataforge.control.api
import hep.dataforge.meta.Scheme
import hep.dataforge.meta.string
/**
* A descriptor for property
*/
class PropertyDescriptor(name: String) : Scheme() {
val name by string(name)
}
/**
* A descriptor for property
*/
class ActionDescriptor(name: String) : Scheme() {
val name by string(name)
//var descriptor by spec(ItemDescriptor)
}

View File

@ -23,42 +23,42 @@ class SimpleAction(
class ActionDelegate<D : DeviceBase>(
val owner: D,
val descriptor: ActionDescriptor = ActionDescriptor.empty(),
val descriptorBuilder: ActionDescriptor.()->Unit = {},
val block: suspend (MetaItem<*>?) -> MetaItem<*>?
) : ReadOnlyProperty<D, Action> {
override fun getValue(thisRef: D, property: KProperty<*>): Action {
val name = property.name
return owner.resolveAction(name) {
SimpleAction(name, descriptor, block)
SimpleAction(name, ActionDescriptor(name).apply(descriptorBuilder), block)
}
}
}
fun <D : DeviceBase> D.request(
descriptor: ActionDescriptor = ActionDescriptor.empty(),
descriptorBuilder: ActionDescriptor.()->Unit = {},
block: suspend (MetaItem<*>?) -> MetaItem<*>?
): ActionDelegate<D> = ActionDelegate(this, descriptor, block)
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder, block)
fun <D : DeviceBase> D.requestValue(
descriptor: ActionDescriptor = ActionDescriptor.empty(),
descriptorBuilder: ActionDescriptor.()->Unit = {},
block: suspend (MetaItem<*>?) -> Any?
): ActionDelegate<D> = ActionDelegate(this, descriptor){
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder){
val res = block(it)
MetaItem.ValueItem(Value.of(res))
}
fun <D : DeviceBase> D.requestMeta(
descriptor: ActionDescriptor = ActionDescriptor.empty(),
descriptorBuilder: ActionDescriptor.()->Unit = {},
block: suspend MetaBuilder.(MetaItem<*>?) -> Unit
): ActionDelegate<D> = ActionDelegate(this, descriptor){
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder){
val res = MetaBuilder().apply { block(it)}
MetaItem.NodeItem(res)
}
fun <D : DeviceBase> D.action(
descriptor: ActionDescriptor = ActionDescriptor.empty(),
descriptorBuilder: ActionDescriptor.()->Unit = {},
block: suspend (MetaItem<*>?) -> Unit
): ActionDelegate<D> = ActionDelegate(this, descriptor) {
): ActionDelegate<D> = ActionDelegate(this, descriptorBuilder) {
block(it)
null
}

View File

@ -2,7 +2,7 @@ package hep.dataforge.control.base
import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.PropertyChangeListener
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.meta.MetaItem
@ -10,9 +10,9 @@ abstract class DeviceBase : Device {
private val properties = HashMap<String, ReadOnlyDeviceProperty>()
private val actions = HashMap<String, Action>()
private val listeners = ArrayList<Pair<Any?, PropertyChangeListener>>(4)
private val listeners = ArrayList<Pair<Any?, DeviceListener>>(4)
override fun registerListener(listener: PropertyChangeListener, owner: Any?) {
override fun registerListener(listener: DeviceListener, owner: Any?) {
listeners.add(owner to listener)
}

View File

@ -65,7 +65,7 @@ open class IsolatedReadOnlyDeviceProperty(
private class ReadOnlyDevicePropertyDelegate<D : DeviceBase>(
val owner: D,
val default: MetaItem<*>?,
val descriptor: PropertyDescriptor = PropertyDescriptor.empty(),
val descriptorBuilder: PropertyDescriptor.() -> Unit = {},
private val getter: suspend (MetaItem<*>?) -> MetaItem<*>
) : ReadOnlyProperty<D, IsolatedReadOnlyDeviceProperty> {
@ -77,7 +77,7 @@ private class ReadOnlyDevicePropertyDelegate<D : DeviceBase>(
IsolatedReadOnlyDeviceProperty(
name,
default,
descriptor,
PropertyDescriptor(name).apply(descriptorBuilder),
owner.scope,
owner::propertyChanged,
getter
@ -93,7 +93,7 @@ fun <D : DeviceBase> D.reading(
): ReadOnlyProperty<D, IsolatedReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
this,
default,
PropertyDescriptor.invoke(descriptorBuilder),
descriptorBuilder,
getter
)
@ -104,7 +104,7 @@ fun <D : DeviceBase> D.readingValue(
): ReadOnlyProperty<D, IsolatedReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
this,
default?.let { MetaItem.ValueItem(it) },
PropertyDescriptor.invoke(descriptorBuilder),
descriptorBuilder,
getter = { MetaItem.ValueItem(Value.of(getter())) }
)
@ -115,7 +115,7 @@ fun <D : DeviceBase> D.readingNumber(
): ReadOnlyProperty<D, IsolatedReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
this,
default?.let { MetaItem.ValueItem(it.asValue()) },
PropertyDescriptor.invoke(descriptorBuilder),
descriptorBuilder,
getter = {
val number = getter()
MetaItem.ValueItem(number.asValue())
@ -129,7 +129,7 @@ fun <D : DeviceBase> D.readingMeta(
): ReadOnlyProperty<D, IsolatedReadOnlyDeviceProperty> = ReadOnlyDevicePropertyDelegate(
this,
default?.let { MetaItem.NodeItem(it) },
PropertyDescriptor.invoke(descriptorBuilder),
descriptorBuilder,
getter = {
MetaItem.NodeItem(MetaBuilder().apply { getter() })
}
@ -179,7 +179,7 @@ class IsolatedDeviceProperty(
private class DevicePropertyDelegate<D : DeviceBase>(
val owner: D,
val default: MetaItem<*>?,
val descriptor: PropertyDescriptor = PropertyDescriptor.empty(),
val descriptorBuilder: PropertyDescriptor.() -> Unit = {},
private val getter: suspend (MetaItem<*>?) -> MetaItem<*>,
private val setter: suspend (oldValue: MetaItem<*>?, newValue: MetaItem<*>) -> MetaItem<*>?
) : ReadOnlyProperty<D, IsolatedDeviceProperty> {
@ -191,7 +191,7 @@ private class DevicePropertyDelegate<D : DeviceBase>(
IsolatedDeviceProperty(
name,
default,
descriptor,
PropertyDescriptor(name).apply(descriptorBuilder),
owner.scope,
owner::propertyChanged,
getter,
@ -209,7 +209,7 @@ fun <D : DeviceBase> D.writing(
): ReadOnlyProperty<D, IsolatedDeviceProperty> = DevicePropertyDelegate(
this,
default,
PropertyDescriptor.invoke(descriptorBuilder),
descriptorBuilder,
getter,
setter
)
@ -250,7 +250,7 @@ fun <D : DeviceBase> D.writingDouble(
return DevicePropertyDelegate(
this,
MetaItem.ValueItem(Double.NaN.asValue()),
PropertyDescriptor.invoke(descriptorBuilder),
descriptorBuilder,
innerGetter,
innerSetter
)

View File

@ -1,62 +1,67 @@
package hep.dataforge.control.controlers
import hep.dataforge.control.controlers.DeviceMessage.Companion.PAYLOAD_VALUE_KEY
import hep.dataforge.meta.*
import hep.dataforge.names.asName
import hep.dataforge.names.plus
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
open class DeviceMessage : Scheme() {
class DeviceMessage : Scheme() {
var id by item()
var source by string()//TODO consider replacing by item
var target by string()
var comment by string()
var action by string(key = MESSAGE_ACTION_KEY)
var status by string(RESPONSE_OK_STATUS)
var value by item(key = MESSAGE_VALUE_KEY)
var payload by config(key = MESSAGE_PAYLOAD_KEY)
var value by item(key = (MESSAGE_PAYLOAD_KEY + PAYLOAD_VALUE_KEY))
/**
* Set a payload for this message according to the given scheme
*/
inline fun <T : Scheme> payload(spec: Specification<T>, block: T.() -> Unit): T =
(payload?.let { spec.wrap(it) } ?: spec.empty().also { payload = it.config }).apply(block)
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage){
val MESSAGE_ACTION_KEY = "action".asName()
val MESSAGE_VALUE_KEY = "value".asName()
val MESSAGE_PAYLOAD_KEY = "payload".asName()
val PAYLOAD_VALUE_KEY = "value".asName()
const val RESPONSE_OK_STATUS = "response.OK"
const val RESPONSE_FAIL_STATUS = "response.FAIL"
const val PROPERTY_CHANGED_ACTION = "event.propertyChange"
fun ok(request: DeviceMessage? = null, block: DeviceMessage.() -> Unit = {}): DeviceMessage {
return DeviceMessage {
inline fun ok(
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage {
id = request?.id
}.apply(block)
}
fun fail(request: DeviceMessage? = null,block: DeviceMessage.() -> Unit = {}): DeviceMessage {
return DeviceMessage {
inline fun fail(
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage {
id = request?.id
status = RESPONSE_FAIL_STATUS
}.apply(block)
}
}
class PropertyPayload : Scheme() {
var name by string { error("Property name could not be empty") }
var value by item(key = PAYLOAD_VALUE_KEY)
companion object : SchemeSpec<PropertyPayload>(::PropertyPayload)
}
class DevicePropertyMessage : DeviceMessage() {
//TODO add multiple properties in the same message
var property by spec(PropertyValue)
@DFBuilder
inline fun DeviceMessage.property(block: PropertyPayload.() -> Unit): PropertyPayload = payload(PropertyPayload, block)
fun property(builder: PropertyValue.() -> Unit) {
this.property = PropertyValue.invoke(builder)
var DeviceMessage.property: PropertyPayload?
get() = payload?.let { PropertyPayload.wrap(it) }
set(value) {
payload = value?.config
}
class PropertyValue : Scheme() {
var name by string { error("Property name not defined") }
var value by item()
companion object : SchemeSpec<PropertyValue>(::PropertyValue)
}
companion object : SchemeSpec<DevicePropertyMessage>(::DevicePropertyMessage) {
const val PROPERTY_CHANGED_ACTION = "event.propertyChange"
fun ok(request: DeviceMessage? = null, block: DevicePropertyMessage.() -> Unit = {}): DeviceMessage {
return DevicePropertyMessage {
id = request?.id
property {
name
}
}.apply(block)
}
}
}

View File

@ -1,44 +1,54 @@
package hep.dataforge.control.controlers
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.PropertyChangeListener
import hep.dataforge.control.controlers.DevicePropertyMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.controlers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.meta.wrap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.io.Binary
interface MessageConsumer {
/**
* A consumer of envelopes
*/
interface Consumer {
fun consume(message: Envelope): Unit
}
class MessageController(
val device: Device,
val deviceTarget: String
) : Responder, PropertyChangeListener {
val deviceTarget: String,
val scope: CoroutineScope = device.scope
) : Consumer, Responder, DeviceListener {
init {
device.registerListener(this, this)
}
var messageListener: MessageConsumer? = null
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
override suspend fun respond(request: Envelope): Envelope {
val responseMessage: DeviceMessage = try {
when (val action = request.meta[DeviceMessage.MESSAGE_ACTION_KEY].string ?: error("Action not defined")) {
suspend fun respondMessage(
request: DeviceMessage
): DeviceMessage = if (request.target != null && request.target != deviceTarget) {
DeviceMessage.fail {
comment = "Wrong target name $deviceTarget expected but ${request.target} found"
}
} else try {
when (val action = request.action ?: error("Action is not defined in message")) {
Device.GET_PROPERTY_ACTION -> {
val message = DevicePropertyMessage.wrap(request.meta)
val property = message.property ?: error("Property item not defined")
val property = request.property ?: error("Payload is not defined or not a property")
val propertyName: String = property.name
val result = device.getProperty(propertyName)
DevicePropertyMessage.ok {
DeviceMessage.ok {
this.source = deviceTarget
this.target = message.source
this.target = request.source
property {
name = propertyName
value = result
@ -46,8 +56,7 @@ class MessageController(
}
}
Device.SET_PROPERTY_ACTION -> {
val message = DevicePropertyMessage.wrap(request.meta)
val property = message.property ?: error("Property item not defined")
val property = request.property ?: error("Payload is not defined or not a property")
val propertyName: String = property.name
val propertyValue = property.value
if (propertyValue == null) {
@ -55,16 +64,16 @@ class MessageController(
} else {
device.setProperty(propertyName, propertyValue)
}
DevicePropertyMessage.ok {
DeviceMessage.ok {
this.source = deviceTarget
this.target = message.source
this.target = request.source
property {
name = propertyName
}
}
}
else -> {
val value = request.meta[DeviceMessage.MESSAGE_VALUE_KEY]
val value = request.value
val result = device.call(action, value)
DeviceMessage.ok {
this.source = deviceTarget
@ -79,13 +88,23 @@ class MessageController(
}
}
override fun consume(message: Envelope) {
// Fire the respond procedure and forget about the result
scope.launch {
respond(message)
}
}
override suspend fun respond(request: Envelope): Envelope {
val requestMessage = DeviceMessage.wrap(request.meta)
val responseMessage = respondMessage(requestMessage)
return SimpleEnvelope(responseMessage.toMeta(), Binary.EMPTY)
}
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
messageListener?.let { listener ->
val change = DevicePropertyMessage.ok {
scope.launch {
val change = DeviceMessage.ok {
this.source = deviceTarget
action = PROPERTY_CHANGED_ACTION
property {
@ -94,10 +113,14 @@ class MessageController(
}
}
val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY)
listener.consume(envelope)
outputChannel.send(envelope)
}
}
fun output() = outputChannel.consumeAsFlow()
companion object {
}
}

View File

@ -1,57 +0,0 @@
package hep.dataforge.control.controlers
import hep.dataforge.io.Envelope
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.io.Closeable
@ExperimentalCoroutinesApi
class MessageFlow(
val controller: MessageController,
val scope: CoroutineScope
) : Closeable, MessageConsumer {
init {
if (controller.messageListener != null) error("Can't attach controller to $controller, the controller is already attached")
controller.messageListener = this
}
private val outputChannel = Channel<Envelope>(CONFLATED)
private val inputChannel = Channel<Envelope>(CONFLATED)
val input: SendChannel<Envelope> get() = inputChannel
val output: Flow<Envelope> = outputChannel.consumeAsFlow()
init {
scope.launch {
while (!inputChannel.isClosedForSend) {
val request = inputChannel.receive()
val response = controller.respond(request)
outputChannel.send(response)
}
}
}
override fun consume(message: Envelope) {
scope.launch {
outputChannel.send(message)
}
}
override fun close() {
outputChannel.cancel()
}
}
@ExperimentalCoroutinesApi
fun MessageController.flow(scope: CoroutineScope = device.scope): MessageFlow {
return MessageFlow(this, scope).also {
this@flow.messageListener = it
}
}

View File

@ -1,7 +1,7 @@
package hep.dataforge.control.controlers
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.PropertyChangeListener
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
@ -12,7 +12,7 @@ import kotlinx.coroutines.launch
@ExperimentalCoroutinesApi
suspend fun Device.flowValues(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
val listener = object : PropertyChangeListener {
val listener = object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value != null) {
launch {

View File

@ -0,0 +1,21 @@
import scientifik.useCoroutines
import scientifik.useSerialization
plugins {
id("scientifik.jvm")
id("scientifik.publish")
application
}
useSerialization()
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by extra("1.3.2")
dependencies{
implementation(project(":dataforge-control-core"))
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion")
implementation("io.ktor:ktor-serialization:$ktorVersion")
implementation("io.ktor:ktor-html-builder:$ktorVersion")
}

View File

@ -0,0 +1,217 @@
@file:OptIn(ExperimentalCoroutinesApi::class, KtorExperimentalAPI::class, FlowPreview::class, UnstableDefault::class)
package hep.dataforge.control.server
import hep.dataforge.control.api.Device
import hep.dataforge.control.controlers.DeviceMessage
import hep.dataforge.control.controlers.MessageController
import hep.dataforge.control.controlers.property
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.toMetaItem
import hep.dataforge.meta.wrap
import io.ktor.application.*
import io.ktor.features.ContentNegotiation
import io.ktor.features.StatusPages
import io.ktor.html.respondHtml
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.request.receiveText
import io.ktor.response.respond
import io.ktor.response.respondRedirect
import io.ktor.response.respondText
import io.ktor.routing.*
import io.ktor.serialization.json
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.getValue
import io.ktor.websocket.WebSockets
import io.ktor.websocket.webSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.html.body
import kotlinx.html.h1
import kotlinx.html.head
import kotlinx.html.title
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.*
/**
* Create and start a web server for several devices
*/
fun CoroutineScope.startDeviceServer(
devices: Map<String, Device>,
port: Int = 8111,
host: String = "0.0.0.0"
): ApplicationEngine {
val controllers = devices.mapValues {
MessageController(it.value, it.key, this)
}
return embeddedServer(CIO, port, host) {
install(WebSockets)
install(ContentNegotiation) {
json()
}
install(StatusPages) {
exception<IllegalArgumentException> { cause ->
call.respond(HttpStatusCode.BadRequest, cause.message ?: "")
}
}
routing {
routeDevices(controllers)
get("/") {
call.respondRedirect("/dashboard")
}
}
}.start()
}
suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) {
val json = json(builder)
respondText(json.toString(), contentType = ContentType.Application.Json)
}
const val WEB_SERVER_TARGET = "@webServer"
private suspend fun ApplicationCall.message(target: MessageController) {
val body = receiveText()
val json = Json.parseJson(body) as? JsonObject
?: throw IllegalArgumentException("The body is not a json object")
val meta = json.toMeta()
val request = DeviceMessage.wrap(meta)
val response = target.respondMessage(request)
respond(response.toMeta())
}
private suspend fun ApplicationCall.getProperty(target: MessageController) {
val property: String by parameters
val request = DeviceMessage {
action = Device.GET_PROPERTY_ACTION
source = WEB_SERVER_TARGET
this.target = target.deviceTarget
property {
name = property
}
}
val response = target.respondMessage(request)
respond(response.toMeta())
}
private suspend fun ApplicationCall.setProperty(target: MessageController) {
val property: String by parameters
val body = receiveText()
val json = Json.parseJson(body)
val request = DeviceMessage {
action = Device.SET_PROPERTY_ACTION
source = WEB_SERVER_TARGET
this.target = target.deviceTarget
property {
name = property
value = json.toMetaItem()
}
}
val response = target.respondMessage(request)
respondMessage(response)
}
fun Routing.routeDevices(targets: Map<String, MessageController>) {
this.application.feature(WebSockets)
fun generateFlow(target: String?) = if (target == null) {
targets.values.asFlow().flatMapMerge { it.output() }
} else {
targets[target]?.output() ?: error("The device with target $target not found")
}
get("dashboard") {
call.respondHtml {
head {
title("Device server dashboard")
}
body {
h1 {
+"Under construction"
}
}
}
}
get("list") {
call.respondJson {
targets.values.forEach { controller ->
"target" to controller.deviceTarget
val device = controller.device
"properties" to jsonArray {
device.propertyDescriptors.forEach { descriptor ->
+descriptor.config.toJson()
}
}
"actions" to jsonArray {
device.actionDescriptors.forEach { actionDescriptor ->
+actionDescriptor.config.toJson()
}
}
}
}
}
//Check if application supports websockets and if it does add a push channel
if (this.application.featureOrNull(WebSockets) != null) {
webSocket("ws") {
//subscribe on device
val target: String? by call.request.queryParameters
try {
application.log.debug("Opened server socket for ${call.request.queryParameters}")
generateFlow(target).collect {
outgoing.send(it.toFrame())
}
} catch (ex: Exception) {
application.log.debug("Closed server socket for ${call.request.queryParameters}")
}
}
}
post("message") {
val target: String by call.request.queryParameters
val controller = targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets")
call.message(controller)
}
route("{target}") {
//global route for the device
route("{property}") {
get("get") {
val target: String by call.parameters
val controller = targets[target]
?: throw IllegalArgumentException("Target $target not found in $targets")
call.getProperty(controller)
}
post("set") {
val target: String by call.parameters
val controller =
targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets")
call.setProperty(controller)
}
}
}
}

View File

@ -0,0 +1,27 @@
package hep.dataforge.control.server
import hep.dataforge.control.controlers.DeviceMessage
import hep.dataforge.io.Envelope
import io.ktor.application.ApplicationCall
import io.ktor.http.cio.websocket.Frame
import io.ktor.response.ApplicationResponse
fun Frame.toEnvelope(): Envelope {
TODO()
}
fun Envelope.toFrame(): Frame {
TODO()
}
suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
TODO()
}
suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage(builder))
}
suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage.fail(null, builder))
}

View File

@ -0,0 +1,41 @@
package hep.dataforge.control.server
import io.ktor.application.ApplicationCall
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondTextWriter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
/**
* The data class representing a SSE Event that will be sent to the client.
*/
data class SseEvent(val data: String, val event: String? = null, val id: String? = null)
/**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
* and serializing them in a way that is compatible with the Server-Sent Events specification.
*
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
*/
@Suppress("BlockingMethodInNonBlockingContext")
suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondTextWriter(contentType = ContentType.Text.EventStream) {
events.collect { event->
if (event.id != null) {
write("id: ${event.id}\n")
}
if (event.event != null) {
write("event: ${event.event}\n")
}
for (dataLine in event.data.lines()) {
write("data: $dataLine\n")
}
write("\n")
flush()
}
}
}

View File

@ -16,6 +16,7 @@ repositories{
dependencies{
implementation(project(":dataforge-control-core"))
implementation(project(":dataforge-control-server"))
implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8"))
implementation("scientifik:plotlykt-server:$plotlyVersion")

View File

@ -5,6 +5,7 @@ import hep.dataforge.control.controlers.double
import hep.dataforge.values.asValue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import java.time.Instant
import java.util.concurrent.Executors
@ -19,7 +20,7 @@ class DemoDevice(parentScope: CoroutineScope = GlobalScope) : DeviceBase() {
private val executor = Executors.newSingleThreadExecutor()
override val scope: CoroutineScope = CoroutineScope(
parentScope.coroutineContext + executor.asCoroutineDispatcher()
parentScope.coroutineContext + executor.asCoroutineDispatcher() + Job(parentScope.coroutineContext[Job])
)
val timeScale: IsolatedDeviceProperty by writingVirtual(5000.0.asValue())

View File

@ -37,6 +37,7 @@ rootProject.name = "dataforge-control"
include(
":dataforge-control-core",
":dataforge-control-server",
":demo"
)