diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt index c67a319..082b904 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt @@ -11,7 +11,7 @@ import kotlinx.serialization.encoding.Encoder public class DeviceMessage : Scheme() { public var action: String by string { error("Action not defined") } - public var status: String by string(default = RESPONSE_OK_STATUS) + public var status: String by string(default = OK_STATUS) public var sourceName: String? by string() public var targetName: String? by string() public var comment: String? by string() @@ -25,9 +25,9 @@ public class DeviceMessage : Scheme() { public val MESSAGE_KEY_KEY: Name = DeviceMessage::key.name.asName() public val MESSAGE_VALUE_KEY: Name = DeviceMessage::value.name.asName() - public const val RESPONSE_OK_STATUS: String = "response.OK" - public const val RESPONSE_FAIL_STATUS: String = "response.FAIL" - public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChange" + public const val OK_STATUS: String = "OK" + public const val FAIL_STATUS: String = "FAIL" + public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged" public inline fun ok( request: DeviceMessage? = null, @@ -42,7 +42,7 @@ public class DeviceMessage : Scheme() { block: DeviceMessage.() -> Unit = {}, ): DeviceMessage = DeviceMessage { targetName = request?.sourceName - status = RESPONSE_FAIL_STATUS + status = FAIL_STATUS if (cause != null) { configure { set("error.type", cause::class.simpleName) diff --git a/motors/build.gradle.kts b/motors/build.gradle.kts index 23410d9..54d9d5d 100644 --- a/motors/build.gradle.kts +++ b/motors/build.gradle.kts @@ -9,6 +9,8 @@ kotlin{ explicitApi = null } +val ktorVersion: String by rootProject.extra + dependencies { implementation(project(":dataforge-device-core")) } diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt index 6cd10d4..9673239 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt @@ -19,7 +19,7 @@ abstract class VirtualDevice(val scope: CoroutineScope) : Socket { private val toReceive = Channel(100) private val toRespond = Channel(100) - private val receiveJob: Job = toReceive.consumeAsFlow().onEach { + private val receiveJob: Job = toReceive.consumeAsFlow().transformRequests().onEach { evaluateRequest(it) }.catch { it.printStackTrace() @@ -48,7 +48,9 @@ abstract class VirtualDevice(val scope: CoroutineScope) : Socket { class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) { - private val respondJob = device.receiving().onEach(::receive).catch { + private val respondJob = device.receiving().onEach { + receive(it) + }.catch { it.printStackTrace() }.launchIn(scope) @@ -129,7 +131,7 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) } private fun respondForAllAxis(axisIds: List, extract: VirtualAxisState.(index: String) -> Any) { - val selectedAxis = if (axisIds.isEmpty()) { + val selectedAxis = if (axisIds.isEmpty()|| axisIds[0] == "ALL") { axisState.keys } else { axisIds @@ -149,9 +151,23 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) val axisIds: List = parts.drop(1) when (command) { - "XXX" -> respond("") - "IDN?" -> respond("DataForge-device demo") - "VER?" -> respond("test") + "XXX" -> {}//respond("WAT?") + "IDN?","*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1") + "VER?" -> respond(""" + 2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039 + 3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039 + 4: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550041, 00.039 + 5: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550042, 00.039 + 6: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550043, 00.039 + 7: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550044, 00.039 + 8: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550046, 00.039 + 9: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550045, 00.039 + 10: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550047, 00.039 + 11: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550048, 00.039 + 12: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550049, 00.039 + 13: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550051, 00.039 + FW_ARM: V1.0.0.1 + """.trimIndent()) "HLP?" -> respond(""" The following commands are valid: #4 Request Status Register @@ -192,7 +208,10 @@ class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) VER? Get Versions Of Firmware And Drivers end of help """.trimIndent()) - "ERR?" -> respond(errorCode.toString()) + "ERR?" -> { + respond(errorCode.toString()) + errorCode = 0 + } "SAI?" -> respondForAllAxis(axisIds) { it } "CST?" -> respond(WAT) "RON?" -> respondForAllAxis(axisIds) { referenceMode } diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt index abf5206..5026131 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt @@ -9,55 +9,52 @@ import io.ktor.network.sockets.openWriteChannel import io.ktor.util.InternalAPI import io.ktor.util.KtorExperimentalAPI import io.ktor.util.moveToByteArray -import io.ktor.utils.io.readUntilDelimiter import io.ktor.utils.io.writeAvailable import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import java.net.InetSocketAddress -import java.nio.ByteBuffer -private val delimeter = ByteBuffer.wrap("\n".encodeToByteArray()) +val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable -> + throwable.printStackTrace() +} @OptIn(KtorExperimentalAPI::class, InternalAPI::class) -fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = launch { +fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = launch(exceptionHandler) { val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port)) println("Started virtual port server at ${server.localAddress}") while (isActive) { - val socket = try { - server.accept() - } catch (ex: Exception) { - server.close() - ex.printStackTrace() - return@launch - } + val socket = server.accept() + launch(SupervisorJob(coroutineContext[Job])) { + println("Socket accepted: ${socket.remoteAddress}") + val input = socket.openReadChannel() + val output = socket.openWriteChannel() - - println("Socket accepted: ${socket.remoteAddress}") - supervisorScope { - socket.use { socket -> - val input = socket.openReadChannel() - val output = socket.openWriteChannel(autoFlush = true) - - val buffer = ByteBuffer.allocate(1024) - launch { - virtualPort.receiving().collect { - //println("Sending: ${it.decodeToString()}") - output.writeAvailable(it) - output.flush() - } - } - while (isActive) { - buffer.rewind() - val read = input.readUntilDelimiter(delimeter, buffer) - if (read > 0) { - buffer.flip() - val array = buffer.moveToByteArray() - //println("Received: ${array.decodeToString()}") - virtualPort.send(array) - } + val sendJob = launch { + virtualPort.receiving().collect { + //println("Sending: ${it.decodeToString()}") + output.writeAvailable(it) + output.flush() } } + + try { + while (isActive) { + input.read { buffer -> + val array = buffer.moveToByteArray() + launch { + virtualPort.send(array) + } + } + } + } catch (e: Throwable) { + e.printStackTrace() + sendJob.cancel() + socket.close() + } finally { + println("Socket closed") + } + } } }