refactoring

This commit is contained in:
Alexander Nozik 2020-10-08 11:20:08 +03:00
parent 64043cf2c0
commit 4b9f535002
15 changed files with 94 additions and 66 deletions

View File

@ -1,13 +0,0 @@
package hep.dataforge.control.client
import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.HttpStatement
import io.ktor.utils.io.ByteReadChannel
suspend fun HttpClient.sse(address: String) = get<HttpStatement>(address).execute { response: HttpResponse ->
// Response is not downloaded here.
val channel = response.receive<ByteReadChannel>()
}

View File

@ -67,14 +67,6 @@ public interface Device : Closeable, ContextAware {
*/ */
public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>? public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>?
/**
*
* A request with binary data or for binary response (or both). This request does not cover basic functionality like
* [setProperty], [getProperty] or [execute] and not defined for a generic device.
*
*/
public suspend fun respondWithData(request: Envelope): EnvelopeBuilder = error("No binary response defined")
override fun close() { override fun close() {
scope.cancel("The device is closed") scope.cancel("The device is closed")
} }
@ -84,4 +76,14 @@ public interface Device : Closeable, ContextAware {
} }
} }
public interface ResponderDevice{
/**
*
* A request with binary data or for binary response (or both). This request does not cover basic functionality like
* [setProperty], [getProperty] or [execute] and not defined for a generic device.
*
*/
public suspend fun respondWithData(request: Envelope): EnvelopeBuilder
}
public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) })

View File

@ -3,7 +3,7 @@ package hep.dataforge.control.base
import hep.dataforge.control.api.ActionDescriptor import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.meta.MetaItem import hep.dataforge.meta.MetaItem
public interface Action { public interface DeviceAction {
public val name: String public val name: String
public val descriptor: ActionDescriptor public val descriptor: ActionDescriptor
public suspend operator fun invoke(arg: MetaItem<*>? = null): MetaItem<*>? public suspend operator fun invoke(arg: MetaItem<*>? = null): MetaItem<*>?

View File

@ -19,8 +19,8 @@ public abstract class DeviceBase(override val context: Context) : Device {
private val _properties = HashMap<String, ReadOnlyDeviceProperty>() private val _properties = HashMap<String, ReadOnlyDeviceProperty>()
public val properties: Map<String, ReadOnlyDeviceProperty> get() = _properties public val properties: Map<String, ReadOnlyDeviceProperty> get() = _properties
private val _actions = HashMap<String, Action>() private val _actions = HashMap<String, DeviceAction>()
public val actions: Map<String, Action> get() = _actions public val actions: Map<String, DeviceAction> get() = _actions
private val listeners = ArrayList<Pair<Any?, DeviceListener>>(4) private val listeners = ArrayList<Pair<Any?, DeviceListener>>(4)
@ -54,7 +54,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
_properties[name] = property _properties[name] = property
} }
internal fun registerAction(name: String, action: Action) { internal fun registerAction(name: String, action: DeviceAction) {
if (_actions.contains(name)) error("Action with name $name already registered") if (_actions.contains(name)) error("Action with name $name already registered")
_actions[name] = action _actions[name] = action
} }
@ -199,11 +199,11 @@ public abstract class DeviceBase(override val context: Context) : Device {
/** /**
* A stand-alone action * A stand-alone action
*/ */
private inner class BasicAction( private inner class BasicDeviceAction(
override val name: String, override val name: String,
override val descriptor: ActionDescriptor, override val descriptor: ActionDescriptor,
private val block: suspend (MetaItem<*>?) -> MetaItem<*>?, private val block: suspend (MetaItem<*>?) -> MetaItem<*>?,
) : Action { ) : DeviceAction {
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? =
withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
block(arg).also { block(arg).also {
@ -221,8 +221,8 @@ public abstract class DeviceBase(override val context: Context) : Device {
name: String, name: String,
descriptorBuilder: ActionDescriptor.() -> Unit = {}, descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> MetaItem<*>?, block: suspend (MetaItem<*>?) -> MetaItem<*>?,
): Action { ): DeviceAction {
val action = BasicAction(name, ActionDescriptor(name).apply(descriptorBuilder), block) val action = BasicDeviceAction(name, ActionDescriptor(name).apply(descriptorBuilder), block)
registerAction(name, action) registerAction(name, action)
return action return action
} }

View File

@ -9,13 +9,13 @@ import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty import kotlin.reflect.KProperty
private fun <D : DeviceBase> D.provideAction(): ReadOnlyProperty<D, Action> = private fun <D : DeviceBase> D.provideAction(): ReadOnlyProperty<D, DeviceAction> =
ReadOnlyProperty { _: D, property: KProperty<*> -> ReadOnlyProperty { _: D, property: KProperty<*> ->
val name = property.name val name = property.name
return@ReadOnlyProperty actions[name]!! return@ReadOnlyProperty actions[name]!!
} }
public typealias ActionDelegate = ReadOnlyProperty<DeviceBase, Action> public typealias ActionDelegate = ReadOnlyProperty<DeviceBase, DeviceAction>
private class ActionProvider<D : DeviceBase>( private class ActionProvider<D : DeviceBase>(
val owner: D, val owner: D,

View File

@ -1,9 +1,6 @@
package hep.dataforge.control.controllers package hep.dataforge.control.controllers
import hep.dataforge.control.api.Device import hep.dataforge.control.api.*
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.get
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.io.Consumer import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope import hep.dataforge.io.Envelope
@ -77,13 +74,15 @@ public class DeviceController(
} else if (target != null && target != deviceTarget) { } else if (target != null && target != deviceTarget) {
error("Wrong target name $deviceTarget expected but $target found") error("Wrong target name $deviceTarget expected but $target found")
} else { } else {
if (device is ResponderDevice) {
val response = device.respondWithData(request).apply { val response = device.respondWithData(request).apply {
meta { meta {
"target" put request.meta["source"].string "target" put request.meta["source"].string
"source" put deviceTarget "source" put deviceTarget
} }
} }
return response.seal() response.seal()
} else error("Device does not support binary response")
} }
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail(cause = ex).wrap() DeviceMessage.fail(cause = ex).wrap()
@ -99,7 +98,7 @@ public class DeviceController(
DeviceMessage.ok { DeviceMessage.ok {
targetName = request.sourceName targetName = request.sourceName
sourceName = deviceTarget sourceName = deviceTarget
action ="response.${request.action}" action = "response.${request.action}"
val requestKey = request.key val requestKey = request.key
val requestValue = request.value val requestValue = request.value

View File

@ -10,12 +10,13 @@ kotlin {
commonMain { commonMain {
dependencies { dependencies {
implementation(project(":dataforge-device-core")) implementation(project(":dataforge-device-core"))
implementation(project(":ktor-sse"))
implementation("io.ktor:ktor-client-core:$ktorVersion") implementation("io.ktor:ktor-client-core:$ktorVersion")
} }
} }
jvmMain { jvmMain {
dependencies { dependencies {
implementation("io.ktor:ktor-client-cio:$ktorVersion")
} }
} }
jsMain { jsMain {

View File

@ -7,18 +7,34 @@ import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap import hep.dataforge.meta.wrap
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get
import io.ktor.client.request.post import io.ktor.client.request.post
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.HttpStatement
import io.ktor.http.ContentType import io.ktor.http.ContentType
import io.ktor.http.Url import io.ktor.http.Url
import io.ktor.http.contentType import io.ktor.http.contentType
import io.ktor.utils.io.ByteReadChannel
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.json.* import kotlinx.serialization.json.*
import ru.mipt.npm.io.sse.SseEvent
import ru.mipt.npm.io.sse.readSseFlow
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
private suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Job = launch {
get<HttpStatement>(address).execute { response: HttpResponse ->
// Response is not downloaded here.
val channel = response.receive<ByteReadChannel>()
val flow = channel.readSseFlow()
flow.collect(block)
}
}
/* /*
{ {
"id":"string|number[optional, but desired]", "id":"string|number[optional, but desired]",
@ -31,14 +47,14 @@ import kotlin.coroutines.CoroutineContext
} }
*/ */
/** /**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/ */
public class MagixClient( public class MagixClient(
private val manager: DeviceManager, private val manager: DeviceManager,
private val postUrl: Url, private val postUrl: Url,
private val inbox: Flow<JsonObject> private val sseUrl: Url
//private val inbox: Flow<JsonObject>
): CoroutineScope { ): CoroutineScope {
override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job]) override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job])
@ -79,7 +95,9 @@ public class MagixClient(
} }
private val respondJob = launch { private val respondJob = launch {
inbox.collect { json -> client.readSse(sseUrl.toString()){
val json = Json.parseToJsonElement(it.data) as JsonObject
val requestId = json["id"]?.jsonPrimitive?.content val requestId = json["id"]?.jsonPrimitive?.content
val payload = json["payload"]?.jsonObject val payload = json["payload"]?.jsonObject
//TODO analyze action //TODO analyze action

View File

@ -50,7 +50,7 @@ class DemoDevice(context: Context) : DeviceBase(context) {
} }
val resetScale: Action by acting { val resetScale: DeviceAction by acting {
timeScaleValue = 5000.0 timeScaleValue = 5000.0
sinScaleValue = 1.0 sinScaleValue = 1.0
cosScaleValue = 1.0 cosScaleValue = 1.0

View File

@ -39,7 +39,7 @@ public suspend fun ByteReadChannel.readSseFlow(): Flow<SseEvent> = channelFlow {
do{ do{
val line = readUTF8Line() val line = readUTF8Line()
if (line != null && !line.isBlank()) { if (line != null && line.isNotBlank()) {
val key = line.substringBefore(":") val key = line.substringBefore(":")
val value = line.substringAfter(": ") val value = line.substringAfter(": ")
when (key) { when (key) {

View File

@ -13,4 +13,5 @@ val ktorVersion: String by rootProject.extra
dependencies { dependencies {
implementation(project(":dataforge-device-core")) implementation(project(":dataforge-device-core"))
implementation(project(":dataforge-magix-client"))
} }

View File

@ -12,8 +12,10 @@ import hep.dataforge.control.ports.PortProxy
import hep.dataforge.control.ports.send import hep.dataforge.control.ports.send
import hep.dataforge.control.ports.withDelimiter import hep.dataforge.control.ports.withDelimiter
import hep.dataforge.meta.MetaItem import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.asMetaItem
import hep.dataforge.names.NameToken import hep.dataforge.names.NameToken
import hep.dataforge.values.Null import hep.dataforge.values.Null
import hep.dataforge.values.asValue
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.takeWhile
@ -48,7 +50,8 @@ public class PiMotionMasterDevice(
private val mutex = Mutex() private val mutex = Mutex()
private suspend fun dispatchError(errorCode: Int) { private suspend fun dispatchError(errorCode: Int) {
logger.error { "Error code: $errorCode" }
//TODO add error handling
} }
private suspend fun sendCommandInternal(command: String, vararg arguments: String) { private suspend fun sendCommandInternal(command: String, vararg arguments: String) {
@ -108,7 +111,7 @@ public class PiMotionMasterDevice(
} }
public val initialize: Action by acting { public val initialize: DeviceAction by acting {
send("INI") send("INI")
} }
@ -116,7 +119,7 @@ public class PiMotionMasterDevice(
request("VER?").first() request("VER?").first()
} }
public val stop: Action by acting( public val stop: DeviceAction by acting(
descriptorBuilder = { descriptorBuilder = {
info = "Stop all axis" info = "Stop all axis"
}, },
@ -164,7 +167,7 @@ public class PiMotionMasterDevice(
info = "Motor enable state." info = "Motor enable state."
} }
public val halt: Action by acting { public val halt: DeviceAction by acting {
send("HLT", axisId) send("HLT", axisId)
} }
@ -218,10 +221,14 @@ public class PiMotionMasterDevice(
} }
} }
val axisIds: ReadOnlyDeviceProperty by reading {
request("SAI?").map { it.asValue() }.asValue().asMetaItem()
}
override val devices: Map<NameToken, Axis> = axes.associate { NameToken(it) to Axis(it) } override val devices: Map<NameToken, Axis> = axes.associate { NameToken(it) to Axis(it) }
/** /**
* * Name-friendly accessor for axis
*/ */
val axes: Map<String, Axis> get() = devices.mapKeys { it.toString() } val axes: Map<String, Axis> get() = devices.mapKeys { it.toString() }

View File

@ -1,6 +1,7 @@
package ru.mipt.npm.devices.pimotionmaster package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.control.api.Socket import hep.dataforge.control.api.Socket
import hep.dataforge.control.ports.AbstractPort import hep.dataforge.control.ports.AbstractPort
import hep.dataforge.control.ports.withDelimiter import hep.dataforge.control.ports.withDelimiter
@ -66,7 +67,11 @@ class VirtualPort(private val device: VirtualDevice, context: Context) : Abstrac
} }
class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List<String>) : VirtualDevice(scope) { class PiMotionMasterVirtualDevice(
override val context: Context,
axisIds: List<String>,
scope: CoroutineScope = context,
) : VirtualDevice(scope), ContextAware {
init { init {
//add asynchronous send logic here //add asynchronous send logic here
@ -131,13 +136,14 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List<String>)
} }
private fun respondForAllAxis(axisIds: List<String>, extract: VirtualAxisState.(index: String) -> Any) { private fun respondForAllAxis(axisIds: List<String>, extract: VirtualAxisState.(index: String) -> Any) {
val selectedAxis = if (axisIds.isEmpty()|| axisIds[0] == "ALL") { val selectedAxis = if (axisIds.isEmpty() || axisIds[0] == "ALL") {
axisState.keys axisState.keys
} else { } else {
axisIds axisIds
} }
val response = selectedAxis.joinToString(postfix = "\n", separator = " \n") { val response = selectedAxis.joinToString(separator = " \n") {
axisState.getValue(it).extract(it).toString() val state = axisState.getValue(it)
"$it=${state.extract(it)}"
} }
respond(response) respond(response)
} }
@ -145,14 +151,18 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List<String>)
override suspend fun evaluateRequest(request: ByteArray) { override suspend fun evaluateRequest(request: ByteArray) {
assert(request.last() == '\n'.toByte()) assert(request.last() == '\n'.toByte())
val string = request.decodeToString().substringBefore("\n") val string = request.decodeToString().substringBefore("\n")
.dropWhile { it != '*' && it != '#' && it !in 'A'..'Z' } //filter junk symbols at the beginning of the line
//logger.debug { "Received command: $string" }
val parts = string.split(' ') val parts = string.split(' ')
val command = parts.firstOrNull() ?: error("Command not present") val command = parts.firstOrNull() ?: error("Command not present")
val axisIds: List<String> = parts.drop(1) val axisIds: List<String> = parts.drop(1)
when (command) { when (command) {
"XXX" -> {}//respond("WAT?") "XXX" -> {
"IDN?","*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1") }
"IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1")
"VER?" -> respond(""" "VER?" -> respond("""
2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039 2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039
3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039 3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039
@ -212,12 +222,12 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List<String>)
respond(errorCode.toString()) respond(errorCode.toString())
errorCode = 0 errorCode = 0
} }
"SAI?" -> respondForAllAxis(axisIds) { it } "SAI?" -> respond(axisIds.joinToString(separator = " \n"))
"CST?" -> respond(WAT) "CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" }
"RON?" -> respondForAllAxis(axisIds) { referenceMode } "RON?" -> respondForAllAxis(axisIds) { referenceMode }
"FRF?" -> respondForAllAxis(axisIds) { "1" } // WAT? "FRF?" -> respondForAllAxis(axisIds) { "1" } // WAT?
"SVO?" -> respondForAllAxis(axisIds) { servoMode } "SVO?" -> respondForAllAxis(axisIds) { servoMode }
"MVO?" -> respondForAllAxis(axisIds) { targetPosition } "MOV?" -> respondForAllAxis(axisIds) { targetPosition }
"POS?" -> respondForAllAxis(axisIds) { position } "POS?" -> respondForAllAxis(axisIds) { position }
"TMN?" -> respondForAllAxis(axisIds) { minPosition } "TMN?" -> respondForAllAxis(axisIds) { minPosition }
"TMX?" -> respondForAllAxis(axisIds) { maxPosition } "TMX?" -> respondForAllAxis(axisIds) { maxPosition }
@ -228,7 +238,10 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List<String>)
val servoMode = parts.last() val servoMode = parts.last()
axisState[requestAxis]?.servoMode = servoMode.toInt() axisState[requestAxis]?.servoMode = servoMode.toInt()
} }
else -> errorCode = 2 // do not send anything. Ser error code else -> {
logger.warn { "Unknown command: $command in message ${String(request)}" }
errorCode = 2
} // do not send anything. Ser error code
} }
} }

View File

@ -14,7 +14,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import java.net.InetSocketAddress import java.net.InetSocketAddress
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
throwable.printStackTrace() throwable.printStackTrace()
} }

View File

@ -31,7 +31,7 @@ include(
":dataforge-device-core", ":dataforge-device-core",
":dataforge-device-serial", ":dataforge-device-serial",
":dataforge-device-server", ":dataforge-device-server",
":dataforge-device-client", ":dataforge-magix-client",
":demo", ":demo",
":motors" ":motors"
) )