diff --git a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt deleted file mode 100644 index 0548aef..0000000 --- a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt +++ /dev/null @@ -1,13 +0,0 @@ -package hep.dataforge.control.client - -import io.ktor.client.HttpClient -import io.ktor.client.call.receive -import io.ktor.client.request.get -import io.ktor.client.statement.HttpResponse -import io.ktor.client.statement.HttpStatement -import io.ktor.utils.io.ByteReadChannel - -suspend fun HttpClient.sse(address: String) = get(address).execute { response: HttpResponse -> - // Response is not downloaded here. - val channel = response.receive() -} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index 8c1f607..d6f5d5f 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -67,14 +67,6 @@ public interface Device : Closeable, ContextAware { */ public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>? - /** - * - * A request with binary data or for binary response (or both). This request does not cover basic functionality like - * [setProperty], [getProperty] or [execute] and not defined for a generic device. - * - */ - public suspend fun respondWithData(request: Envelope): EnvelopeBuilder = error("No binary response defined") - override fun close() { scope.cancel("The device is closed") } @@ -84,4 +76,14 @@ public interface Device : Closeable, ContextAware { } } +public interface ResponderDevice{ + /** + * + * A request with binary data or for binary response (or both). This request does not cover basic functionality like + * [setProperty], [getProperty] or [execute] and not defined for a generic device. + * + */ + public suspend fun respondWithData(request: Envelope): EnvelopeBuilder +} + public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceAction.kt similarity index 89% rename from dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt rename to dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceAction.kt index 962c364..b9f8e00 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/Action.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceAction.kt @@ -3,7 +3,7 @@ package hep.dataforge.control.base import hep.dataforge.control.api.ActionDescriptor import hep.dataforge.meta.MetaItem -public interface Action { +public interface DeviceAction { public val name: String public val descriptor: ActionDescriptor public suspend operator fun invoke(arg: MetaItem<*>? = null): MetaItem<*>? diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt index 626ce11..312cc21 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt @@ -19,8 +19,8 @@ public abstract class DeviceBase(override val context: Context) : Device { private val _properties = HashMap() public val properties: Map get() = _properties - private val _actions = HashMap() - public val actions: Map get() = _actions + private val _actions = HashMap() + public val actions: Map get() = _actions private val listeners = ArrayList>(4) @@ -54,7 +54,7 @@ public abstract class DeviceBase(override val context: Context) : Device { _properties[name] = property } - internal fun registerAction(name: String, action: Action) { + internal fun registerAction(name: String, action: DeviceAction) { if (_actions.contains(name)) error("Action with name $name already registered") _actions[name] = action } @@ -199,11 +199,11 @@ public abstract class DeviceBase(override val context: Context) : Device { /** * A stand-alone action */ - private inner class BasicAction( + private inner class BasicDeviceAction( override val name: String, override val descriptor: ActionDescriptor, private val block: suspend (MetaItem<*>?) -> MetaItem<*>?, - ) : Action { + ) : DeviceAction { override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { block(arg).also { @@ -221,8 +221,8 @@ public abstract class DeviceBase(override val context: Context) : Device { name: String, descriptorBuilder: ActionDescriptor.() -> Unit = {}, block: suspend (MetaItem<*>?) -> MetaItem<*>?, - ): Action { - val action = BasicAction(name, ActionDescriptor(name).apply(descriptorBuilder), block) + ): DeviceAction { + val action = BasicDeviceAction(name, ActionDescriptor(name).apply(descriptorBuilder), block) registerAction(name, action) return action } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt index 877739e..79d6019 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt @@ -9,13 +9,13 @@ import kotlin.properties.ReadOnlyProperty import kotlin.reflect.KProperty -private fun D.provideAction(): ReadOnlyProperty = +private fun D.provideAction(): ReadOnlyProperty = ReadOnlyProperty { _: D, property: KProperty<*> -> val name = property.name return@ReadOnlyProperty actions[name]!! } -public typealias ActionDelegate = ReadOnlyProperty +public typealias ActionDelegate = ReadOnlyProperty private class ActionProvider( val owner: D, diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index fc00df8..6a80602 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -1,9 +1,6 @@ package hep.dataforge.control.controllers -import hep.dataforge.control.api.Device -import hep.dataforge.control.api.DeviceHub -import hep.dataforge.control.api.DeviceListener -import hep.dataforge.control.api.get +import hep.dataforge.control.api.* import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION import hep.dataforge.io.Consumer import hep.dataforge.io.Envelope @@ -77,13 +74,15 @@ public class DeviceController( } else if (target != null && target != deviceTarget) { error("Wrong target name $deviceTarget expected but $target found") } else { - val response = device.respondWithData(request).apply { - meta { - "target" put request.meta["source"].string - "source" put deviceTarget + if (device is ResponderDevice) { + val response = device.respondWithData(request).apply { + meta { + "target" put request.meta["source"].string + "source" put deviceTarget + } } - } - return response.seal() + response.seal() + } else error("Device does not support binary response") } } catch (ex: Exception) { DeviceMessage.fail(cause = ex).wrap() @@ -99,7 +98,7 @@ public class DeviceController( DeviceMessage.ok { targetName = request.sourceName sourceName = deviceTarget - action ="response.${request.action}" + action = "response.${request.action}" val requestKey = request.key val requestValue = request.value diff --git a/dataforge-device-client/build.gradle.kts b/dataforge-magix-client/build.gradle.kts similarity index 79% rename from dataforge-device-client/build.gradle.kts rename to dataforge-magix-client/build.gradle.kts index 3579034..107e787 100644 --- a/dataforge-device-client/build.gradle.kts +++ b/dataforge-magix-client/build.gradle.kts @@ -10,12 +10,13 @@ kotlin { commonMain { dependencies { implementation(project(":dataforge-device-core")) + implementation(project(":ktor-sse")) implementation("io.ktor:ktor-client-core:$ktorVersion") } } jvmMain { dependencies { - + implementation("io.ktor:ktor-client-cio:$ktorVersion") } } jsMain { diff --git a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt similarity index 77% rename from dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt rename to dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt index d14f6c4..926f26f 100644 --- a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt @@ -7,18 +7,34 @@ import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta import hep.dataforge.meta.wrap import io.ktor.client.HttpClient +import io.ktor.client.call.receive +import io.ktor.client.request.get import io.ktor.client.request.post +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.HttpStatement import io.ktor.http.ContentType import io.ktor.http.Url import io.ktor.http.contentType +import io.ktor.utils.io.ByteReadChannel import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.serialization.json.* +import ru.mipt.npm.io.sse.SseEvent +import ru.mipt.npm.io.sse.readSseFlow import kotlin.coroutines.CoroutineContext + +private suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Job = launch { + get(address).execute { response: HttpResponse -> + // Response is not downloaded here. + val channel = response.receive() + val flow = channel.readSseFlow() + flow.collect(block) + } +} + /* { "id":"string|number[optional, but desired]", @@ -31,14 +47,14 @@ import kotlin.coroutines.CoroutineContext } */ - /** * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) */ public class MagixClient( private val manager: DeviceManager, private val postUrl: Url, - private val inbox: Flow + private val sseUrl: Url + //private val inbox: Flow ): CoroutineScope { override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job]) @@ -79,7 +95,9 @@ public class MagixClient( } private val respondJob = launch { - inbox.collect { json -> + client.readSse(sseUrl.toString()){ + val json = Json.parseToJsonElement(it.data) as JsonObject + val requestId = json["id"]?.jsonPrimitive?.content val payload = json["payload"]?.jsonObject //TODO analyze action diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt index abe1d8a..d8373d9 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/DemoDevice.kt @@ -50,7 +50,7 @@ class DemoDevice(context: Context) : DeviceBase(context) { } - val resetScale: Action by acting { + val resetScale: DeviceAction by acting { timeScaleValue = 5000.0 sinScaleValue = 1.0 cosScaleValue = 1.0 diff --git a/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt b/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt index 659e4d8..f5136ea 100644 --- a/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt +++ b/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt @@ -39,7 +39,7 @@ public suspend fun ByteReadChannel.readSseFlow(): Flow = channelFlow { do{ val line = readUTF8Line() - if (line != null && !line.isBlank()) { + if (line != null && line.isNotBlank()) { val key = line.substringBefore(":") val value = line.substringAfter(": ") when (key) { diff --git a/motors/build.gradle.kts b/motors/build.gradle.kts index 54d9d5d..059e388 100644 --- a/motors/build.gradle.kts +++ b/motors/build.gradle.kts @@ -13,4 +13,5 @@ val ktorVersion: String by rootProject.extra dependencies { implementation(project(":dataforge-device-core")) + implementation(project(":dataforge-magix-client")) } diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt index c3f5ad9..5505930 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt @@ -12,8 +12,10 @@ import hep.dataforge.control.ports.PortProxy import hep.dataforge.control.ports.send import hep.dataforge.control.ports.withDelimiter import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.asMetaItem import hep.dataforge.names.NameToken import hep.dataforge.values.Null +import hep.dataforge.values.asValue import kotlinx.coroutines.* import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.takeWhile @@ -48,7 +50,8 @@ public class PiMotionMasterDevice( private val mutex = Mutex() private suspend fun dispatchError(errorCode: Int) { - + logger.error { "Error code: $errorCode" } + //TODO add error handling } private suspend fun sendCommandInternal(command: String, vararg arguments: String) { @@ -108,7 +111,7 @@ public class PiMotionMasterDevice( } - public val initialize: Action by acting { + public val initialize: DeviceAction by acting { send("INI") } @@ -116,7 +119,7 @@ public class PiMotionMasterDevice( request("VER?").first() } - public val stop: Action by acting( + public val stop: DeviceAction by acting( descriptorBuilder = { info = "Stop all axis" }, @@ -164,7 +167,7 @@ public class PiMotionMasterDevice( info = "Motor enable state." } - public val halt: Action by acting { + public val halt: DeviceAction by acting { send("HLT", axisId) } @@ -218,10 +221,14 @@ public class PiMotionMasterDevice( } } + val axisIds: ReadOnlyDeviceProperty by reading { + request("SAI?").map { it.asValue() }.asValue().asMetaItem() + } + override val devices: Map = axes.associate { NameToken(it) to Axis(it) } /** - * + * Name-friendly accessor for axis */ val axes: Map get() = devices.mapKeys { it.toString() } diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt index 9673239..384609c 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt @@ -1,6 +1,7 @@ package ru.mipt.npm.devices.pimotionmaster import hep.dataforge.context.Context +import hep.dataforge.context.ContextAware import hep.dataforge.control.api.Socket import hep.dataforge.control.ports.AbstractPort import hep.dataforge.control.ports.withDelimiter @@ -66,7 +67,11 @@ class VirtualPort(private val device: VirtualDevice, context: Context) : Abstrac } -class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) : VirtualDevice(scope) { +class PiMotionMasterVirtualDevice( + override val context: Context, + axisIds: List, + scope: CoroutineScope = context, +) : VirtualDevice(scope), ContextAware { init { //add asynchronous send logic here @@ -131,13 +136,14 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) } private fun respondForAllAxis(axisIds: List, extract: VirtualAxisState.(index: String) -> Any) { - val selectedAxis = if (axisIds.isEmpty()|| axisIds[0] == "ALL") { + val selectedAxis = if (axisIds.isEmpty() || axisIds[0] == "ALL") { axisState.keys } else { axisIds } - val response = selectedAxis.joinToString(postfix = "\n", separator = " \n") { - axisState.getValue(it).extract(it).toString() + val response = selectedAxis.joinToString(separator = " \n") { + val state = axisState.getValue(it) + "$it=${state.extract(it)}" } respond(response) } @@ -145,14 +151,18 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) override suspend fun evaluateRequest(request: ByteArray) { assert(request.last() == '\n'.toByte()) val string = request.decodeToString().substringBefore("\n") + .dropWhile { it != '*' && it != '#' && it !in 'A'..'Z' } //filter junk symbols at the beginning of the line + + //logger.debug { "Received command: $string" } val parts = string.split(' ') val command = parts.firstOrNull() ?: error("Command not present") val axisIds: List = parts.drop(1) when (command) { - "XXX" -> {}//respond("WAT?") - "IDN?","*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1") + "XXX" -> { + } + "IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1") "VER?" -> respond(""" 2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039 3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039 @@ -212,12 +222,12 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) respond(errorCode.toString()) errorCode = 0 } - "SAI?" -> respondForAllAxis(axisIds) { it } - "CST?" -> respond(WAT) + "SAI?" -> respond(axisIds.joinToString(separator = " \n")) + "CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" } "RON?" -> respondForAllAxis(axisIds) { referenceMode } "FRF?" -> respondForAllAxis(axisIds) { "1" } // WAT? "SVO?" -> respondForAllAxis(axisIds) { servoMode } - "MVO?" -> respondForAllAxis(axisIds) { targetPosition } + "MOV?" -> respondForAllAxis(axisIds) { targetPosition } "POS?" -> respondForAllAxis(axisIds) { position } "TMN?" -> respondForAllAxis(axisIds) { minPosition } "TMX?" -> respondForAllAxis(axisIds) { maxPosition } @@ -228,7 +238,10 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) val servoMode = parts.last() axisState[requestAxis]?.servoMode = servoMode.toInt() } - else -> errorCode = 2 // do not send anything. Ser error code + else -> { + logger.warn { "Unknown command: $command in message ${String(request)}" } + errorCode = 2 + } // do not send anything. Ser error code } } diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt index 5026131..27ac821 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt @@ -14,7 +14,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import java.net.InetSocketAddress -val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> +val exceptionHandler = CoroutineExceptionHandler { _, throwable -> throwable.printStackTrace() } diff --git a/settings.gradle.kts b/settings.gradle.kts index e0ca653..43c69c1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -31,7 +31,7 @@ include( ":dataforge-device-core", ":dataforge-device-serial", ":dataforge-device-server", - ":dataforge-device-client", + ":dataforge-magix-client", ":demo", ":motors" )