Ports
This commit is contained in:
parent
aa58674a23
commit
2f8584829f
@ -17,6 +17,17 @@ kotlin {
|
|||||||
dependencies {
|
dependencies {
|
||||||
api("hep.dataforge:dataforge-io:$dataforgeVersion")
|
api("hep.dataforge:dataforge-io:$dataforgeVersion")
|
||||||
//implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3")
|
//implementation("org.jetbrains.kotlinx:atomicfu-common:0.14.3")
|
||||||
|
//api("io.github.microutils:kotlin-logging-common:1.8.3")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jvmMain{
|
||||||
|
dependencies{
|
||||||
|
//api("io.github.microutils:kotlin-logging:1.8.3")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jsMain{
|
||||||
|
dependencies{
|
||||||
|
//api("io.github.microutils:kotlin-logging-js:1.8.3")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,92 @@
|
|||||||
|
package hep.dataforge.control.ports
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.cancel
|
||||||
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
import kotlinx.coroutines.channels.ReceiveChannel
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.collect
|
||||||
|
import kotlinx.coroutines.flow.flow
|
||||||
|
import kotlinx.coroutines.flow.receiveAsFlow
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.io.ByteArrayOutput
|
||||||
|
import kotlinx.io.Closeable
|
||||||
|
import mu.KLogger
|
||||||
|
|
||||||
|
abstract class Port : Closeable, CoroutineScope {
|
||||||
|
|
||||||
|
abstract val logger: KLogger
|
||||||
|
|
||||||
|
private val outgoing = Channel<ByteArray>(100)
|
||||||
|
private val incoming = Channel<ByteArray>(Channel.CONFLATED)
|
||||||
|
val receiveChannel: ReceiveChannel<ByteArray> get() = incoming
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to synchronously send data
|
||||||
|
*/
|
||||||
|
protected abstract fun sendInternal(data: ByteArray)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to receive data synchronously
|
||||||
|
*/
|
||||||
|
protected fun receive(data: ByteArray) {
|
||||||
|
launch {
|
||||||
|
incoming.send(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private val sendJob = launch {
|
||||||
|
//using special dispatcher to avoid threading problems
|
||||||
|
for (data in outgoing) {
|
||||||
|
try {
|
||||||
|
sendInternal(data)
|
||||||
|
logger.debug { "SEND: ${data.decodeToString()}" }
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
logger.error(ex) { "Error while sending data" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun send(data: ByteArray) {
|
||||||
|
outgoing.send(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun flow(): Flow<ByteArray> {
|
||||||
|
return incoming.receiveAsFlow()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
cancel("The port is closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send UTF-8 encoded string
|
||||||
|
*/
|
||||||
|
suspend fun Port.send(string: String) = send(string.encodeToByteArray())
|
||||||
|
|
||||||
|
fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> = flow {
|
||||||
|
require(delimiter.isNotEmpty()) { "Delimiter must not be empty" }
|
||||||
|
|
||||||
|
var output = ByteArrayOutput(expectedMessageSize)
|
||||||
|
var matcherPosition = 0
|
||||||
|
|
||||||
|
collect { chunk ->
|
||||||
|
chunk.forEach { byte ->
|
||||||
|
output.writeByte(byte)
|
||||||
|
//matching current symbol in delimiter
|
||||||
|
if (byte == delimiter[matcherPosition]) {
|
||||||
|
matcherPosition++
|
||||||
|
if (matcherPosition == delimiter.size) {
|
||||||
|
//full match achieved, sending result
|
||||||
|
emit(output.toByteArray())
|
||||||
|
output = ByteArrayOutput(expectedMessageSize)
|
||||||
|
matcherPosition = 0
|
||||||
|
}
|
||||||
|
} else if (matcherPosition > 0) {
|
||||||
|
//Reset matcher since full match not achieved
|
||||||
|
matcherPosition = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,96 @@
|
|||||||
|
package hep.dataforge.control.ports
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import mu.KLogger
|
||||||
|
import mu.KotlinLogging
|
||||||
|
import java.net.InetSocketAddress
|
||||||
|
import java.nio.ByteBuffer
|
||||||
|
import java.nio.channels.AsynchronousCloseException
|
||||||
|
import java.nio.channels.AsynchronousSocketChannel
|
||||||
|
import java.nio.channels.CompletionHandler
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
import kotlin.coroutines.resume
|
||||||
|
import kotlin.coroutines.resumeWithException
|
||||||
|
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> =
|
||||||
|
object : CompletionHandler<T, CancellableContinuation<T>> {
|
||||||
|
override fun completed(result: T, cont: CancellableContinuation<T>) {
|
||||||
|
cont.resume(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun failed(ex: Throwable, cont: CancellableContinuation<T>) {
|
||||||
|
// just return if already cancelled and got an expected exception for that case
|
||||||
|
if (ex is AsynchronousCloseException && cont.isCancelled) return
|
||||||
|
cont.resumeWithException(ex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun AsynchronousSocketChannel.readSuspended(
|
||||||
|
buf: ByteBuffer
|
||||||
|
) = suspendCancellableCoroutine<Int> { cont ->
|
||||||
|
read(buf, cont, asyncIOHandler<Int>())
|
||||||
|
cont.invokeOnCancellation {
|
||||||
|
try {
|
||||||
|
close()
|
||||||
|
} catch (ex: Throwable) {
|
||||||
|
// Specification says that it is Ok to call it any time, but reality is different,
|
||||||
|
// so we have just to ignore exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private fun ByteBuffer.toArray(limit: Int = limit()): ByteArray{
|
||||||
|
rewind()
|
||||||
|
val response = ByteArray(limit)
|
||||||
|
get(response)
|
||||||
|
rewind()
|
||||||
|
return response
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TcpPort(
|
||||||
|
parentScope: CoroutineScope,
|
||||||
|
val ip: String,
|
||||||
|
val port: Int
|
||||||
|
) : Port() {
|
||||||
|
|
||||||
|
override val logger: KLogger = KotlinLogging.logger("[tcp]$ip:$port")
|
||||||
|
|
||||||
|
private val executor = Executors.newSingleThreadExecutor { r ->
|
||||||
|
Thread(r).apply {
|
||||||
|
name = "[tcp]$ip:$port"
|
||||||
|
priority = Thread.MAX_PRIORITY
|
||||||
|
}
|
||||||
|
}
|
||||||
|
override val coroutineContext: CoroutineContext = parentScope.coroutineContext + executor.asCoroutineDispatcher()
|
||||||
|
|
||||||
|
private var socket: AsynchronousSocketChannel = openSocket()
|
||||||
|
|
||||||
|
private fun openSocket()= AsynchronousSocketChannel.open().bind(InetSocketAddress(ip, port))
|
||||||
|
|
||||||
|
private val listenerJob = launch {
|
||||||
|
val buffer = ByteBuffer.allocate(1024)
|
||||||
|
while (isActive) {
|
||||||
|
try {
|
||||||
|
val num = socket.readSuspended(buffer)
|
||||||
|
if (num > 0) {
|
||||||
|
receive(buffer.toArray(num))
|
||||||
|
}
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
logger.error("Channel read error", ex)
|
||||||
|
delay(100)
|
||||||
|
logger.info("Reconnecting")
|
||||||
|
socket = openSocket()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun sendInternal(data: ByteArray) {
|
||||||
|
if (!socket.isOpen) socket = openSocket()
|
||||||
|
socket.write(ByteBuffer.wrap(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,24 @@
|
|||||||
|
package hep.dataforge.control.ports
|
||||||
|
|
||||||
|
import kotlinx.coroutines.flow.flowOf
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
|
import kotlinx.coroutines.flow.toList
|
||||||
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
|
||||||
|
internal class PortIOTest{
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun testDelimiteredByteArrayFlow(){
|
||||||
|
val flow = flowOf("bb?b","ddd?",":defgb?:ddf","34fb?:--").map { it.encodeToByteArray() }
|
||||||
|
val chunked = flow.withDelimiter("?:".encodeToByteArray())
|
||||||
|
runBlocking {
|
||||||
|
val result = chunked.toList()
|
||||||
|
assertEquals("bb?bddd?:",result[0].decodeToString())
|
||||||
|
assertEquals("defgb?:", result[1].decodeToString())
|
||||||
|
assertEquals("ddf34fb?:", result[2].decodeToString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,6 @@
|
|||||||
pluginManagement {
|
pluginManagement {
|
||||||
val kotlinVersion = "1.3.72"
|
val kotlinVersion = "1.3.72"
|
||||||
val toolsVersion = "0.5.0"
|
val toolsVersion = "0.5.2"
|
||||||
|
|
||||||
repositories {
|
repositories {
|
||||||
mavenLocal()
|
mavenLocal()
|
||||||
|
Loading…
Reference in New Issue
Block a user