Working server

This commit is contained in:
Alexander Nozik 2020-07-19 20:37:44 +03:00
parent bbf7f38725
commit d1ec920e1c
29 changed files with 425 additions and 318 deletions

View File

@ -32,7 +32,7 @@ Among other things, you can:
### `dataforge-control-core` module packages
- `api` - defines API for device management. The main class here is
[`Device`](dataforge-control-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt).
[`Device`](dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt).
Generally, a Device has Properties that can be read and written. Also, some Actions
can optionally be applied on a device (may or may not affect properties).

View File

@ -1,5 +1,5 @@
val dataforgeVersion by extra("0.1.8-dev-4")
val plotlyVersion by extra("0.2.0-dev-4")
val dataforgeVersion by extra("0.1.8")
val plotlyVersion by extra("0.2.0-dev-12")
allprojects {

View File

@ -1,14 +0,0 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.base.DeviceProperty
import hep.dataforge.control.base.ReadOnlyDeviceProperty
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.double
import hep.dataforge.meta.map
import hep.dataforge.values.asValue
fun ReadOnlyDeviceProperty.double() = map { it.double }
fun DeviceProperty.double() = map(
reader = { it.double ?: Double.NaN },
writer = { MetaItem.ValueItem(it.asValue()) }
)

View File

@ -1,26 +0,0 @@
package hep.dataforge.control.server
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.io.Envelope
import io.ktor.application.ApplicationCall
import io.ktor.http.cio.websocket.Frame
fun Frame.toEnvelope(): Envelope {
TODO()
}
fun Envelope.toFrame(): Frame {
TODO()
}
suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
TODO()
}
suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage(builder))
}
suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage.fail(null, builder))
}

View File

@ -0,0 +1,18 @@
plugins {
id("scientifik.mpp")
id("scientifik.publish")
}
val ktorVersion: String by extra("1.3.2")
kotlin {
sourceSets {
commonMain{
dependencies {
implementation(project(":dataforge-device-core"))
implementation("io.ktor:ktor-client-cio:$ktorVersion")
}
}
}
}

View File

@ -4,7 +4,6 @@ import scientifik.useSerialization
plugins {
id("scientifik.mpp")
id("scientifik.publish")
id("kotlinx-atomicfu") version "0.14.3"
}
val dataforgeVersion: String by rootProject.extra
@ -22,7 +21,3 @@ kotlin {
}
}
}
atomicfu {
variant = "VH"
}

View File

@ -59,16 +59,11 @@ interface Device: Closeable {
* Send an action request and suspend caller while request is being processed.
* Could return null if request does not return a meaningful answer.
*/
suspend fun call(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
suspend fun exec(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
override fun close() {
scope.cancel("The device is closed")
}
companion object {
const val GET_PROPERTY_ACTION = "@get"
const val SET_PROPERTY_ACTION = "@set"
}
}
suspend fun Device.call(name: String, meta: Meta?) = call(name, meta?.let { MetaItem.NodeItem(it) })
suspend fun Device.exec(name: String, meta: Meta?) = exec(name, meta?.let { MetaItem.NodeItem(it) })

View File

@ -18,6 +18,6 @@ suspend fun DeviceHub.setProperty(deviceName: String, propertyName: String, valu
.setProperty(propertyName, value)
}
suspend fun DeviceHub.call(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? =
suspend fun DeviceHub.exec(deviceName: String, command: String, argument: MetaItem<*>?): MetaItem<*>? =
(getDevice(deviceName) ?: error("Device with name $deviceName not found in the hub"))
.call(command, argument)
.exec(command, argument)

View File

@ -54,7 +54,7 @@ abstract class DeviceBase : Device {
)
}
override suspend fun call(action: String, argument: MetaItem<*>?): MetaItem<*>? =
override suspend fun exec(action: String, argument: MetaItem<*>?): MetaItem<*>? =
(actions[action] ?: error("Request with name $action not defined")).invoke(argument)

View File

@ -4,15 +4,12 @@ import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlin.properties.ReadOnlyProperty
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
import kotlin.time.Duration
/**
* Read-only device property
*/
interface ReadOnlyDeviceProperty : ReadOnlyProperty<Any?, MetaItem<*>?> {
interface ReadOnlyDeviceProperty {
/**
* Property name, should be unique in device
*/
@ -51,10 +48,9 @@ interface ReadOnlyDeviceProperty : ReadOnlyProperty<Any?, MetaItem<*>?> {
* Produces null when the state is invalidated
*/
fun flow(): Flow<MetaItem<*>?>
override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value
}
/**
* Launch recurring force re-read job on a property scope with given [duration] between reads.
*/
@ -68,18 +64,11 @@ fun ReadOnlyDeviceProperty.readEvery(duration: Duration): Job = scope.launch {
/**
* A writeable device property with non-suspended write
*/
interface DeviceProperty : ReadOnlyDeviceProperty, ReadWriteProperty<Any?, MetaItem<*>?> {
interface DeviceProperty : ReadOnlyDeviceProperty {
override var value: MetaItem<*>?
/**
* Write value to physical device. Invalidates logical value, but does not update it automatically
*/
suspend fun write(item: MetaItem<*>)
override fun setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>?) {
this.value = value
}
override fun getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*>? = value
}

View File

@ -3,26 +3,29 @@ package hep.dataforge.control.controllers
import hep.dataforge.control.controllers.DeviceMessage.Companion.PAYLOAD_VALUE_KEY
import hep.dataforge.meta.*
import hep.dataforge.names.asName
import hep.dataforge.names.plus
class DeviceMessage : Scheme() {
var id by item()
var source by string()//TODO consider replacing by item
var id by string { error("The message id must not be empty") }
var parent by string()
var origin by string()
var target by string()
var action by string(default = MessageController.GET_PROPERTY_ACTION, key = MESSAGE_ACTION_KEY)
var comment by string()
var action by string(key = MESSAGE_ACTION_KEY)
var status by string(RESPONSE_OK_STATUS)
var payload by config(key = MESSAGE_PAYLOAD_KEY)
var value by item(key = (MESSAGE_PAYLOAD_KEY + PAYLOAD_VALUE_KEY))
var payload: List<MessagePayload>
get() = config.getIndexed(MESSAGE_PAYLOAD_KEY).values.map { MessagePayload.wrap(it.node!!) }
set(value) {
config[MESSAGE_PAYLOAD_KEY] = value.map { it.config }
}
/**
* Set a payload for this message according to the given scheme
* Append a payload to this message according to the given scheme
*/
inline fun <T : Scheme> payload(spec: Specification<T>, block: T.() -> Unit): T =
(payload?.let { spec.wrap(it) } ?: spec.empty().also { payload = it.config }).apply(block)
fun <T : Configurable> append(spec: Specification<T>, block: T.() -> Unit): T =
spec.invoke(block).also { config.append(MESSAGE_PAYLOAD_KEY, it) }
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage){
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage) {
val MESSAGE_ACTION_KEY = "action".asName()
val MESSAGE_PAYLOAD_KEY = "payload".asName()
val PAYLOAD_VALUE_KEY = "value".asName()
@ -34,32 +37,25 @@ class DeviceMessage : Scheme() {
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage {
id = request?.id
parent = request?.id
}.apply(block)
inline fun fail(
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage {
id = request?.id
parent = request?.id
status = RESPONSE_FAIL_STATUS
}.apply(block)
}
}
class PropertyPayload : Scheme() {
class MessagePayload : Scheme() {
var name by string { error("Property name could not be empty") }
var value by item(key = PAYLOAD_VALUE_KEY)
companion object : SchemeSpec<PropertyPayload>(::PropertyPayload)
companion object : SchemeSpec<MessagePayload>(::MessagePayload)
}
@DFBuilder
inline fun DeviceMessage.property(block: PropertyPayload.() -> Unit): PropertyPayload = payload(PropertyPayload, block)
var DeviceMessage.property: PropertyPayload?
get() = payload?.let { PropertyPayload.wrap(it) }
set(value) {
payload = value?.config
}
fun DeviceMessage.property(block: MessagePayload.() -> Unit): MessagePayload = append(MessagePayload, block)

View File

@ -40,23 +40,17 @@ class MessageController(
comment = "Wrong target name $deviceTarget expected but ${request.target} found"
}
} else try {
when (val action = request.action ?: error("Action is not defined in message")) {
Device.GET_PROPERTY_ACTION -> {
val property = request.property ?: error("Payload is not defined or not a property")
val propertyName: String = property.name
val result = device.getProperty(propertyName)
DeviceMessage.ok {
this.source = deviceTarget
this.target = request.source
property {
name = propertyName
value = result
val result: List<MessagePayload> = when (val action = request.action) {
GET_PROPERTY_ACTION -> {
request.payload.map { property ->
MessagePayload {
name = property.name
value = device.getProperty(name)
}
}
}
Device.SET_PROPERTY_ACTION -> {
val property = request.property ?: error("Payload is not defined or not a property")
SET_PROPERTY_ACTION -> {
request.payload.map { property ->
val propertyName: String = property.name
val propertyValue = property.value
if (propertyValue == null) {
@ -64,23 +58,47 @@ class MessageController(
} else {
device.setProperty(propertyName, propertyValue)
}
DeviceMessage.ok {
this.source = deviceTarget
this.target = request.source
property {
MessagePayload {
name = propertyName
value = device.getProperty(propertyName)
}
}
}
EXECUTE_ACTION -> {
request.payload.map { payload ->
MessagePayload {
name = payload.name
value = device.exec(payload.name, payload.value)
}
}
}
PROPERTY_LIST_ACTION -> {
device.propertyDescriptors.map { descriptor ->
MessagePayload {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
}
}
ACTION_LIST_ACTION -> {
device.actionDescriptors.map { descriptor ->
MessagePayload {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
}
}
else -> {
val value = request.value
val result = device.call(action, value)
error("Unrecognized action $action")
}
}
DeviceMessage.ok {
this.source = deviceTarget
this.action = action
this.value = result
}
}
this.parent = request.id
this.origin = deviceTarget
this.target = request.origin
this.payload = result
}
} catch (ex: Exception) {
DeviceMessage.fail {
@ -105,7 +123,7 @@ class MessageController(
if (value == null) return
scope.launch {
val change = DeviceMessage.ok {
this.source = deviceTarget
this.origin = deviceTarget
action = PROPERTY_CHANGED_ACTION
property {
name = propertyName
@ -122,5 +140,10 @@ class MessageController(
companion object {
const val GET_PROPERTY_ACTION = "read"
const val SET_PROPERTY_ACTION = "write"
const val EXECUTE_ACTION = "execute"
const val PROPERTY_LIST_ACTION = "propertyList"
const val ACTION_LIST_ACTION = "actionList"
}
}

View File

@ -0,0 +1,40 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.base.DeviceProperty
import hep.dataforge.control.base.ReadOnlyDeviceProperty
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.transformations.MetaConverter
import hep.dataforge.values.Null
import kotlin.properties.ReadOnlyProperty
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
operator fun ReadOnlyDeviceProperty.getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*> =
value ?: MetaItem.ValueItem(Null)
operator fun DeviceProperty.setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>) {
this.value = value
}
fun <T : Any> ReadOnlyDeviceProperty.convert(metaConverter: MetaConverter<T>): ReadOnlyProperty<Any?, T> {
return object : ReadOnlyProperty<Any?, T> {
override fun getValue(thisRef: Any?, property: KProperty<*>): T {
return this@convert.getValue(thisRef, property).let { metaConverter.itemToObject(it) }
}
}
}
fun <T : Any> DeviceProperty.convert(metaConverter: MetaConverter<T>): ReadWriteProperty<Any?, T> {
return object : ReadWriteProperty<Any?, T> {
override fun getValue(thisRef: Any?, property: KProperty<*>): T {
return this@convert.getValue(thisRef, property).let { metaConverter.itemToObject(it) }
}
override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
this@convert.setValue(thisRef, property, value.let { metaConverter.objectToMetaItem(it) })
}
}
}
fun ReadOnlyDeviceProperty.double() = convert(MetaConverter.double)
fun DeviceProperty.double() = convert(MetaConverter.double)

View File

@ -1,10 +1,8 @@
import scientifik.useCoroutines
import scientifik.useSerialization
plugins {
id("scientifik.jvm")
id("scientifik.publish")
application
}
useSerialization()
@ -13,7 +11,7 @@ val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by extra("1.3.2")
dependencies{
implementation(project(":dataforge-control-core"))
implementation(project(":dataforge-device-core"))
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion")
implementation("io.ktor:ktor-serialization:$ktorVersion")

View File

@ -0,0 +1,43 @@
package hep.dataforge.control.server
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.io.*
import hep.dataforge.meta.MetaSerializer
import io.ktor.application.ApplicationCall
import io.ktor.http.ContentType
import io.ktor.http.cio.websocket.Frame
import io.ktor.response.respondText
import kotlinx.io.asBinary
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.json
fun Frame.toEnvelope(): Envelope {
return data.asBinary().readWith(TaggedEnvelopeFormat)
}
fun Envelope.toFrame(): Frame {
val data = buildByteArray {
writeWith(TaggedEnvelopeFormat,this@toFrame)
}
return Frame.Binary(false, data)
}
suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) {
val json = json(builder)
respondText(json.toString(), contentType = ContentType.Application.Json)
}
@OptIn(UnstableDefault::class)
suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
respondText(Json.stringify(MetaSerializer,message.toMeta()), contentType = ContentType.Application.Json)
}
suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage(builder))
}
suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage.fail(null, builder))
}

View File

@ -5,21 +5,22 @@ package hep.dataforge.control.server
import hep.dataforge.control.api.Device
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.MessageController
import hep.dataforge.control.controllers.MessageController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.control.controllers.MessageController.Companion.SET_PROPERTY_ACTION
import hep.dataforge.control.controllers.property
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.toMetaItem
import hep.dataforge.meta.wrap
import io.ktor.application.*
import io.ktor.features.CORS
import io.ktor.features.ContentNegotiation
import io.ktor.features.StatusPages
import io.ktor.html.respondHtml
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.request.receiveText
import io.ktor.response.respond
import io.ktor.response.respondRedirect
import io.ktor.response.respondText
import io.ktor.routing.*
import io.ktor.serialization.json
import io.ktor.server.cio.CIO
@ -40,7 +41,9 @@ import kotlinx.html.h1
import kotlinx.html.head
import kotlinx.html.title
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.jsonArray
/**
* Create and start a web server for several devices
@ -48,15 +51,18 @@ import kotlinx.serialization.json.*
fun CoroutineScope.startDeviceServer(
devices: Map<String, Device>,
port: Int = 8111,
host: String = "0.0.0.0"
host: String = "localhost"
): ApplicationEngine {
val controllers = devices.mapValues {
MessageController(it.value, it.key, this)
}
return embeddedServer(CIO, port, host) {
return this.embeddedServer(CIO, port, host) {
install(WebSockets)
install(CORS) {
anyHost()
}
install(ContentNegotiation) {
json()
}
@ -65,23 +71,21 @@ fun CoroutineScope.startDeviceServer(
call.respond(HttpStatusCode.BadRequest, cause.message ?: "")
}
}
deviceModule(controllers)
routing {
routeDevices(controllers)
get("/") {
call.respondRedirect("/dashboard")
}
}
}.start()
}
suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) {
val json = json(builder)
respondText(json.toString(), contentType = ContentType.Application.Json)
fun ApplicationEngine.whenStarted(callback: Application.() -> Unit){
environment.monitor.subscribe(ApplicationStarted, callback)
}
const val WEB_SERVER_TARGET = "@webServer"
const val WEB_SERVER_TARGET = "@webServer"
private suspend fun ApplicationCall.message(target: MessageController) {
val body = receiveText()
@ -98,8 +102,8 @@ private suspend fun ApplicationCall.message(target: MessageController) {
private suspend fun ApplicationCall.getProperty(target: MessageController) {
val property: String by parameters
val request = DeviceMessage {
action = Device.GET_PROPERTY_ACTION
source = WEB_SERVER_TARGET
action = GET_PROPERTY_ACTION
origin = WEB_SERVER_TARGET
this.target = target.deviceTarget
property {
name = property
@ -116,8 +120,8 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) {
val json = Json.parseJson(body)
val request = DeviceMessage {
action = Device.SET_PROPERTY_ACTION
source = WEB_SERVER_TARGET
action = SET_PROPERTY_ACTION
origin = WEB_SERVER_TARGET
this.target = target.deviceTarget
property {
name = property
@ -129,15 +133,23 @@ private suspend fun ApplicationCall.setProperty(target: MessageController) {
respondMessage(response)
}
fun Routing.routeDevices(targets: Map<String, MessageController>) {
this.application.feature(WebSockets)
@OptIn(KtorExperimentalAPI::class)
fun Application.deviceModule(targets: Map<String, MessageController>, route: String = "/") {
if(featureOrNull(WebSockets) == null) {
install(WebSockets)
}
if(featureOrNull(CORS)==null){
install(CORS) {
anyHost()
}
}
fun generateFlow(target: String?) = if (target == null) {
targets.values.asFlow().flatMapMerge { it.output() }
} else {
targets[target]?.output() ?: error("The device with target $target not found")
}
routing {
route(route) {
get("dashboard") {
call.respondHtml {
head {
@ -190,7 +202,8 @@ fun Routing.routeDevices(targets: Map<String, MessageController>) {
post("message") {
val target: String by call.request.queryParameters
val controller = targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets")
val controller =
targets[target] ?: throw IllegalArgumentException("Target $target not found in $targets")
call.message(controller)
}
@ -214,4 +227,6 @@ fun Routing.routeDevices(targets: Map<String, MessageController>) {
}
}
}
}
}
}

View File

@ -1,7 +1,7 @@
plugins {
kotlin("jvm") version "1.3.72"
id("org.openjfx.javafxplugin") version "0.0.8"
`application`
application
}
val plotlyVersion: String by rootProject.extra
@ -16,8 +16,8 @@ repositories{
}
dependencies{
implementation(project(":dataforge-control-core"))
implementation(project(":dataforge-control-server"))
implementation(project(":dataforge-device-core"))
implementation(project(":dataforge-device-server"))
implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8"))
implementation("scientifik:plotlykt-server:$plotlyVersion")

View File

@ -1,12 +1,12 @@
package hep.dataforge.control.demo
import io.ktor.server.engine.ApplicationEngine
import javafx.scene.Parent
import javafx.scene.control.Slider
import javafx.scene.layout.Priority
import javafx.stage.Stage
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import scientifik.plotly.server.PlotlyServer
import tornadofx.*
import java.awt.Desktop
import java.net.URI
@ -17,19 +17,19 @@ val logger = LoggerFactory.getLogger("Demo")
class DemoController : Controller(), CoroutineScope {
var device: DemoDevice? = null
var server: PlotlyServer? = null
var server: ApplicationEngine? = null
override val coroutineContext: CoroutineContext = GlobalScope.newCoroutineContext(Dispatchers.Default) + Job()
fun init() {
launch {
device = DemoDevice(this)
server = device?.let { servePlots(it) }
server = device?.let { this.startDemoDeviceServer(it) }
}
}
fun shutdown() {
logger.info("Shutting down...")
server?.stop()
server?.stop(1000, 5000)
logger.info("Visualization server stopped")
device?.close()
logger.info("Device server stopped")
@ -89,7 +89,9 @@ class DemoControllerView : View(title = " Demo controller remote") {
useMaxWidth = true
action {
controller.server?.run {
val uri = URI("http", null, host, port, null, null, null)
val host = "localhost"//environment.connectors.first().host
val port = environment.connectors.first().port
val uri = URI("http", null, host, port, "/plots", null, null)
Desktop.getDesktop().browse(uri)
}
}

View File

@ -1,94 +0,0 @@
package hep.dataforge.control.demo
import hep.dataforge.meta.double
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import scientifik.plotly.Plotly
import scientifik.plotly.layout
import scientifik.plotly.models.Trace
import scientifik.plotly.server.PlotlyServer
import scientifik.plotly.server.pushUpdates
import scientifik.plotly.server.serve
import scientifik.plotly.trace
import java.util.concurrent.ConcurrentLinkedQueue
/**
* In-place replacement for absent method from stdlib
*/
fun <T> Flow<T>.windowed(size: Int): Flow<Iterable<T>> {
val queue = ConcurrentLinkedQueue<T>()
return flow {
this@windowed.collect {
queue.add(it)
if (queue.size >= size) {
queue.poll()
}
emit(queue)
}
}
}
suspend fun Trace.updateFrom(axisName: String, flow: Flow<Iterable<Double>>) {
flow.collect {
axis(axisName).numbers = it
}
}
suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
flow.collect { pairs ->
x.numbers = pairs.map { it.first }
y.numbers = pairs.map { it.second }
}
}
fun CoroutineScope.servePlots(device: DemoDevice): PlotlyServer {
val sinFlow = device.sin.flow()
val cosFlow = device.cos.flow()
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
return Plotly.serve(this) {
plot(rowNumber = 0, colOrderNumber = 0, size = 6) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
this@servePlots.launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
plot(rowNumber = 0, colOrderNumber = 1, size = 6) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
this@servePlots.launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
plot(rowNumber = 1, colOrderNumber = 0, size = 12) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
this@servePlots.launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
updateXYFrom(flow)
}
}
}
}.pushUpdates()
}

View File

@ -0,0 +1,126 @@
package hep.dataforge.control.demo
import hep.dataforge.control.server.startDeviceServer
import hep.dataforge.control.server.whenStarted
import hep.dataforge.meta.double
import io.ktor.application.uninstall
import io.ktor.server.engine.ApplicationEngine
import io.ktor.websocket.WebSockets
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.html.div
import kotlinx.html.link
import scientifik.plotly.layout
import scientifik.plotly.models.Trace
import scientifik.plotly.plot
import scientifik.plotly.server.PlotlyServerConfig
import scientifik.plotly.server.PlotlyUpdateMode
import scientifik.plotly.server.plotlyModule
import scientifik.plotly.trace
import java.util.concurrent.ConcurrentLinkedQueue
/**
* In-place replacement for absent method from stdlib
*/
fun <T> Flow<T>.windowed(size: Int): Flow<Iterable<T>> {
val queue = ConcurrentLinkedQueue<T>()
return flow {
this@windowed.collect {
queue.add(it)
if (queue.size >= size) {
queue.poll()
}
emit(queue)
}
}
}
suspend fun Trace.updateFrom(axisName: String, flow: Flow<Iterable<Double>>) {
flow.collect {
axis(axisName).numbers = it
}
}
suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
flow.collect { pairs ->
x.numbers = pairs.map { it.first }
y.numbers = pairs.map { it.second }
}
}
fun CoroutineScope.startDemoDeviceServer(device: DemoDevice): ApplicationEngine {
val server = startDeviceServer(mapOf("demo" to device))
server.whenStarted {
uninstall(WebSockets)
plotlyModule(
"plots",
PlotlyServerConfig { updateMode = PlotlyUpdateMode.PUSH; updateInterval = 50 }
) { container ->
val sinFlow = device.sin.flow()
val cosFlow = device.cos.flow()
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
link {
rel = "stylesheet"
href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css"
attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk"
attributes["crossorigin"] = "anonymous"
}
div("row") {
div("col-6") {
plot(container = container) {
layout {
title = "sin property"
xaxis.title = "point index"
yaxis.title = "sin"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
div("col-6") {
plot(container = container) {
layout {
title = "cos property"
xaxis.title = "point index"
yaxis.title = "cos"
}
trace {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
}
}
}
}
div("row") {
div("col-12") {
plot(container = container) {
layout {
title = "cos vs sin"
xaxis.title = "sin"
yaxis.title = "cos"
}
trace {
name = "non-synchronized"
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
updateXYFrom(flow)
}
}
}
}
}
}
}
return server
}

View File

@ -33,11 +33,12 @@ pluginManagement {
}
}
rootProject.name = "dataforge-control"
rootProject.name = "dataforge-device"
include(
":dataforge-control-core",
":dataforge-control-server",
":dataforge-device-core",
":dataforge-device-server",
":dataforge-device-client",
":demo"
)