From 2f8584829fbaa6669778fa3d0b61964f99e53b9f Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 2 Aug 2020 22:39:11 +0300 Subject: [PATCH] Ports --- dataforge-device-core/build.gradle.kts | 11 +++ .../hep/dataforge/control/ports/Port.kt | 92 ++++++++++++++++++ .../hep/dataforge/control/ports/TcpPort.kt | 96 +++++++++++++++++++ .../hep/dataforge/control/ports/PortIOTest.kt | 24 +++++ settings.gradle.kts | 2 +- 5 files changed, 224 insertions(+), 1 deletion(-) create mode 100644 dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt create mode 100644 dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt create mode 100644 dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt diff --git a/dataforge-device-core/build.gradle.kts b/dataforge-device-core/build.gradle.kts index a809fd7..2d4930a 100644 --- a/dataforge-device-core/build.gradle.kts +++ b/dataforge-device-core/build.gradle.kts @@ -17,6 +17,17 @@ kotlin { dependencies { api("hep.dataforge:dataforge-io:$dataforgeVersion") //implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3") + //api("io.github.microutils:kotlin-logging-common:1.8.3") + } + } + jvmMain{ + dependencies{ + //api("io.github.microutils:kotlin-logging:1.8.3") + } + } + jsMain{ + dependencies{ + //api("io.github.microutils:kotlin-logging-js:1.8.3") } } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt new file mode 100644 index 0000000..d0cf203 --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -0,0 +1,92 @@ +package hep.dataforge.control.ports + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.launch +import kotlinx.io.ByteArrayOutput +import kotlinx.io.Closeable +import mu.KLogger + +abstract class Port : Closeable, CoroutineScope { + + abstract val logger: KLogger + + private val outgoing = Channel(100) + private val incoming = Channel(Channel.CONFLATED) + val receiveChannel: ReceiveChannel get() = incoming + + /** + * Internal method to synchronously send data + */ + protected abstract fun sendInternal(data: ByteArray) + + /** + * Internal method to receive data synchronously + */ + protected fun receive(data: ByteArray) { + launch { + incoming.send(data) + } + } + + private val sendJob = launch { + //using special dispatcher to avoid threading problems + for (data in outgoing) { + try { + sendInternal(data) + logger.debug { "SEND: ${data.decodeToString()}" } + } catch (ex: Exception) { + logger.error(ex) { "Error while sending data" } + } + } + } + + suspend fun send(data: ByteArray) { + outgoing.send(data) + } + + fun flow(): Flow { + return incoming.receiveAsFlow() + } + + override fun close() { + cancel("The port is closed") + } +} + +/** + * Send UTF-8 encoded string + */ +suspend fun Port.send(string: String) = send(string.encodeToByteArray()) + +fun Flow.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow = flow { + require(delimiter.isNotEmpty()) { "Delimiter must not be empty" } + + var output = ByteArrayOutput(expectedMessageSize) + var matcherPosition = 0 + + collect { chunk -> + chunk.forEach { byte -> + output.writeByte(byte) + //matching current symbol in delimiter + if (byte == delimiter[matcherPosition]) { + matcherPosition++ + if (matcherPosition == delimiter.size) { + //full match achieved, sending result + emit(output.toByteArray()) + output = ByteArrayOutput(expectedMessageSize) + matcherPosition = 0 + } + } else if (matcherPosition > 0) { + //Reset matcher since full match not achieved + matcherPosition = 0 + } + } + } +} diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt new file mode 100644 index 0000000..81684cc --- /dev/null +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt @@ -0,0 +1,96 @@ +package hep.dataforge.control.ports + +import kotlinx.coroutines.* +import mu.KLogger +import mu.KotlinLogging +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.AsynchronousCloseException +import java.nio.channels.AsynchronousSocketChannel +import java.nio.channels.CompletionHandler +import java.util.concurrent.Executors +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +@Suppress("UNCHECKED_CAST") +private fun asyncIOHandler(): CompletionHandler> = + object : CompletionHandler> { + override fun completed(result: T, cont: CancellableContinuation) { + cont.resume(result) + } + + override fun failed(ex: Throwable, cont: CancellableContinuation) { + // just return if already cancelled and got an expected exception for that case + if (ex is AsynchronousCloseException && cont.isCancelled) return + cont.resumeWithException(ex) + } + } + +suspend fun AsynchronousSocketChannel.readSuspended( + buf: ByteBuffer +) = suspendCancellableCoroutine { cont -> + read(buf, cont, asyncIOHandler()) + cont.invokeOnCancellation { + try { + close() + } catch (ex: Throwable) { + // Specification says that it is Ok to call it any time, but reality is different, + // so we have just to ignore exception + } + } +} + + +private fun ByteBuffer.toArray(limit: Int = limit()): ByteArray{ + rewind() + val response = ByteArray(limit) + get(response) + rewind() + return response +} + + +class TcpPort( + parentScope: CoroutineScope, + val ip: String, + val port: Int +) : Port() { + + override val logger: KLogger = KotlinLogging.logger("[tcp]$ip:$port") + + private val executor = Executors.newSingleThreadExecutor { r -> + Thread(r).apply { + name = "[tcp]$ip:$port" + priority = Thread.MAX_PRIORITY + } + } + override val coroutineContext: CoroutineContext = parentScope.coroutineContext + executor.asCoroutineDispatcher() + + private var socket: AsynchronousSocketChannel = openSocket() + + private fun openSocket()= AsynchronousSocketChannel.open().bind(InetSocketAddress(ip, port)) + + private val listenerJob = launch { + val buffer = ByteBuffer.allocate(1024) + while (isActive) { + try { + val num = socket.readSuspended(buffer) + if (num > 0) { + receive(buffer.toArray(num)) + } + } catch (ex: Exception) { + logger.error("Channel read error", ex) + delay(100) + logger.info("Reconnecting") + socket = openSocket() + } + } + } + + override fun sendInternal(data: ByteArray) { + if (!socket.isOpen) socket = openSocket() + socket.write(ByteBuffer.wrap(data)) + } + +} \ No newline at end of file diff --git a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt new file mode 100644 index 0000000..94a0e68 --- /dev/null +++ b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt @@ -0,0 +1,24 @@ +package hep.dataforge.control.ports + +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals + + +internal class PortIOTest{ + + @Test + fun testDelimiteredByteArrayFlow(){ + val flow = flowOf("bb?b","ddd?",":defgb?:ddf","34fb?:--").map { it.encodeToByteArray() } + val chunked = flow.withDelimiter("?:".encodeToByteArray()) + runBlocking { + val result = chunked.toList() + assertEquals("bb?bddd?:",result[0].decodeToString()) + assertEquals("defgb?:", result[1].decodeToString()) + assertEquals("ddf34fb?:", result[2].decodeToString()) + } + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 2cc6c04..7233629 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,6 @@ pluginManagement { val kotlinVersion = "1.3.72" - val toolsVersion = "0.5.0" + val toolsVersion = "0.5.2" repositories { mavenLocal()