From ff2c7ebbcc6ea6ff7f414e5e8268db80f4a8a8e0 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 5 Aug 2020 10:13:47 +0300 Subject: [PATCH] Fixed tcp port. Two versions of it --- .../hep/dataforge/control/ports/Port.kt | 7 +- .../dataforge/control/ports/KtorTcpPort.kt | 49 +++++ .../hep/dataforge/control/ports/TcpPort.kt | 65 +++---- .../kotlin/hep/dataforge/control/ports/nio.kt | 174 ------------------ .../dataforge/control/ports/TcpPortTest.kt | 45 +++-- 5 files changed, 111 insertions(+), 229 deletions(-) create mode 100644 dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt delete mode 100644 dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt index ebb6286..7c7f99b 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -32,17 +32,16 @@ abstract class Port(val scope: CoroutineScope) : Closeable { */ protected fun receive(data: ByteArray) { scope.launch { - logger.debug { "RECEIVE: ${data.decodeToString()}" } + logger.debug { "RECEIVED: ${data.decodeToString()}" } incoming.send(data) } } private val sendJob = scope.launch { - //The port scope should be organized in order to avoid threading problems for (data in outgoing) { try { write(data) - logger.debug { "SEND: ${data.decodeToString()}" } + logger.debug { "SENT: ${data.decodeToString()}" } } catch (ex: Exception) { if(ex is CancellationException) throw ex logger.error(ex) { "Error while writing data to the port" } @@ -60,6 +59,8 @@ abstract class Port(val scope: CoroutineScope) : Closeable { override fun close() { scope.cancel("The port is closed") + outgoing.close() + incoming.close() } } diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt new file mode 100644 index 0000000..eca8d3f --- /dev/null +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt @@ -0,0 +1,49 @@ +package hep.dataforge.control.ports + +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.network.sockets.openReadChannel +import io.ktor.network.sockets.openWriteChannel +import io.ktor.utils.io.consumeEachBufferRange +import io.ktor.utils.io.writeAvailable +import kotlinx.coroutines.* +import mu.KLogger +import mu.KotlinLogging +import java.net.InetSocketAddress + +class KtorTcpPort internal constructor( + scope: CoroutineScope, + val host: String, + val port: Int +) : Port(scope), AutoCloseable { + + override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") + + private val socket = scope.async { + aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port)) + } + + private val writeChannel = scope.async { + socket.await().openWriteChannel(true) + } + + private val listenerJob = scope.launch { + val input = socket.await().openReadChannel() + input.consumeEachBufferRange { buffer, last -> + val array = ByteArray(buffer.remaining()) + buffer.get(array) + receive(array) + isActive + } + } + + override suspend fun write(data: ByteArray) { + writeChannel.await().writeAvailable(data) + } + +} + +fun CoroutineScope.openKtorTcpPort(host: String, port: Int): TcpPort { + val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) + return TcpPort(scope, host, port) +} \ No newline at end of file diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt index 6573d2a..9e1d573 100644 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt @@ -1,16 +1,19 @@ package hep.dataforge.control.ports -import io.ktor.network.selector.ActorSelectorManager -import io.ktor.network.sockets.aSocket -import io.ktor.network.sockets.openReadChannel -import io.ktor.network.sockets.openWriteChannel -import io.ktor.utils.io.consumeEachBufferRange -import io.ktor.utils.io.writeAvailable import kotlinx.coroutines.* import mu.KLogger import mu.KotlinLogging import java.net.InetSocketAddress -import java.util.concurrent.Executors +import java.nio.ByteBuffer +import java.nio.channels.SocketChannel + +internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray { + rewind() + val response = ByteArray(limit) + get(response) + rewind() + return response +} class TcpPort internal constructor( scope: CoroutineScope, @@ -20,41 +23,39 @@ class TcpPort internal constructor( override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") - private val socket = scope.async { - aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port)) + private val futureChannel: Deferred = this.scope.async(Dispatchers.IO) { + SocketChannel.open(InetSocketAddress(host, port)) } - private val writeChannel = scope.async { - socket.await().openWriteChannel(true) - } + /** + * A handler to await port connection + */ + val startJob: Job get() = futureChannel - private val listenerJob = scope.launch { - val input = socket.await().openReadChannel() - input.consumeEachBufferRange { buffer, last -> - val array = ByteArray(buffer.remaining()) - buffer.get(array) - receive(array) - isActive + private val listenerJob = this.scope.launch { + val channel = futureChannel.await() + val buffer = ByteBuffer.allocate(1024) + while (isActive) { + try { + val num = channel.read(buffer) + if (num > 0) { + receive(buffer.readArray(num)) + } + if (num < 0) cancel("The input channel is exhausted") + } catch (ex: Exception) { + logger.error("Channel read error", ex) + delay(1000) + } } } override suspend fun write(data: ByteArray) { - writeChannel.await().writeAvailable(data) + futureChannel.await().write(ByteBuffer.wrap(data)) } - } fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort { - val executor = Executors.newSingleThreadExecutor { r -> - Thread(r).apply { - name = "port[tcp:$host:$port]" - priority = Thread.MAX_PRIORITY - } - } - val job = SupervisorJob(coroutineContext[Job]) - val scope = CoroutineScope(coroutineContext + executor.asCoroutineDispatcher() + job) - job.invokeOnCompletion { - executor.shutdown() - } + val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) return TcpPort(scope, host, port) + } \ No newline at end of file diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt deleted file mode 100644 index 07715df..0000000 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/nio.kt +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package hep.dataforge.control.ports - -import kotlinx.coroutines.CancellableContinuation -import kotlinx.coroutines.suspendCancellableCoroutine -import java.net.SocketAddress -import java.nio.ByteBuffer -import java.nio.channels.* -import java.util.concurrent.TimeUnit -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException - -///** -// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes. -// * This suspending function is cancellable. -// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function -// * *closes the underlying channel* and immediately resumes with [CancellationException]. -// */ -//suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine { cont -> -// lock(cont, asyncIOHandler()) -// closeOnCancel(cont) -//} -// -///** -// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes. -// * This suspending function is cancellable. -// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function -// * *closes the underlying channel* and immediately resumes with [CancellationException]. -// */ -//suspend fun AsynchronousFileChannel.aLock( -// position: Long, -// size: Long, -// shared: Boolean -//) = suspendCancellableCoroutine { cont -> -// lock(position, size, shared, cont, asyncIOHandler()) -// closeOnCancel(cont) -//} -// -///** -// * Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes. -// * This suspending function is cancellable. -// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function -// * *closes the underlying channel* and immediately resumes with [CancellationException]. -// */ -//suspend fun AsynchronousFileChannel.aRead( -// buf: ByteBuffer, -// position: Long -//) = suspendCancellableCoroutine { cont -> -// read(buf, position, cont, asyncIOHandler()) -// closeOnCancel(cont) -//} -// -///** -// * Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes. -// * This suspending function is cancellable. -// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function -// * *closes the underlying channel* and immediately resumes with [CancellationException]. -// */ -//suspend fun AsynchronousFileChannel.aWrite( -// buf: ByteBuffer, -// position: Long -//) = suspendCancellableCoroutine { cont -> -// write(buf, position, cont, asyncIOHandler()) -// closeOnCancel(cont) -//} - -/** - * Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes. - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * *closes the underlying channel* and immediately resumes with [CancellationException]. - */ -internal suspend fun AsynchronousServerSocketChannel.suspendAccept() = - suspendCancellableCoroutine { cont -> - accept(cont, asyncIOHandler()) - closeOnCancel(cont) - } - -/** - * Performs [AsynchronousSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes. - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * *closes the underlying channel* and immediately resumes with [CancellationException]. - */ -internal suspend fun AsynchronousSocketChannel.suspendConnect( - socketAddress: SocketAddress -) = suspendCancellableCoroutine { cont -> - connect(socketAddress, cont, AsyncVoidIOHandler) - closeOnCancel(cont) -} - -/** - * Performs [AsynchronousSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes. - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * *closes the underlying channel* and immediately resumes with [CancellationException]. - */ -internal suspend fun AsynchronousSocketChannel.suspendRead( - buf: ByteBuffer, - timeout: Long = 0L, - timeUnit: TimeUnit = TimeUnit.MILLISECONDS -) = suspendCancellableCoroutine { cont -> - read(buf, timeout, timeUnit, cont, asyncIOHandler()) - closeOnCancel(cont) -} - -/** - * Performs [AsynchronousSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes. - * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * *closes the underlying channel* and immediately resumes with [CancellationException]. - */ -internal suspend fun AsynchronousSocketChannel.suspendWrite( - buf: ByteBuffer, - timeout: Long = 0L, - timeUnit: TimeUnit = TimeUnit.MILLISECONDS -) = suspendCancellableCoroutine { cont -> - write(buf, timeout, timeUnit, cont, asyncIOHandler()) - closeOnCancel(cont) -} - -internal fun ByteBuffer.toArray(limit: Int = limit()): ByteArray { - rewind() - val response = ByteArray(limit) - get(response) - rewind() - return response -} - -// ---------------- private details ---------------- - -private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) { - cont.invokeOnCancellation { - try { - close() - } catch (ex: Throwable) { - ex.printStackTrace() - // Specification says that it is Ok to call it any time, but reality is different, - // so we have just to ignore exception - } - } -} - -@Suppress("UNCHECKED_CAST") -private fun asyncIOHandler(): CompletionHandler> = - AsyncIOHandlerAny as CompletionHandler> - -private object AsyncIOHandlerAny : CompletionHandler> { - override fun completed(result: Any, cont: CancellableContinuation) { - cont.resume(result) - } - - override fun failed(ex: Throwable, cont: CancellableContinuation) { - // just return if already cancelled and got an expected exception for that case - if (ex is AsynchronousCloseException && cont.isCancelled) return - cont.resumeWithException(ex) - } -} - -private object AsyncVoidIOHandler : CompletionHandler> { - override fun completed(result: Void?, cont: CancellableContinuation) { - cont.resume(Unit) - } - - override fun failed(ex: Throwable, cont: CancellableContinuation) { - // just return if already cancelled and got an expected exception for that case - if (ex is AsynchronousCloseException && cont.isCancelled) return - cont.resumeWithException(ex) - } -} - diff --git a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt index c788db1..41d97a2 100644 --- a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt +++ b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt @@ -18,25 +18,30 @@ fun CoroutineScope.launchEchoServer(port: Int): Job = launch { println("Started echo telnet server at ${server.localAddress}") while (isActive) { - val socket = server.accept() + val socket = try { + server.accept() + } catch (ex: Exception) { + server.close() + return@launch + } launch { println("Socket accepted: ${socket.remoteAddress}") - val input = socket.openReadChannel() - val output = socket.openWriteChannel(autoFlush = true) - try { - while (true) { + val input = socket.openReadChannel() + val output = socket.openWriteChannel(autoFlush = true) + + + while (isActive) { val line = input.readUTF8Line() - println("${socket.remoteAddress}: $line") - output.write("$line\r\n") - } - } catch (e: Throwable) { - if (e !is CancellationException) { - e.printStackTrace() + //println("${socket.remoteAddress}: $line") + output.write("[response] $line") } + } catch (ex: Exception) { + cancel() + } finally { socket.close() } } @@ -47,27 +52,27 @@ fun CoroutineScope.launchEchoServer(port: Int): Job = launch { class TcpPortTest { @Test fun testWithEchoServer() { - runBlocking { - coroutineScope { + try { + runBlocking{ val server = launchEchoServer(22188) val port = openTcpPort("localhost", 22188) - launch { + + val logJob = launch { port.flow().collect { println("Flow: ${it.decodeToString()}") } } - delay(100) + port.startJob.join() port.send("aaa\n") - delay(10) +// delay(20) port.send("ddd\n") delay(200) + cancel() } -// port.close() -// server.cancel() + } catch (ex: Exception) { + if (ex !is CancellationException) throw ex } - - } } \ No newline at end of file