From 53ffe49875d8d9b5e1aedf5eaafe566f9410d760 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 4 Aug 2020 16:13:53 +0300 Subject: [PATCH] Ktor-io TCP implementation --- dataforge-device-core/build.gradle.kts | 4 +- .../hep/dataforge/control/ports/Port.kt | 19 +- .../hep/dataforge/control/ports/TcpPort.kt | 122 +++++------- .../kotlin/hep/dataforge/control/ports/nio.kt | 174 ++++++++++++++++++ .../hep/dataforge/control/ports/PortIOTest.kt | 1 + .../dataforge/control/ports/TcpPortTest.kt | 73 ++++++++ demo/build.gradle.kts | 2 +- settings.gradle.kts | 2 +- 8 files changed, 305 insertions(+), 92 deletions(-) create mode 100644 dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt create mode 100644 dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt diff --git a/dataforge-device-core/build.gradle.kts b/dataforge-device-core/build.gradle.kts index 2d4930a..af4a5ab 100644 --- a/dataforge-device-core/build.gradle.kts +++ b/dataforge-device-core/build.gradle.kts @@ -17,17 +17,15 @@ kotlin { dependencies { api("hep.dataforge:dataforge-io:$dataforgeVersion") //implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3") - //api("io.github.microutils:kotlin-logging-common:1.8.3") } } jvmMain{ dependencies{ - //api("io.github.microutils:kotlin-logging:1.8.3") + api("io.ktor:ktor-network:1.3.2") } } jsMain{ dependencies{ - //api("io.github.microutils:kotlin-logging-js:1.8.3") } } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt index d0cf203..ebb6286 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -1,5 +1,6 @@ package hep.dataforge.control.ports +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel @@ -13,7 +14,7 @@ import kotlinx.io.ByteArrayOutput import kotlinx.io.Closeable import mu.KLogger -abstract class Port : Closeable, CoroutineScope { +abstract class Port(val scope: CoroutineScope) : Closeable { abstract val logger: KLogger @@ -24,25 +25,27 @@ abstract class Port : Closeable, CoroutineScope { /** * Internal method to synchronously send data */ - protected abstract fun sendInternal(data: ByteArray) + protected abstract suspend fun write(data: ByteArray) /** * Internal method to receive data synchronously */ protected fun receive(data: ByteArray) { - launch { + scope.launch { + logger.debug { "RECEIVE: ${data.decodeToString()}" } incoming.send(data) } } - private val sendJob = launch { - //using special dispatcher to avoid threading problems + private val sendJob = scope.launch { + //The port scope should be organized in order to avoid threading problems for (data in outgoing) { try { - sendInternal(data) + write(data) logger.debug { "SEND: ${data.decodeToString()}" } } catch (ex: Exception) { - logger.error(ex) { "Error while sending data" } + if(ex is CancellationException) throw ex + logger.error(ex) { "Error while writing data to the port" } } } } @@ -56,7 +59,7 @@ abstract class Port : Closeable, CoroutineScope { } override fun close() { - cancel("The port is closed") + scope.cancel("The port is closed") } } diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt index 81684cc..6573d2a 100644 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt @@ -1,96 +1,60 @@ package hep.dataforge.control.ports +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.network.sockets.openReadChannel +import io.ktor.network.sockets.openWriteChannel +import io.ktor.utils.io.consumeEachBufferRange +import io.ktor.utils.io.writeAvailable import kotlinx.coroutines.* import mu.KLogger import mu.KotlinLogging import java.net.InetSocketAddress -import java.nio.ByteBuffer -import java.nio.channels.AsynchronousCloseException -import java.nio.channels.AsynchronousSocketChannel -import java.nio.channels.CompletionHandler import java.util.concurrent.Executors -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -@Suppress("UNCHECKED_CAST") -private fun asyncIOHandler(): CompletionHandler> = - object : CompletionHandler> { - override fun completed(result: T, cont: CancellableContinuation) { - cont.resume(result) - } - - override fun failed(ex: Throwable, cont: CancellableContinuation) { - // just return if already cancelled and got an expected exception for that case - if (ex is AsynchronousCloseException && cont.isCancelled) return - cont.resumeWithException(ex) - } - } - -suspend fun AsynchronousSocketChannel.readSuspended( - buf: ByteBuffer -) = suspendCancellableCoroutine { cont -> - read(buf, cont, asyncIOHandler()) - cont.invokeOnCancellation { - try { - close() - } catch (ex: Throwable) { - // Specification says that it is Ok to call it any time, but reality is different, - // so we have just to ignore exception - } - } -} - - -private fun ByteBuffer.toArray(limit: Int = limit()): ByteArray{ - rewind() - val response = ByteArray(limit) - get(response) - rewind() - return response -} - - -class TcpPort( - parentScope: CoroutineScope, - val ip: String, +class TcpPort internal constructor( + scope: CoroutineScope, + val host: String, val port: Int -) : Port() { +) : Port(scope), AutoCloseable { - override val logger: KLogger = KotlinLogging.logger("[tcp]$ip:$port") + override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") - private val executor = Executors.newSingleThreadExecutor { r -> + private val socket = scope.async { + aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port)) + } + + private val writeChannel = scope.async { + socket.await().openWriteChannel(true) + } + + private val listenerJob = scope.launch { + val input = socket.await().openReadChannel() + input.consumeEachBufferRange { buffer, last -> + val array = ByteArray(buffer.remaining()) + buffer.get(array) + receive(array) + isActive + } + } + + override suspend fun write(data: ByteArray) { + writeChannel.await().writeAvailable(data) + } + +} + +fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort { + val executor = Executors.newSingleThreadExecutor { r -> Thread(r).apply { - name = "[tcp]$ip:$port" + name = "port[tcp:$host:$port]" priority = Thread.MAX_PRIORITY } } - override val coroutineContext: CoroutineContext = parentScope.coroutineContext + executor.asCoroutineDispatcher() - - private var socket: AsynchronousSocketChannel = openSocket() - - private fun openSocket()= AsynchronousSocketChannel.open().bind(InetSocketAddress(ip, port)) - - private val listenerJob = launch { - val buffer = ByteBuffer.allocate(1024) - while (isActive) { - try { - val num = socket.readSuspended(buffer) - if (num > 0) { - receive(buffer.toArray(num)) - } - } catch (ex: Exception) { - logger.error("Channel read error", ex) - delay(100) - logger.info("Reconnecting") - socket = openSocket() - } - } + val job = SupervisorJob(coroutineContext[Job]) + val scope = CoroutineScope(coroutineContext + executor.asCoroutineDispatcher() + job) + job.invokeOnCompletion { + executor.shutdown() } - - override fun sendInternal(data: ByteArray) { - if (!socket.isOpen) socket = openSocket() - socket.write(ByteBuffer.wrap(data)) - } - + return TcpPort(scope, host, port) } \ No newline at end of file diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt new file mode 100644 index 0000000..07715df --- /dev/null +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt @@ -0,0 +1,174 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package hep.dataforge.control.ports + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.suspendCancellableCoroutine +import java.net.SocketAddress +import java.nio.ByteBuffer +import java.nio.channels.* +import java.util.concurrent.TimeUnit +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +///** +// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes. +// * This suspending function is cancellable. +// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function +// * *closes the underlying channel* and immediately resumes with [CancellationException]. +// */ +//suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine { cont -> +// lock(cont, asyncIOHandler()) +// closeOnCancel(cont) +//} +// +///** +// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes. +// * This suspending function is cancellable. +// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function +// * *closes the underlying channel* and immediately resumes with [CancellationException]. +// */ +//suspend fun AsynchronousFileChannel.aLock( +// position: Long, +// size: Long, +// shared: Boolean +//) = suspendCancellableCoroutine { cont -> +// lock(position, size, shared, cont, asyncIOHandler()) +// closeOnCancel(cont) +//} +// +///** +// * Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes. +// * This suspending function is cancellable. +// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function +// * *closes the underlying channel* and immediately resumes with [CancellationException]. +// */ +//suspend fun AsynchronousFileChannel.aRead( +// buf: ByteBuffer, +// position: Long +//) = suspendCancellableCoroutine { cont -> +// read(buf, position, cont, asyncIOHandler()) +// closeOnCancel(cont) +//} +// +///** +// * Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes. +// * This suspending function is cancellable. +// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function +// * *closes the underlying channel* and immediately resumes with [CancellationException]. +// */ +//suspend fun AsynchronousFileChannel.aWrite( +// buf: ByteBuffer, +// position: Long +//) = suspendCancellableCoroutine { cont -> +// write(buf, position, cont, asyncIOHandler()) +// closeOnCancel(cont) +//} + +/** + * Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes. + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * *closes the underlying channel* and immediately resumes with [CancellationException]. + */ +internal suspend fun AsynchronousServerSocketChannel.suspendAccept() = + suspendCancellableCoroutine { cont -> + accept(cont, asyncIOHandler()) + closeOnCancel(cont) + } + +/** + * Performs [AsynchronousSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes. + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * *closes the underlying channel* and immediately resumes with [CancellationException]. + */ +internal suspend fun AsynchronousSocketChannel.suspendConnect( + socketAddress: SocketAddress +) = suspendCancellableCoroutine { cont -> + connect(socketAddress, cont, AsyncVoidIOHandler) + closeOnCancel(cont) +} + +/** + * Performs [AsynchronousSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes. + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * *closes the underlying channel* and immediately resumes with [CancellationException]. + */ +internal suspend fun AsynchronousSocketChannel.suspendRead( + buf: ByteBuffer, + timeout: Long = 0L, + timeUnit: TimeUnit = TimeUnit.MILLISECONDS +) = suspendCancellableCoroutine { cont -> + read(buf, timeout, timeUnit, cont, asyncIOHandler()) + closeOnCancel(cont) +} + +/** + * Performs [AsynchronousSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes. + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * *closes the underlying channel* and immediately resumes with [CancellationException]. + */ +internal suspend fun AsynchronousSocketChannel.suspendWrite( + buf: ByteBuffer, + timeout: Long = 0L, + timeUnit: TimeUnit = TimeUnit.MILLISECONDS +) = suspendCancellableCoroutine { cont -> + write(buf, timeout, timeUnit, cont, asyncIOHandler()) + closeOnCancel(cont) +} + +internal fun ByteBuffer.toArray(limit: Int = limit()): ByteArray { + rewind() + val response = ByteArray(limit) + get(response) + rewind() + return response +} + +// ---------------- private details ---------------- + +private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) { + cont.invokeOnCancellation { + try { + close() + } catch (ex: Throwable) { + ex.printStackTrace() + // Specification says that it is Ok to call it any time, but reality is different, + // so we have just to ignore exception + } + } +} + +@Suppress("UNCHECKED_CAST") +private fun asyncIOHandler(): CompletionHandler> = + AsyncIOHandlerAny as CompletionHandler> + +private object AsyncIOHandlerAny : CompletionHandler> { + override fun completed(result: Any, cont: CancellableContinuation) { + cont.resume(result) + } + + override fun failed(ex: Throwable, cont: CancellableContinuation) { + // just return if already cancelled and got an expected exception for that case + if (ex is AsynchronousCloseException && cont.isCancelled) return + cont.resumeWithException(ex) + } +} + +private object AsyncVoidIOHandler : CompletionHandler> { + override fun completed(result: Void?, cont: CancellableContinuation) { + cont.resume(Unit) + } + + override fun failed(ex: Throwable, cont: CancellableContinuation) { + // just return if already cancelled and got an expected exception for that case + if (ex is AsynchronousCloseException && cont.isCancelled) return + cont.resumeWithException(ex) + } +} + diff --git a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt index 94a0e68..16c4da5 100644 --- a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt +++ b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/PortIOTest.kt @@ -16,6 +16,7 @@ internal class PortIOTest{ val chunked = flow.withDelimiter("?:".encodeToByteArray()) runBlocking { val result = chunked.toList() + assertEquals(3, result.size) assertEquals("bb?bddd?:",result[0].decodeToString()) assertEquals("defgb?:", result[1].decodeToString()) assertEquals("ddf34fb?:", result[2].decodeToString()) diff --git a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt new file mode 100644 index 0000000..c788db1 --- /dev/null +++ b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt @@ -0,0 +1,73 @@ +package hep.dataforge.control.ports + +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.network.sockets.openReadChannel +import io.ktor.network.sockets.openWriteChannel +import io.ktor.util.KtorExperimentalAPI +import io.ktor.util.cio.write +import io.ktor.utils.io.readUTF8Line +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.collect +import org.junit.jupiter.api.Test +import java.net.InetSocketAddress + +@OptIn(KtorExperimentalAPI::class) +fun CoroutineScope.launchEchoServer(port: Int): Job = launch { + val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port)) + println("Started echo telnet server at ${server.localAddress}") + + while (isActive) { + val socket = server.accept() + + launch { + println("Socket accepted: ${socket.remoteAddress}") + + val input = socket.openReadChannel() + val output = socket.openWriteChannel(autoFlush = true) + + try { + while (true) { + val line = input.readUTF8Line() + + println("${socket.remoteAddress}: $line") + output.write("$line\r\n") + } + } catch (e: Throwable) { + if (e !is CancellationException) { + e.printStackTrace() + } + socket.close() + } + } + } + +} + +class TcpPortTest { + @Test + fun testWithEchoServer() { + runBlocking { + coroutineScope { + val server = launchEchoServer(22188) + val port = openTcpPort("localhost", 22188) + launch { + port.flow().collect { + println("Flow: ${it.decodeToString()}") + } + } + delay(100) + port.send("aaa\n") + delay(10) + port.send("ddd\n") + + delay(200) + cancel() + } +// port.close() +// server.cancel() + } + + + } +} \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 62a250e..9da83fc 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -1,5 +1,5 @@ plugins { - kotlin("jvm") version "1.3.72" + kotlin("jvm") id("org.openjfx.javafxplugin") version "0.0.9" application } diff --git a/settings.gradle.kts b/settings.gradle.kts index 7233629..d4b3ffd 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -33,7 +33,7 @@ pluginManagement { } } -rootProject.name = "dataforge-device" +rootProject.name = "dataforge-control" include( ":dataforge-device-core",