From e4705b82395258fe17f532c958db81f56057caf0 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 21 Sep 2020 21:34:40 +0300 Subject: [PATCH] A lot of small changes. --- build.gradle.kts | 3 +- dataforge-device-client/build.gradle.kts | 8 +- .../hep/dataforge/control/client/sseClient.kt | 13 ++++ .../control/controllers/DeviceController.kt | 19 ++--- .../control/controllers/DeviceMessage.kt | 10 +++ .../hep/dataforge/control/ports/Port.kt | 8 +- .../dataforge/control/ports/KtorTcpPort.kt | 4 +- .../hep/dataforge/control/ports/TcpPort.kt | 4 +- .../dataforge/control/serial/SerialPort.kt | 4 +- dataforge-device-server/build.gradle.kts | 3 +- .../hep/dataforge/control/server/sse.kt | 41 ---------- .../dataforge/control/server/sseOnServer.kt | 26 +++++++ ktor-sse/build.gradle.kts | 28 +++++++ .../kotlin/ru/mipt/npm/io/sse/SseEvent.kt | 60 +++++++++++++++ .../kotlin/ru/mipt/npm/io/sse/SseTest.kt | 74 ++++++++++++++++++ .../pimotionmaster/PiMotionMasterDevice.kt | 30 +++++++- .../PiMotionMasterVirtualPort.kt | 45 +++++++++++ .../devices/pimotionmaster/piDebugServer.kt | 75 +++++++++++++++++++ settings.gradle.kts | 1 + 19 files changed, 382 insertions(+), 74 deletions(-) create mode 100644 dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt delete mode 100644 dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sse.kt create mode 100644 dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt create mode 100644 ktor-sse/build.gradle.kts create mode 100644 ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt create mode 100644 ktor-sse/src/jvmTest/kotlin/ru/mipt/npm/io/sse/SseTest.kt create mode 100644 motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualPort.kt create mode 100644 motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt diff --git a/build.gradle.kts b/build.gradle.kts index 22435c0..47117f3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -3,7 +3,8 @@ plugins{ kotlin("js") version "1.4.0" apply false } -val dataforgeVersion by extra("0.1.9-dev-2") +val dataforgeVersion: String by extra("0.1.9-dev-2") +val ktorVersion: String by extra("1.4.0") allprojects { repositories { diff --git a/dataforge-device-client/build.gradle.kts b/dataforge-device-client/build.gradle.kts index 5404d11..3579034 100644 --- a/dataforge-device-client/build.gradle.kts +++ b/dataforge-device-client/build.gradle.kts @@ -3,7 +3,7 @@ plugins { id("ru.mipt.npm.publish") } -val ktorVersion: String by extra("1.4.0") +val ktorVersion: String by rootProject.extra kotlin { sourceSets { @@ -15,14 +15,12 @@ kotlin { } jvmMain { dependencies { - implementation("io.ktor:ktor-client-cio:$ktorVersion") + } } jsMain { dependencies { - implementation("io.ktor:ktor-client-js:$ktorVersion") - implementation(npm("text-encoding", "0.7.0")) - implementation(npm("abort-controller", "3.0.0")) + } } } diff --git a/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt new file mode 100644 index 0000000..0548aef --- /dev/null +++ b/dataforge-device-client/src/commonMain/kotlin/hep/dataforge/control/client/sseClient.kt @@ -0,0 +1,13 @@ +package hep.dataforge.control.client + +import io.ktor.client.HttpClient +import io.ktor.client.call.receive +import io.ktor.client.request.get +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.HttpStatement +import io.ktor.utils.io.ByteReadChannel + +suspend fun HttpClient.sse(address: String) = get(address).execute { response: HttpResponse -> + // Response is not downloaded here. + val channel = response.receive() +} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index eabe36b..99964cc 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -23,7 +23,7 @@ import kotlinx.io.Binary public class DeviceController( public val device: Device, public val deviceTarget: String, - public val scope: CoroutineScope = device.scope + public val scope: CoroutineScope = device.scope, ) : Responder, Consumer, DeviceListener { init { @@ -32,7 +32,8 @@ public class DeviceController( private val outputChannel = Channel(Channel.CONFLATED) - public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = respondMessage(device, deviceTarget, message) + public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = + respondMessage(device, deviceTarget, message) override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request) @@ -87,16 +88,14 @@ public class DeviceController( return response.seal() } } catch (ex: Exception) { - DeviceMessage.fail { - comment = ex.message - }.wrap() + DeviceMessage.fail(cause = ex).wrap() } } internal suspend fun respondMessage( device: Device, deviceTarget: String, - request: DeviceMessage + request: DeviceMessage, ): DeviceMessage { return try { val result: List = when (val action = request.type) { @@ -157,9 +156,7 @@ public class DeviceController( data = result } } catch (ex: Exception) { - DeviceMessage.fail { - comment = ex.message - } + DeviceMessage.fail(request, cause = ex) } } } @@ -172,8 +169,6 @@ public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessa val device = this[targetName] ?: error("The device with name $targetName not found in $this") DeviceController.respondMessage(device, targetName.toString(), request) } catch (ex: Exception) { - DeviceMessage.fail { - comment = ex.message - } + DeviceMessage.fail(request, cause = ex) } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt index 3533110..6e26f05 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt @@ -47,12 +47,22 @@ public class DeviceMessage : Scheme() { public inline fun fail( request: DeviceMessage? = null, + cause: Throwable? = null, block: DeviceMessage.() -> Unit = {} ): DeviceMessage = DeviceMessage { target = request?.source status = RESPONSE_FAIL_STATUS + if(cause!=null){ + configure { + set("error.type", cause::class.simpleName) + set("error.message", cause.message) + //set("error.trace", ex.stackTraceToString()) + } + comment = cause.message + } }.apply(block) + override val descriptor: SerialDescriptor = MetaSerializer.descriptor override fun deserialize(decoder: Decoder): DeviceMessage { diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt index 5d23142..e51f4e5 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -18,9 +18,9 @@ public interface Port: Closeable, ContextAware { public typealias PortFactory = Factory -public abstract class AbstractPort(override val context: Context, parentContext: CoroutineContext = context.coroutineContext) : Port { +public abstract class AbstractPort(override val context: Context, coroutineContext: CoroutineContext = context.coroutineContext) : Port { - protected val scope: CoroutineScope = CoroutineScope(parentContext + SupervisorJob(parentContext[Job])) + protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job])) private val outgoing = Channel(100) private val incoming = Channel(Channel.CONFLATED) @@ -41,7 +41,7 @@ public abstract class AbstractPort(override val context: Context, parentContext: */ protected fun receive(data: ByteArray) { scope.launch { - logger.debug { "RECEIVED: ${data.decodeToString()}" } + logger.debug { "[${this@AbstractPort}] RECEIVED: ${data.decodeToString()}" } incoming.send(data) } } @@ -50,7 +50,7 @@ public abstract class AbstractPort(override val context: Context, parentContext: for (data in outgoing) { try { write(data) - logger.debug { "SENT: ${data.decodeToString()}" } + logger.debug { "[${this@AbstractPort}] SENT: ${data.decodeToString()}" } } catch (ex: Exception) { if (ex is CancellationException) throw ex logger.error(ex) { "Error while writing data to the port" } diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt index 90ab9da..de2ea85 100644 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt @@ -23,8 +23,8 @@ public class KtorTcpPort internal constructor( context: Context, public val host: String, public val port: Int, - parentContext: CoroutineContext = context.coroutineContext, -) : AbstractPort(context, parentContext), AutoCloseable { + coroutineContext: CoroutineContext = context.coroutineContext, +) : AbstractPort(context, coroutineContext), AutoCloseable { override fun toString(): String = "port[tcp:$host:$port]" diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt index 352b977..39bab89 100644 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt @@ -23,8 +23,8 @@ public class TcpPort private constructor( context: Context, public val host: String, public val port: Int, - parentContext: CoroutineContext = context.coroutineContext, -) : AbstractPort(context, parentContext), AutoCloseable { + coroutineContext: CoroutineContext = context.coroutineContext, +) : AbstractPort(context, coroutineContext), AutoCloseable { override fun toString(): String = "port[tcp:$host:$port]" diff --git a/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt b/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt index 18b5e1f..2daf6ea 100644 --- a/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt +++ b/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt @@ -18,8 +18,8 @@ import jssc.SerialPort as JSSCPort public class SerialPort private constructor( context: Context, private val jssc: JSSCPort, - parentContext: CoroutineContext = context.coroutineContext, -) : AbstractPort(context, parentContext) { + coroutineContext: CoroutineContext = context.coroutineContext, +) : AbstractPort(context, coroutineContext) { override fun toString(): String = "port[${jssc.portName}]" diff --git a/dataforge-device-server/build.gradle.kts b/dataforge-device-server/build.gradle.kts index 8f7428e..3c1381a 100644 --- a/dataforge-device-server/build.gradle.kts +++ b/dataforge-device-server/build.gradle.kts @@ -8,9 +8,10 @@ kscience { } val dataforgeVersion: String by rootProject.extra -val ktorVersion: String by extra("1.4.0") +val ktorVersion: String by rootProject.extra dependencies{ + implementation(project(":ktor-sse")) implementation(project(":dataforge-device-core")) implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-websockets:$ktorVersion") diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sse.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sse.kt deleted file mode 100644 index 95dffec..0000000 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sse.kt +++ /dev/null @@ -1,41 +0,0 @@ -package hep.dataforge.control.server - -import io.ktor.application.ApplicationCall -import io.ktor.http.CacheControl -import io.ktor.http.ContentType -import io.ktor.response.cacheControl -import io.ktor.response.respondTextWriter -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect - -/** - * The data class representing a SSE Event that will be sent to the client. - */ -public data class SseEvent(val data: String, val event: String? = null, val id: String? = null) - -/** - * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel] - * and serializing them in a way that is compatible with the Server-Sent Events specification. - * - * You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/ - */ -@Suppress("BlockingMethodInNonBlockingContext") -public suspend fun ApplicationCall.respondSse(events: Flow) { - response.cacheControl(CacheControl.NoCache(null)) - respondTextWriter(contentType = ContentType.Text.EventStream) { - events.collect { event-> - if (event.id != null) { - write("id: ${event.id}\n") - } - if (event.event != null) { - write("event: ${event.event}\n") - } - for (dataLine in event.data.lines()) { - write("data: $dataLine\n") - } - write("\n") - flush() - } - } -} \ No newline at end of file diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt new file mode 100644 index 0000000..c99dd92 --- /dev/null +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt @@ -0,0 +1,26 @@ +package hep.dataforge.control.server + +import io.ktor.application.ApplicationCall +import io.ktor.http.CacheControl +import io.ktor.http.ContentType +import io.ktor.response.cacheControl +import io.ktor.response.respondBytesWriter +import io.ktor.util.KtorExperimentalAPI +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow +import ru.mipt.npm.io.sse.SseEvent +import ru.mipt.npm.io.sse.writeSseFlow + +/** + * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel] + * and serializing them in a way that is compatible with the Server-Sent Events specification. + * + * You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/ + */ +@OptIn(KtorExperimentalAPI::class) +public suspend fun ApplicationCall.respondSse(events: Flow) { + response.cacheControl(CacheControl.NoCache(null)) + respondBytesWriter(contentType = ContentType.Text.EventStream) { + writeSseFlow(events) + } +} \ No newline at end of file diff --git a/ktor-sse/build.gradle.kts b/ktor-sse/build.gradle.kts new file mode 100644 index 0000000..2e5c1a6 --- /dev/null +++ b/ktor-sse/build.gradle.kts @@ -0,0 +1,28 @@ +plugins { + id("ru.mipt.npm.mpp") +} + +group = "ru.mipt.npm" + +val ktorVersion: String by rootProject.extra + +kscience{ + useCoroutines() +} + +kotlin { + sourceSets { + commonMain { + dependencies { + api("io.ktor:ktor-io:$ktorVersion") + } + } + jvmTest{ + dependencies{ + implementation("io.ktor:ktor-server-cio:$ktorVersion") + implementation("io.ktor:ktor-client-cio:$ktorVersion") + implementation("ch.qos.logback:logback-classic:1.2.3") + } + } + } +} \ No newline at end of file diff --git a/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt b/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt new file mode 100644 index 0000000..659e4d8 --- /dev/null +++ b/ktor-sse/src/commonMain/kotlin/ru/mipt/npm/io/sse/SseEvent.kt @@ -0,0 +1,60 @@ +package ru.mipt.npm.io.sse + +import io.ktor.utils.io.* +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.isActive + + +/** + * The data class representing a SSE Event that will be sent to the client. + */ +public data class SseEvent(val data: String, val event: String? = null, val id: String? = null) + +public suspend fun ByteWriteChannel.writeSseFlow(events: Flow): Unit = events.collect { event -> + if (event.id != null) { + writeStringUtf8("id: ${event.id}\n") + } + if (event.event != null) { + writeStringUtf8("event: ${event.event}\n") + } + for (dataLine in event.data.lines()) { + writeStringUtf8("data: $dataLine\n") + } + writeStringUtf8("\n") + flush() +} + +@OptIn(ExperimentalCoroutinesApi::class) +public suspend fun ByteReadChannel.readSseFlow(): Flow = channelFlow { + while (isActive) { + //val lines = ArrayList() + val builder = StringBuilder() + var id: String? = null + var event: String? = null + //read lines until blank line or the end of stream + + do{ + val line = readUTF8Line() + if (line != null && !line.isBlank()) { + val key = line.substringBefore(":") + val value = line.substringAfter(": ") + when (key) { + "id" -> id = value + "event" -> event = value + "data" -> builder.append(value) + else -> error("Unrecognized event-stream key $key") + } + } + } while (line?.isBlank() != true) + if(builder.isNotBlank()) { + send(SseEvent(builder.toString(), event, id)) + } + } + awaitClose { + this@readSseFlow.cancel() + } +} diff --git a/ktor-sse/src/jvmTest/kotlin/ru/mipt/npm/io/sse/SseTest.kt b/ktor-sse/src/jvmTest/kotlin/ru/mipt/npm/io/sse/SseTest.kt new file mode 100644 index 0000000..5b8ef4d --- /dev/null +++ b/ktor-sse/src/jvmTest/kotlin/ru/mipt/npm/io/sse/SseTest.kt @@ -0,0 +1,74 @@ +package ru.mipt.npm.io.sse + +import io.ktor.application.ApplicationCall +import io.ktor.application.call +import io.ktor.client.HttpClient +import io.ktor.client.call.receive +import io.ktor.client.request.get +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.HttpStatement +import io.ktor.http.CacheControl +import io.ktor.http.ContentType +import io.ktor.response.cacheControl +import io.ktor.response.respondBytesWriter +import io.ktor.routing.get +import io.ktor.routing.routing +import io.ktor.server.cio.CIO +import io.ktor.server.engine.embeddedServer +import io.ktor.util.KtorExperimentalAPI +import io.ktor.utils.io.ByteReadChannel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test + +@OptIn(KtorExperimentalAPI::class) +suspend fun ApplicationCall.respondSse(events: Flow) { + response.cacheControl(CacheControl.NoCache(null)) + respondBytesWriter(contentType = ContentType.Text.EventStream) { + writeSseFlow(events) + } +} + +suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Unit = + get(address).execute { response: HttpResponse -> + // Response is not downloaded here. + val channel = response.receive() + val flow = channel.readSseFlow() + flow.collect(block) + } + +class SseTest { + @OptIn(KtorExperimentalAPI::class) + @Test + fun testSseIntegration() { + runBlocking { + val server = embeddedServer(CIO, 12080) { + routing { + get("/") { + val flow = flow { + repeat(5) { + delay(300) + emit(it) + } + }.map { + SseEvent(data = it.toString(), id = it.toString()) + } + call.respondSse(flow) + } + } + } + server.start(wait = false) + delay(1000) + val client = HttpClient(io.ktor.client.engine.cio.CIO) + client.readSse("http://localhost:12080") { + println(it) + } + client.close() + server.stop(1000, 1000) + } + } +} \ No newline at end of file diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt index a9418e1..9992d7a 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt @@ -48,6 +48,10 @@ public class PiMotionMasterDevice( private val mutex = Mutex() + private suspend fun dispatchError(errorCode: Int){ + + } + private suspend fun sendCommandInternal(command: String, vararg arguments: String) { val joinedArguments = if (arguments.isEmpty()) { "" @@ -58,14 +62,31 @@ public class PiMotionMasterDevice( connector.send(stringToSend) } + public suspend fun getErrorCode(): Int = mutex.withLock{ + withTimeout(timeoutValue) { + sendCommandInternal("ERR?") + val errorString = connector.receiving().withDelimiter("\n").first() + errorString.toInt() + } + } + + /** * Send a synchronous request and receive a list of lines as a response */ private suspend fun request(command: String, vararg arguments: String): List = mutex.withLock { - withTimeout(timeoutValue) { - sendCommandInternal(command, *arguments) - val phrases = connector.receiving().withDelimiter("\n") - phrases.takeWhile { it.endsWith(" \n") }.toList() + phrases.first() + try { + withTimeout(timeoutValue) { + sendCommandInternal(command, *arguments) + val phrases = connector.receiving().withDelimiter("\n") + phrases.takeWhile { it.endsWith(" \n") }.toList() + phrases.first() + } + } catch (ex: Throwable){ + logger.warn { "Error during PIMotionMaster request. Requesting error code." } + val errorCode = getErrorCode() + dispatchError(errorCode) + logger.warn { "Error code $errorCode" } + error("Error code $errorCode") } } @@ -87,6 +108,7 @@ public class PiMotionMasterDevice( } } + public val initialize: Action by acting { send("INI") } diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualPort.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualPort.kt new file mode 100644 index 0000000..7b13330 --- /dev/null +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualPort.kt @@ -0,0 +1,45 @@ +package ru.mipt.npm.devices.pimotionmaster + +import hep.dataforge.context.Context +import hep.dataforge.control.ports.AbstractPort + +abstract class VirtualPort(context: Context) : AbstractPort(context) + + +class PiMotionMasterVirtualPort(context: Context) : VirtualPort(context) { + + init { + //add asynchronous send logic here + } + + private val axisID = "0" + + private var errorCode: Int = 0 + private var velocity: Float = 1.0f + private var position: Float = 0.0f + private var servoMode: Int = 1 + private var targetPosition: Float = 0.0f + + + private fun receive(str: String) = receive((str + "\n").toByteArray()) + + override suspend fun write(data: ByteArray) { + assert(data.last() == '\n'.toByte()) + val string = data.decodeToString().substringBefore("\n") + val parts = string.split(' ') + val command = parts.firstOrNull() ?: error("Command not present") + when (command) { + "XXX" -> receive("WAT?") + "VER?" -> receive("test") + "ERR?" -> receive(errorCode.toString()) + "SVO?" -> receive("$axisID=$servoMode") + "SVO" ->{ + val requestAxis = parts[1] + if(requestAxis == axisID) { + servoMode = parts[2].toInt() + } + } + else -> errorCode = 2 // do not send anything. Ser error code + } + } +} \ No newline at end of file diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt new file mode 100644 index 0000000..fd9fe2f --- /dev/null +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt @@ -0,0 +1,75 @@ +package ru.mipt.npm.devices.pimotionmaster + +import hep.dataforge.context.Global +import hep.dataforge.control.ports.Port +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.network.sockets.openReadChannel +import io.ktor.network.sockets.openWriteChannel +import io.ktor.util.InternalAPI +import io.ktor.util.KtorExperimentalAPI +import io.ktor.util.moveToByteArray +import io.ktor.utils.io.readUntilDelimiter +import io.ktor.utils.io.writeAvailable +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.collect +import java.net.InetSocketAddress +import java.nio.ByteBuffer + +private val delimeter = ByteBuffer.wrap("\n".encodeToByteArray()) + +@OptIn(KtorExperimentalAPI::class, InternalAPI::class) +fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = launch { + val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port)) + println("Started virtual port server at ${server.localAddress}") + + while (isActive) { + val socket = try { + server.accept() + } catch (ex: Exception) { + server.close() + return@launch + } + + launch { + println("Socket accepted: ${socket.remoteAddress}") + + try { + val input = socket.openReadChannel() + val output = socket.openWriteChannel(autoFlush = true) + + val buffer = ByteBuffer.allocate(1024) + launch { + virtualPort.receiving().collect { + println("Sending: ${it.decodeToString()}") + output.writeAvailable(it) + } + } + while (isActive) { + buffer.rewind() + val read = input.readUntilDelimiter(delimeter, buffer) + if (read > 0) { + buffer.flip() + val array = buffer.moveToByteArray() + println("Received: ${array.decodeToString()}") + virtualPort.send(array) + } + } + } catch (ex: Exception) { + cancel() + } finally { + socket.close() + } + } + } +} + +fun main() { + val port = 10024 + val virtualPort = PiMotionMasterVirtualPort(Global) + runBlocking(Dispatchers.Default) { + val serverJob = launchPiDebugServer(port, virtualPort) + readLine() + serverJob.cancel() + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 62a0213..ed10e46 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -26,6 +26,7 @@ pluginManagement { rootProject.name = "dataforge-control" include( + ":ktor-sse", ":dataforge-device-core", ":dataforge-device-serial", ":dataforge-device-server",