SynchronousPortHandler

This commit is contained in:
Alexander Nozik 2020-08-27 21:04:11 +03:00
parent fed5d55512
commit 08982dcd3e
4 changed files with 78 additions and 39 deletions

View File

@ -4,13 +4,9 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.io.ByteArrayOutput
import kotlinx.io.Closeable import kotlinx.io.Closeable
import mu.KLogger import mu.KLogger
@ -20,7 +16,7 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
private val outgoing = Channel<ByteArray>(100) private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(Channel.CONFLATED) private val incoming = Channel<ByteArray>(Channel.CONFLATED)
val receiveChannel: ReceiveChannel<ByteArray> get() = incoming //val receiveChannel: ReceiveChannel<ByteArray> get() = incoming
/** /**
* Internal method to synchronously send data * Internal method to synchronously send data
@ -56,9 +52,9 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
/** /**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases. * Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* In order to form phrases some condition should used on top of it. * In order to form phrases some condition should used on top of it.
* For example [delimitedInput] generates phrases with fixed delimiter. * For example [delimitedIncoming] generates phrases with fixed delimiter.
*/ */
fun input(): Flow<ByteArray> { fun incoming(): Flow<ByteArray> {
return incoming.receiveAsFlow() return incoming.receiveAsFlow()
} }
@ -73,32 +69,3 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
* Send UTF-8 encoded string * Send UTF-8 encoded string
*/ */
suspend fun Port.send(string: String) = send(string.encodeToByteArray()) suspend fun Port.send(string: String) = send(string.encodeToByteArray())
fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> = flow {
require(delimiter.isNotEmpty()) { "Delimiter must not be empty" }
var output = ByteArrayOutput(expectedMessageSize)
var matcherPosition = 0
collect { chunk ->
chunk.forEach { byte ->
output.writeByte(byte)
//matching current symbol in delimiter
if (byte == delimiter[matcherPosition]) {
matcherPosition++
if (matcherPosition == delimiter.size) {
//full match achieved, sending result
emit(output.toByteArray())
output = ByteArrayOutput(expectedMessageSize)
matcherPosition = 0
}
} else if (matcherPosition > 0) {
//Reset matcher since full match not achieved
matcherPosition = 0
}
}
}
}
fun Port.delimitedInput(delimiter: ByteArray, expectedMessageSize: Int = 32) =
input().withDelimiter(delimiter, expectedMessageSize)

View File

@ -0,0 +1,34 @@
package hep.dataforge.control.ports
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* A port handler for synchronous (request-response) communication with a port. Only one request could be active at a time (others are suspended.
* The handler does not guarantee exclusive access to the port so the user mush ensure that no other controller handles port at the moment.
*
*/
class SynchronousPortHandler(val port: Port) {
private val mutex = Mutex()
/**
* Send a single message and wait for the flow of respond messages.
*/
suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R {
return mutex.withLock {
port.send(data)
transform(port.incoming())
}
}
}
/**
* Send request and read incoming data blocks until the delimiter is encountered
*/
suspend fun SynchronousPortHandler.respondWithDelimiter(data: ByteArray, delimiter: ByteArray): ByteArray {
return respond(data) {
withDelimiter(delimiter).first()
}
}

View File

@ -0,0 +1,38 @@
package hep.dataforge.control.ports
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.io.ByteArrayOutput
fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> = flow {
require(delimiter.isNotEmpty()) { "Delimiter must not be empty" }
var output = ByteArrayOutput(expectedMessageSize)
var matcherPosition = 0
collect { chunk ->
chunk.forEach { byte ->
output.writeByte(byte)
//matching current symbol in delimiter
if (byte == delimiter[matcherPosition]) {
matcherPosition++
if (matcherPosition == delimiter.size) {
//full match achieved, sending result
emit(output.toByteArray())
output = ByteArrayOutput(expectedMessageSize)
matcherPosition = 0
}
} else if (matcherPosition > 0) {
//Reset matcher since full match not achieved
matcherPosition = 0
}
}
}
}
/**
* A flow of delimited phrases
*/
fun Port.delimitedIncoming(delimiter: ByteArray, expectedMessageSize: Int = 32) =
incoming().withDelimiter(delimiter, expectedMessageSize)

View File

@ -58,7 +58,7 @@ class TcpPortTest {
val port = TcpPort.open("localhost", 22188) val port = TcpPort.open("localhost", 22188)
val logJob = launch { val logJob = launch {
port.input().collect { port.incoming().collect {
println("Flow: ${it.decodeToString()}") println("Flow: ${it.decodeToString()}")
} }
} }
@ -83,7 +83,7 @@ class TcpPortTest {
val port = KtorTcpPort.open("localhost", 22188) val port = KtorTcpPort.open("localhost", 22188)
val logJob = launch { val logJob = launch {
port.input().collect { port.incoming().collect {
println("Flow: ${it.decodeToString()}") println("Flow: ${it.decodeToString()}")
} }
} }