Ktor-io TCP implementation

This commit is contained in:
Alexander Nozik 2020-08-04 16:13:53 +03:00
parent 2f8584829f
commit 53ffe49875
8 changed files with 305 additions and 92 deletions

View File

@ -17,17 +17,15 @@ kotlin {
dependencies { dependencies {
api("hep.dataforge:dataforge-io:$dataforgeVersion") api("hep.dataforge:dataforge-io:$dataforgeVersion")
//implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3") //implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3")
//api("io.github.microutils:kotlin-logging-common:1.8.3")
} }
} }
jvmMain{ jvmMain{
dependencies{ dependencies{
//api("io.github.microutils:kotlin-logging:1.8.3") api("io.ktor:ktor-network:1.3.2")
} }
} }
jsMain{ jsMain{
dependencies{ dependencies{
//api("io.github.microutils:kotlin-logging-js:1.8.3")
} }
} }
} }

View File

@ -1,5 +1,6 @@
package hep.dataforge.control.ports package hep.dataforge.control.ports
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
@ -13,7 +14,7 @@ import kotlinx.io.ByteArrayOutput
import kotlinx.io.Closeable import kotlinx.io.Closeable
import mu.KLogger import mu.KLogger
abstract class Port : Closeable, CoroutineScope { abstract class Port(val scope: CoroutineScope) : Closeable {
abstract val logger: KLogger abstract val logger: KLogger
@ -24,25 +25,27 @@ abstract class Port : Closeable, CoroutineScope {
/** /**
* Internal method to synchronously send data * Internal method to synchronously send data
*/ */
protected abstract fun sendInternal(data: ByteArray) protected abstract suspend fun write(data: ByteArray)
/** /**
* Internal method to receive data synchronously * Internal method to receive data synchronously
*/ */
protected fun receive(data: ByteArray) { protected fun receive(data: ByteArray) {
launch { scope.launch {
logger.debug { "RECEIVE: ${data.decodeToString()}" }
incoming.send(data) incoming.send(data)
} }
} }
private val sendJob = launch { private val sendJob = scope.launch {
//using special dispatcher to avoid threading problems //The port scope should be organized in order to avoid threading problems
for (data in outgoing) { for (data in outgoing) {
try { try {
sendInternal(data) write(data)
logger.debug { "SEND: ${data.decodeToString()}" } logger.debug { "SEND: ${data.decodeToString()}" }
} catch (ex: Exception) { } catch (ex: Exception) {
logger.error(ex) { "Error while sending data" } if(ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" }
} }
} }
} }
@ -56,7 +59,7 @@ abstract class Port : Closeable, CoroutineScope {
} }
override fun close() { override fun close() {
cancel("The port is closed") scope.cancel("The port is closed")
} }
} }

View File

@ -1,96 +1,60 @@
package hep.dataforge.control.ports package hep.dataforge.control.ports
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.* import kotlinx.coroutines.*
import mu.KLogger import mu.KLogger
import mu.KotlinLogging import mu.KotlinLogging
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousCloseException
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import java.util.concurrent.Executors import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
@Suppress("UNCHECKED_CAST") class TcpPort internal constructor(
private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> = scope: CoroutineScope,
object : CompletionHandler<T, CancellableContinuation<T>> { val host: String,
override fun completed(result: T, cont: CancellableContinuation<T>) {
cont.resume(result)
}
override fun failed(ex: Throwable, cont: CancellableContinuation<T>) {
// just return if already cancelled and got an expected exception for that case
if (ex is AsynchronousCloseException && cont.isCancelled) return
cont.resumeWithException(ex)
}
}
suspend fun AsynchronousSocketChannel.readSuspended(
buf: ByteBuffer
) = suspendCancellableCoroutine<Int> { cont ->
read(buf, cont, asyncIOHandler<Int>())
cont.invokeOnCancellation {
try {
close()
} catch (ex: Throwable) {
// Specification says that it is Ok to call it any time, but reality is different,
// so we have just to ignore exception
}
}
}
private fun ByteBuffer.toArray(limit: Int = limit()): ByteArray{
rewind()
val response = ByteArray(limit)
get(response)
rewind()
return response
}
class TcpPort(
parentScope: CoroutineScope,
val ip: String,
val port: Int val port: Int
) : Port() { ) : Port(scope), AutoCloseable {
override val logger: KLogger = KotlinLogging.logger("[tcp]$ip:$port") override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]")
private val executor = Executors.newSingleThreadExecutor { r -> private val socket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port))
}
private val writeChannel = scope.async {
socket.await().openWriteChannel(true)
}
private val listenerJob = scope.launch {
val input = socket.await().openReadChannel()
input.consumeEachBufferRange { buffer, last ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
isActive
}
}
override suspend fun write(data: ByteArray) {
writeChannel.await().writeAvailable(data)
}
}
fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort {
val executor = Executors.newSingleThreadExecutor { r ->
Thread(r).apply { Thread(r).apply {
name = "[tcp]$ip:$port" name = "port[tcp:$host:$port]"
priority = Thread.MAX_PRIORITY priority = Thread.MAX_PRIORITY
} }
} }
override val coroutineContext: CoroutineContext = parentScope.coroutineContext + executor.asCoroutineDispatcher() val job = SupervisorJob(coroutineContext[Job])
val scope = CoroutineScope(coroutineContext + executor.asCoroutineDispatcher() + job)
private var socket: AsynchronousSocketChannel = openSocket() job.invokeOnCompletion {
executor.shutdown()
private fun openSocket()= AsynchronousSocketChannel.open().bind(InetSocketAddress(ip, port))
private val listenerJob = launch {
val buffer = ByteBuffer.allocate(1024)
while (isActive) {
try {
val num = socket.readSuspended(buffer)
if (num > 0) {
receive(buffer.toArray(num))
}
} catch (ex: Exception) {
logger.error("Channel read error", ex)
delay(100)
logger.info("Reconnecting")
socket = openSocket()
}
}
} }
return TcpPort(scope, host, port)
override fun sendInternal(data: ByteArray) {
if (!socket.isOpen) socket = openSocket()
socket.write(ByteBuffer.wrap(data))
}
} }

View File

@ -0,0 +1,174 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package hep.dataforge.control.ports
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine
import java.net.SocketAddress
import java.nio.ByteBuffer
import java.nio.channels.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
///**
// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
// * This suspending function is cancellable.
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
// * *closes the underlying channel* and immediately resumes with [CancellationException].
// */
//suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine<FileLock> { cont ->
// lock(cont, asyncIOHandler())
// closeOnCancel(cont)
//}
//
///**
// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
// * This suspending function is cancellable.
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
// * *closes the underlying channel* and immediately resumes with [CancellationException].
// */
//suspend fun AsynchronousFileChannel.aLock(
// position: Long,
// size: Long,
// shared: Boolean
//) = suspendCancellableCoroutine<FileLock> { cont ->
// lock(position, size, shared, cont, asyncIOHandler())
// closeOnCancel(cont)
//}
//
///**
// * Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes.
// * This suspending function is cancellable.
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
// * *closes the underlying channel* and immediately resumes with [CancellationException].
// */
//suspend fun AsynchronousFileChannel.aRead(
// buf: ByteBuffer,
// position: Long
//) = suspendCancellableCoroutine<Int> { cont ->
// read(buf, position, cont, asyncIOHandler())
// closeOnCancel(cont)
//}
//
///**
// * Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes.
// * This suspending function is cancellable.
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
// * *closes the underlying channel* and immediately resumes with [CancellationException].
// */
//suspend fun AsynchronousFileChannel.aWrite(
// buf: ByteBuffer,
// position: Long
//) = suspendCancellableCoroutine<Int> { cont ->
// write(buf, position, cont, asyncIOHandler())
// closeOnCancel(cont)
//}
/**
* Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* *closes the underlying channel* and immediately resumes with [CancellationException].
*/
internal suspend fun AsynchronousServerSocketChannel.suspendAccept() =
suspendCancellableCoroutine<AsynchronousSocketChannel> { cont ->
accept(cont, asyncIOHandler())
closeOnCancel(cont)
}
/**
* Performs [AsynchronousSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* *closes the underlying channel* and immediately resumes with [CancellationException].
*/
internal suspend fun AsynchronousSocketChannel.suspendConnect(
socketAddress: SocketAddress
) = suspendCancellableCoroutine<Unit> { cont ->
connect(socketAddress, cont, AsyncVoidIOHandler)
closeOnCancel(cont)
}
/**
* Performs [AsynchronousSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* *closes the underlying channel* and immediately resumes with [CancellationException].
*/
internal suspend fun AsynchronousSocketChannel.suspendRead(
buf: ByteBuffer,
timeout: Long = 0L,
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
) = suspendCancellableCoroutine<Int> { cont ->
read(buf, timeout, timeUnit, cont, asyncIOHandler())
closeOnCancel(cont)
}
/**
* Performs [AsynchronousSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* *closes the underlying channel* and immediately resumes with [CancellationException].
*/
internal suspend fun AsynchronousSocketChannel.suspendWrite(
buf: ByteBuffer,
timeout: Long = 0L,
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
) = suspendCancellableCoroutine<Int> { cont ->
write(buf, timeout, timeUnit, cont, asyncIOHandler())
closeOnCancel(cont)
}
internal fun ByteBuffer.toArray(limit: Int = limit()): ByteArray {
rewind()
val response = ByteArray(limit)
get(response)
rewind()
return response
}
// ---------------- private details ----------------
private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
cont.invokeOnCancellation {
try {
close()
} catch (ex: Throwable) {
ex.printStackTrace()
// Specification says that it is Ok to call it any time, but reality is different,
// so we have just to ignore exception
}
}
}
@Suppress("UNCHECKED_CAST")
private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> =
AsyncIOHandlerAny as CompletionHandler<T, CancellableContinuation<T>>
private object AsyncIOHandlerAny : CompletionHandler<Any, CancellableContinuation<Any>> {
override fun completed(result: Any, cont: CancellableContinuation<Any>) {
cont.resume(result)
}
override fun failed(ex: Throwable, cont: CancellableContinuation<Any>) {
// just return if already cancelled and got an expected exception for that case
if (ex is AsynchronousCloseException && cont.isCancelled) return
cont.resumeWithException(ex)
}
}
private object AsyncVoidIOHandler : CompletionHandler<Void?, CancellableContinuation<Unit>> {
override fun completed(result: Void?, cont: CancellableContinuation<Unit>) {
cont.resume(Unit)
}
override fun failed(ex: Throwable, cont: CancellableContinuation<Unit>) {
// just return if already cancelled and got an expected exception for that case
if (ex is AsynchronousCloseException && cont.isCancelled) return
cont.resumeWithException(ex)
}
}

View File

@ -16,6 +16,7 @@ internal class PortIOTest{
val chunked = flow.withDelimiter("?:".encodeToByteArray()) val chunked = flow.withDelimiter("?:".encodeToByteArray())
runBlocking { runBlocking {
val result = chunked.toList() val result = chunked.toList()
assertEquals(3, result.size)
assertEquals("bb?bddd?:",result[0].decodeToString()) assertEquals("bb?bddd?:",result[0].decodeToString())
assertEquals("defgb?:", result[1].decodeToString()) assertEquals("defgb?:", result[1].decodeToString())
assertEquals("ddf34fb?:", result[2].decodeToString()) assertEquals("ddf34fb?:", result[2].decodeToString())

View File

@ -0,0 +1,73 @@
package hep.dataforge.control.ports
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.cio.write
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import org.junit.jupiter.api.Test
import java.net.InetSocketAddress
@OptIn(KtorExperimentalAPI::class)
fun CoroutineScope.launchEchoServer(port: Int): Job = launch {
val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port))
println("Started echo telnet server at ${server.localAddress}")
while (isActive) {
val socket = server.accept()
launch {
println("Socket accepted: ${socket.remoteAddress}")
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
try {
while (true) {
val line = input.readUTF8Line()
println("${socket.remoteAddress}: $line")
output.write("$line\r\n")
}
} catch (e: Throwable) {
if (e !is CancellationException) {
e.printStackTrace()
}
socket.close()
}
}
}
}
class TcpPortTest {
@Test
fun testWithEchoServer() {
runBlocking {
coroutineScope {
val server = launchEchoServer(22188)
val port = openTcpPort("localhost", 22188)
launch {
port.flow().collect {
println("Flow: ${it.decodeToString()}")
}
}
delay(100)
port.send("aaa\n")
delay(10)
port.send("ddd\n")
delay(200)
cancel()
}
// port.close()
// server.cancel()
}
}
}

View File

@ -1,5 +1,5 @@
plugins { plugins {
kotlin("jvm") version "1.3.72" kotlin("jvm")
id("org.openjfx.javafxplugin") version "0.0.9" id("org.openjfx.javafxplugin") version "0.0.9"
application application
} }

View File

@ -33,7 +33,7 @@ pluginManagement {
} }
} }
rootProject.name = "dataforge-device" rootProject.name = "dataforge-control"
include( include(
":dataforge-device-core", ":dataforge-device-core",