Minor fixes to port implementations

This commit is contained in:
Alexander Nozik 2023-12-15 16:55:56 +03:00
parent 5e64b79b77
commit 701ea8cf57
5 changed files with 79 additions and 17 deletions

View File

@ -0,0 +1,55 @@
package space.kscience.controls.ports
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.context.Context
import java.net.DatagramPacket
import java.net.DatagramSocket
import kotlin.coroutines.CoroutineContext
/**
* A port based on [DatagramSocket] for cases, where [ChannelPort] does not work for some reason
*/
public class UdpSocketPort(
override val context: Context,
private val socket: DatagramSocket,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, coroutineContext) {
private val listenerJob = context.launch(Dispatchers.IO) {
while (isActive) {
val buf = ByteArray(socket.receiveBufferSize)
val packet = DatagramPacket(
buf,
buf.size,
)
socket.receive(packet)
val bytes = packet.data.copyOfRange(
packet.offset,
packet.offset + packet.length
)
receive(bytes)
}
}
override fun close() {
listenerJob.cancel()
}
override fun isOpen(): Boolean = listenerJob.isActive
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
val packet = DatagramPacket(
data,
data.size,
socket.remoteSocketAddress
)
socket.send(packet)
}
}

View File

@ -1,6 +1,7 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import io.ktor.network.selector.ActorSelectorManager import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.SocketOptions
import io.ktor.network.sockets.aSocket import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel import io.ktor.network.sockets.openWriteChannel
@ -23,12 +24,13 @@ public class KtorTcpPort internal constructor(
public val host: String, public val host: String,
public val port: Int, public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
) : AbstractPort(context, coroutineContext), Closeable { ) : AbstractPort(context, coroutineContext), Closeable {
override fun toString(): String = "port[tcp:$host:$port]" override fun toString(): String = "port[tcp:$host:$port]"
private val futureSocket = scope.async { private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port) aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions)
} }
private val writeChannel = scope.async { private val writeChannel = scope.async {
@ -64,8 +66,9 @@ public class KtorTcpPort internal constructor(
host: String, host: String,
port: Int, port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): KtorTcpPort { ): KtorTcpPort {
return KtorTcpPort(context, host, port, coroutineContext) return KtorTcpPort(context, host, port, coroutineContext, socketOptions)
} }
override fun build(context: Context, meta: Meta): Port { override fun build(context: Context, meta: Meta): Port {

View File

@ -1,17 +1,12 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import io.ktor.network.selector.ActorSelectorManager import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.InetSocketAddress import io.ktor.network.sockets.*
import io.ktor.network.sockets.aSocket import io.ktor.utils.io.ByteWriteChannel
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.async
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.int
@ -26,6 +21,7 @@ public class KtorUdpPort internal constructor(
public val localPort: Int? = null, public val localPort: Int? = null,
public val localHost: String = "localhost", public val localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}
) : AbstractPort(context, coroutineContext), Closeable { ) : AbstractPort(context, coroutineContext), Closeable {
override fun toString(): String = "port[udp:$remoteHost:$remotePort]" override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
@ -33,11 +29,12 @@ public class KtorUdpPort internal constructor(
private val futureSocket = scope.async { private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect( aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect(
remoteAddress = InetSocketAddress(remoteHost, remotePort), remoteAddress = InetSocketAddress(remoteHost, remotePort),
localAddress = localPort?.let { InetSocketAddress(localHost, localPort) } localAddress = localPort?.let { InetSocketAddress(localHost, localPort) },
configure = socketOptions
) )
} }
private val writeChannel = scope.async { private val writeChannel: Deferred<ByteWriteChannel> = scope.async {
futureSocket.await().openWriteChannel(true) futureSocket.await().openWriteChannel(true)
} }
@ -72,9 +69,16 @@ public class KtorUdpPort internal constructor(
localPort: Int? = null, localPort: Int? = null,
localHost: String = "localhost", localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
): KtorUdpPort { socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}
return KtorUdpPort(context, remoteHost, remotePort, localPort, localHost, coroutineContext) ): KtorUdpPort = KtorUdpPort(
} context = context,
remoteHost = remoteHost,
remotePort = remotePort,
localPort = localPort,
localHost = localHost,
coroutineContext = coroutineContext,
socketOptions = socketOptions
)
override fun build(context: Context, meta: Meta): Port { override fun build(context: Context, meta: Meta): Port {
val remoteHost by meta.string { error("Remote host is not specified") } val remoteHost by meta.string { error("Remote host is not specified") }

View File

@ -9,7 +9,7 @@ description = "Implementation of direct serial port communication with JSerialCo
dependencies{ dependencies{
api(project(":controls-core")) api(project(":controls-core"))
implementation("com.fazecast:jSerialComm:2.10.3") implementation("com.fazecast:jSerialComm:2.10.4")
} }
readme{ readme{

View File

@ -28,7 +28,7 @@ public class JSerialCommPort(
override fun getListeningEvents(): Int = SerialPort.LISTENING_EVENT_DATA_AVAILABLE override fun getListeningEvents(): Int = SerialPort.LISTENING_EVENT_DATA_AVAILABLE
override fun serialEvent(event: SerialPortEvent) { override fun serialEvent(event: SerialPortEvent) {
if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE) { if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE && event.receivedData != null) {
scope.launch { receive(event.receivedData) } scope.launch { receive(event.receivedData) }
} }
} }