Fixed tcp port. Two versions of it
This commit is contained in:
parent
53ffe49875
commit
ff2c7ebbcc
@ -32,17 +32,16 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
|
|||||||
*/
|
*/
|
||||||
protected fun receive(data: ByteArray) {
|
protected fun receive(data: ByteArray) {
|
||||||
scope.launch {
|
scope.launch {
|
||||||
logger.debug { "RECEIVE: ${data.decodeToString()}" }
|
logger.debug { "RECEIVED: ${data.decodeToString()}" }
|
||||||
incoming.send(data)
|
incoming.send(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val sendJob = scope.launch {
|
private val sendJob = scope.launch {
|
||||||
//The port scope should be organized in order to avoid threading problems
|
|
||||||
for (data in outgoing) {
|
for (data in outgoing) {
|
||||||
try {
|
try {
|
||||||
write(data)
|
write(data)
|
||||||
logger.debug { "SEND: ${data.decodeToString()}" }
|
logger.debug { "SENT: ${data.decodeToString()}" }
|
||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
if(ex is CancellationException) throw ex
|
if(ex is CancellationException) throw ex
|
||||||
logger.error(ex) { "Error while writing data to the port" }
|
logger.error(ex) { "Error while writing data to the port" }
|
||||||
@ -60,6 +59,8 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
|
|||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
scope.cancel("The port is closed")
|
scope.cancel("The port is closed")
|
||||||
|
outgoing.close()
|
||||||
|
incoming.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,49 @@
|
|||||||
|
package hep.dataforge.control.ports
|
||||||
|
|
||||||
|
import io.ktor.network.selector.ActorSelectorManager
|
||||||
|
import io.ktor.network.sockets.aSocket
|
||||||
|
import io.ktor.network.sockets.openReadChannel
|
||||||
|
import io.ktor.network.sockets.openWriteChannel
|
||||||
|
import io.ktor.utils.io.consumeEachBufferRange
|
||||||
|
import io.ktor.utils.io.writeAvailable
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import mu.KLogger
|
||||||
|
import mu.KotlinLogging
|
||||||
|
import java.net.InetSocketAddress
|
||||||
|
|
||||||
|
class KtorTcpPort internal constructor(
|
||||||
|
scope: CoroutineScope,
|
||||||
|
val host: String,
|
||||||
|
val port: Int
|
||||||
|
) : Port(scope), AutoCloseable {
|
||||||
|
|
||||||
|
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]")
|
||||||
|
|
||||||
|
private val socket = scope.async {
|
||||||
|
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port))
|
||||||
|
}
|
||||||
|
|
||||||
|
private val writeChannel = scope.async {
|
||||||
|
socket.await().openWriteChannel(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val listenerJob = scope.launch {
|
||||||
|
val input = socket.await().openReadChannel()
|
||||||
|
input.consumeEachBufferRange { buffer, last ->
|
||||||
|
val array = ByteArray(buffer.remaining())
|
||||||
|
buffer.get(array)
|
||||||
|
receive(array)
|
||||||
|
isActive
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun write(data: ByteArray) {
|
||||||
|
writeChannel.await().writeAvailable(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
fun CoroutineScope.openKtorTcpPort(host: String, port: Int): TcpPort {
|
||||||
|
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job]))
|
||||||
|
return TcpPort(scope, host, port)
|
||||||
|
}
|
@ -1,16 +1,19 @@
|
|||||||
package hep.dataforge.control.ports
|
package hep.dataforge.control.ports
|
||||||
|
|
||||||
import io.ktor.network.selector.ActorSelectorManager
|
|
||||||
import io.ktor.network.sockets.aSocket
|
|
||||||
import io.ktor.network.sockets.openReadChannel
|
|
||||||
import io.ktor.network.sockets.openWriteChannel
|
|
||||||
import io.ktor.utils.io.consumeEachBufferRange
|
|
||||||
import io.ktor.utils.io.writeAvailable
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import mu.KLogger
|
import mu.KLogger
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.util.concurrent.Executors
|
import java.nio.ByteBuffer
|
||||||
|
import java.nio.channels.SocketChannel
|
||||||
|
|
||||||
|
internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
|
||||||
|
rewind()
|
||||||
|
val response = ByteArray(limit)
|
||||||
|
get(response)
|
||||||
|
rewind()
|
||||||
|
return response
|
||||||
|
}
|
||||||
|
|
||||||
class TcpPort internal constructor(
|
class TcpPort internal constructor(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
@ -20,41 +23,39 @@ class TcpPort internal constructor(
|
|||||||
|
|
||||||
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]")
|
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]")
|
||||||
|
|
||||||
private val socket = scope.async {
|
private val futureChannel: Deferred<SocketChannel> = this.scope.async(Dispatchers.IO) {
|
||||||
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port))
|
SocketChannel.open(InetSocketAddress(host, port))
|
||||||
}
|
}
|
||||||
|
|
||||||
private val writeChannel = scope.async {
|
/**
|
||||||
socket.await().openWriteChannel(true)
|
* A handler to await port connection
|
||||||
}
|
*/
|
||||||
|
val startJob: Job get() = futureChannel
|
||||||
|
|
||||||
private val listenerJob = scope.launch {
|
private val listenerJob = this.scope.launch {
|
||||||
val input = socket.await().openReadChannel()
|
val channel = futureChannel.await()
|
||||||
input.consumeEachBufferRange { buffer, last ->
|
val buffer = ByteBuffer.allocate(1024)
|
||||||
val array = ByteArray(buffer.remaining())
|
while (isActive) {
|
||||||
buffer.get(array)
|
try {
|
||||||
receive(array)
|
val num = channel.read(buffer)
|
||||||
isActive
|
if (num > 0) {
|
||||||
|
receive(buffer.readArray(num))
|
||||||
|
}
|
||||||
|
if (num < 0) cancel("The input channel is exhausted")
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
logger.error("Channel read error", ex)
|
||||||
|
delay(1000)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun write(data: ByteArray) {
|
override suspend fun write(data: ByteArray) {
|
||||||
writeChannel.await().writeAvailable(data)
|
futureChannel.await().write(ByteBuffer.wrap(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort {
|
fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort {
|
||||||
val executor = Executors.newSingleThreadExecutor { r ->
|
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job]))
|
||||||
Thread(r).apply {
|
|
||||||
name = "port[tcp:$host:$port]"
|
|
||||||
priority = Thread.MAX_PRIORITY
|
|
||||||
}
|
|
||||||
}
|
|
||||||
val job = SupervisorJob(coroutineContext[Job])
|
|
||||||
val scope = CoroutineScope(coroutineContext + executor.asCoroutineDispatcher() + job)
|
|
||||||
job.invokeOnCompletion {
|
|
||||||
executor.shutdown()
|
|
||||||
}
|
|
||||||
return TcpPort(scope, host, port)
|
return TcpPort(scope, host, port)
|
||||||
|
|
||||||
}
|
}
|
@ -1,174 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package hep.dataforge.control.ports
|
|
||||||
|
|
||||||
import kotlinx.coroutines.CancellableContinuation
|
|
||||||
import kotlinx.coroutines.suspendCancellableCoroutine
|
|
||||||
import java.net.SocketAddress
|
|
||||||
import java.nio.ByteBuffer
|
|
||||||
import java.nio.channels.*
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import kotlin.coroutines.resume
|
|
||||||
import kotlin.coroutines.resumeWithException
|
|
||||||
|
|
||||||
///**
|
|
||||||
// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
// * This suspending function is cancellable.
|
|
||||||
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
// * *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
// */
|
|
||||||
//suspend fun AsynchronousFileChannel.aLock() = suspendCancellableCoroutine<FileLock> { cont ->
|
|
||||||
// lock(cont, asyncIOHandler())
|
|
||||||
// closeOnCancel(cont)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * Performs [AsynchronousFileChannel.lock] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
// * This suspending function is cancellable.
|
|
||||||
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
// * *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
// */
|
|
||||||
//suspend fun AsynchronousFileChannel.aLock(
|
|
||||||
// position: Long,
|
|
||||||
// size: Long,
|
|
||||||
// shared: Boolean
|
|
||||||
//) = suspendCancellableCoroutine<FileLock> { cont ->
|
|
||||||
// lock(position, size, shared, cont, asyncIOHandler())
|
|
||||||
// closeOnCancel(cont)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * Performs [AsynchronousFileChannel.read] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
// * This suspending function is cancellable.
|
|
||||||
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
// * *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
// */
|
|
||||||
//suspend fun AsynchronousFileChannel.aRead(
|
|
||||||
// buf: ByteBuffer,
|
|
||||||
// position: Long
|
|
||||||
//) = suspendCancellableCoroutine<Int> { cont ->
|
|
||||||
// read(buf, position, cont, asyncIOHandler())
|
|
||||||
// closeOnCancel(cont)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * Performs [AsynchronousFileChannel.write] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
// * This suspending function is cancellable.
|
|
||||||
// * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
// * *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
// */
|
|
||||||
//suspend fun AsynchronousFileChannel.aWrite(
|
|
||||||
// buf: ByteBuffer,
|
|
||||||
// position: Long
|
|
||||||
//) = suspendCancellableCoroutine<Int> { cont ->
|
|
||||||
// write(buf, position, cont, asyncIOHandler())
|
|
||||||
// closeOnCancel(cont)
|
|
||||||
//}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs [AsynchronousServerSocketChannel.accept] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
* This suspending function is cancellable.
|
|
||||||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
* *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
*/
|
|
||||||
internal suspend fun AsynchronousServerSocketChannel.suspendAccept() =
|
|
||||||
suspendCancellableCoroutine<AsynchronousSocketChannel> { cont ->
|
|
||||||
accept(cont, asyncIOHandler())
|
|
||||||
closeOnCancel(cont)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs [AsynchronousSocketChannel.connect] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
* This suspending function is cancellable.
|
|
||||||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
* *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
*/
|
|
||||||
internal suspend fun AsynchronousSocketChannel.suspendConnect(
|
|
||||||
socketAddress: SocketAddress
|
|
||||||
) = suspendCancellableCoroutine<Unit> { cont ->
|
|
||||||
connect(socketAddress, cont, AsyncVoidIOHandler)
|
|
||||||
closeOnCancel(cont)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs [AsynchronousSocketChannel.read] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
* This suspending function is cancellable.
|
|
||||||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
* *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
*/
|
|
||||||
internal suspend fun AsynchronousSocketChannel.suspendRead(
|
|
||||||
buf: ByteBuffer,
|
|
||||||
timeout: Long = 0L,
|
|
||||||
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
|
|
||||||
) = suspendCancellableCoroutine<Int> { cont ->
|
|
||||||
read(buf, timeout, timeUnit, cont, asyncIOHandler())
|
|
||||||
closeOnCancel(cont)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs [AsynchronousSocketChannel.write] without blocking a thread and resumes when asynchronous operation completes.
|
|
||||||
* This suspending function is cancellable.
|
|
||||||
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
|
|
||||||
* *closes the underlying channel* and immediately resumes with [CancellationException].
|
|
||||||
*/
|
|
||||||
internal suspend fun AsynchronousSocketChannel.suspendWrite(
|
|
||||||
buf: ByteBuffer,
|
|
||||||
timeout: Long = 0L,
|
|
||||||
timeUnit: TimeUnit = TimeUnit.MILLISECONDS
|
|
||||||
) = suspendCancellableCoroutine<Int> { cont ->
|
|
||||||
write(buf, timeout, timeUnit, cont, asyncIOHandler())
|
|
||||||
closeOnCancel(cont)
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun ByteBuffer.toArray(limit: Int = limit()): ByteArray {
|
|
||||||
rewind()
|
|
||||||
val response = ByteArray(limit)
|
|
||||||
get(response)
|
|
||||||
rewind()
|
|
||||||
return response
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---------------- private details ----------------
|
|
||||||
|
|
||||||
private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
|
|
||||||
cont.invokeOnCancellation {
|
|
||||||
try {
|
|
||||||
close()
|
|
||||||
} catch (ex: Throwable) {
|
|
||||||
ex.printStackTrace()
|
|
||||||
// Specification says that it is Ok to call it any time, but reality is different,
|
|
||||||
// so we have just to ignore exception
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Suppress("UNCHECKED_CAST")
|
|
||||||
private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> =
|
|
||||||
AsyncIOHandlerAny as CompletionHandler<T, CancellableContinuation<T>>
|
|
||||||
|
|
||||||
private object AsyncIOHandlerAny : CompletionHandler<Any, CancellableContinuation<Any>> {
|
|
||||||
override fun completed(result: Any, cont: CancellableContinuation<Any>) {
|
|
||||||
cont.resume(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun failed(ex: Throwable, cont: CancellableContinuation<Any>) {
|
|
||||||
// just return if already cancelled and got an expected exception for that case
|
|
||||||
if (ex is AsynchronousCloseException && cont.isCancelled) return
|
|
||||||
cont.resumeWithException(ex)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private object AsyncVoidIOHandler : CompletionHandler<Void?, CancellableContinuation<Unit>> {
|
|
||||||
override fun completed(result: Void?, cont: CancellableContinuation<Unit>) {
|
|
||||||
cont.resume(Unit)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun failed(ex: Throwable, cont: CancellableContinuation<Unit>) {
|
|
||||||
// just return if already cancelled and got an expected exception for that case
|
|
||||||
if (ex is AsynchronousCloseException && cont.isCancelled) return
|
|
||||||
cont.resumeWithException(ex)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -18,25 +18,30 @@ fun CoroutineScope.launchEchoServer(port: Int): Job = launch {
|
|||||||
println("Started echo telnet server at ${server.localAddress}")
|
println("Started echo telnet server at ${server.localAddress}")
|
||||||
|
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
val socket = server.accept()
|
val socket = try {
|
||||||
|
server.accept()
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
server.close()
|
||||||
|
return@launch
|
||||||
|
}
|
||||||
|
|
||||||
launch {
|
launch {
|
||||||
println("Socket accepted: ${socket.remoteAddress}")
|
println("Socket accepted: ${socket.remoteAddress}")
|
||||||
|
|
||||||
|
try {
|
||||||
val input = socket.openReadChannel()
|
val input = socket.openReadChannel()
|
||||||
val output = socket.openWriteChannel(autoFlush = true)
|
val output = socket.openWriteChannel(autoFlush = true)
|
||||||
|
|
||||||
try {
|
|
||||||
while (true) {
|
while (isActive) {
|
||||||
val line = input.readUTF8Line()
|
val line = input.readUTF8Line()
|
||||||
|
|
||||||
println("${socket.remoteAddress}: $line")
|
//println("${socket.remoteAddress}: $line")
|
||||||
output.write("$line\r\n")
|
output.write("[response] $line")
|
||||||
}
|
|
||||||
} catch (e: Throwable) {
|
|
||||||
if (e !is CancellationException) {
|
|
||||||
e.printStackTrace()
|
|
||||||
}
|
}
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
cancel()
|
||||||
|
} finally {
|
||||||
socket.close()
|
socket.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -47,27 +52,27 @@ fun CoroutineScope.launchEchoServer(port: Int): Job = launch {
|
|||||||
class TcpPortTest {
|
class TcpPortTest {
|
||||||
@Test
|
@Test
|
||||||
fun testWithEchoServer() {
|
fun testWithEchoServer() {
|
||||||
runBlocking {
|
try {
|
||||||
coroutineScope {
|
runBlocking{
|
||||||
val server = launchEchoServer(22188)
|
val server = launchEchoServer(22188)
|
||||||
val port = openTcpPort("localhost", 22188)
|
val port = openTcpPort("localhost", 22188)
|
||||||
launch {
|
|
||||||
|
val logJob = launch {
|
||||||
port.flow().collect {
|
port.flow().collect {
|
||||||
println("Flow: ${it.decodeToString()}")
|
println("Flow: ${it.decodeToString()}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delay(100)
|
port.startJob.join()
|
||||||
port.send("aaa\n")
|
port.send("aaa\n")
|
||||||
delay(10)
|
// delay(20)
|
||||||
port.send("ddd\n")
|
port.send("ddd\n")
|
||||||
|
|
||||||
delay(200)
|
delay(200)
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
// port.close()
|
} catch (ex: Exception) {
|
||||||
// server.cancel()
|
if (ex !is CancellationException) throw ex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user