diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt index 87ceb67..a805bbe 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -4,13 +4,9 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch -import kotlinx.io.ByteArrayOutput import kotlinx.io.Closeable import mu.KLogger @@ -20,7 +16,7 @@ abstract class Port(val scope: CoroutineScope) : Closeable { private val outgoing = Channel(100) private val incoming = Channel(Channel.CONFLATED) - val receiveChannel: ReceiveChannel get() = incoming + //val receiveChannel: ReceiveChannel get() = incoming /** * Internal method to synchronously send data @@ -56,9 +52,9 @@ abstract class Port(val scope: CoroutineScope) : Closeable { /** * Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases. * In order to form phrases some condition should used on top of it. - * For example [delimitedInput] generates phrases with fixed delimiter. + * For example [delimitedIncoming] generates phrases with fixed delimiter. */ - fun input(): Flow { + fun incoming(): Flow { return incoming.receiveAsFlow() } @@ -72,33 +68,4 @@ abstract class Port(val scope: CoroutineScope) : Closeable { /** * Send UTF-8 encoded string */ -suspend fun Port.send(string: String) = send(string.encodeToByteArray()) - -fun Flow.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow = flow { - require(delimiter.isNotEmpty()) { "Delimiter must not be empty" } - - var output = ByteArrayOutput(expectedMessageSize) - var matcherPosition = 0 - - collect { chunk -> - chunk.forEach { byte -> - output.writeByte(byte) - //matching current symbol in delimiter - if (byte == delimiter[matcherPosition]) { - matcherPosition++ - if (matcherPosition == delimiter.size) { - //full match achieved, sending result - emit(output.toByteArray()) - output = ByteArrayOutput(expectedMessageSize) - matcherPosition = 0 - } - } else if (matcherPosition > 0) { - //Reset matcher since full match not achieved - matcherPosition = 0 - } - } - } -} - -fun Port.delimitedInput(delimiter: ByteArray, expectedMessageSize: Int = 32) = - input().withDelimiter(delimiter, expectedMessageSize) +suspend fun Port.send(string: String) = send(string.encodeToByteArray()) \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/SynchronousPortHandler.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/SynchronousPortHandler.kt new file mode 100644 index 0000000..7d81bef --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/SynchronousPortHandler.kt @@ -0,0 +1,34 @@ +package hep.dataforge.control.ports + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +/** + * A port handler for synchronous (request-response) communication with a port. Only one request could be active at a time (others are suspended. + * The handler does not guarantee exclusive access to the port so the user mush ensure that no other controller handles port at the moment. + * + */ +class SynchronousPortHandler(val port: Port) { + private val mutex = Mutex() + + /** + * Send a single message and wait for the flow of respond messages. + */ + suspend fun respond(data: ByteArray, transform: suspend Flow.() -> R): R { + return mutex.withLock { + port.send(data) + transform(port.incoming()) + } + } +} + +/** + * Send request and read incoming data blocks until the delimiter is encountered + */ +suspend fun SynchronousPortHandler.respondWithDelimiter(data: ByteArray, delimiter: ByteArray): ByteArray { + return respond(data) { + withDelimiter(delimiter).first() + } +} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt new file mode 100644 index 0000000..a268326 --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt @@ -0,0 +1,38 @@ +package hep.dataforge.control.ports + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.io.ByteArrayOutput + +fun Flow.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow = flow { + require(delimiter.isNotEmpty()) { "Delimiter must not be empty" } + + var output = ByteArrayOutput(expectedMessageSize) + var matcherPosition = 0 + + collect { chunk -> + chunk.forEach { byte -> + output.writeByte(byte) + //matching current symbol in delimiter + if (byte == delimiter[matcherPosition]) { + matcherPosition++ + if (matcherPosition == delimiter.size) { + //full match achieved, sending result + emit(output.toByteArray()) + output = ByteArrayOutput(expectedMessageSize) + matcherPosition = 0 + } + } else if (matcherPosition > 0) { + //Reset matcher since full match not achieved + matcherPosition = 0 + } + } + } +} + +/** + * A flow of delimited phrases + */ +fun Port.delimitedIncoming(delimiter: ByteArray, expectedMessageSize: Int = 32) = + incoming().withDelimiter(delimiter, expectedMessageSize) diff --git a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt index 2ffb21a..36a60e0 100644 --- a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt +++ b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt @@ -58,7 +58,7 @@ class TcpPortTest { val port = TcpPort.open("localhost", 22188) val logJob = launch { - port.input().collect { + port.incoming().collect { println("Flow: ${it.decodeToString()}") } } @@ -83,7 +83,7 @@ class TcpPortTest { val port = KtorTcpPort.open("localhost", 22188) val logJob = launch { - port.input().collect { + port.incoming().collect { println("Flow: ${it.decodeToString()}") } }