diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt new file mode 100644 index 0000000..27bb0d8 --- /dev/null +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt @@ -0,0 +1,55 @@ +package space.kscience.controls.ports + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import space.kscience.dataforge.context.Context +import java.net.DatagramPacket +import java.net.DatagramSocket +import kotlin.coroutines.CoroutineContext + +/** + * A port based on [DatagramSocket] for cases, where [ChannelPort] does not work for some reason + */ +public class UdpSocketPort( + override val context: Context, + private val socket: DatagramSocket, + coroutineContext: CoroutineContext = context.coroutineContext, +) : AbstractPort(context, coroutineContext) { + + private val listenerJob = context.launch(Dispatchers.IO) { + while (isActive) { + val buf = ByteArray(socket.receiveBufferSize) + + val packet = DatagramPacket( + buf, + buf.size, + ) + socket.receive(packet) + + val bytes = packet.data.copyOfRange( + packet.offset, + packet.offset + packet.length + ) + receive(bytes) + } + } + + override fun close() { + listenerJob.cancel() + } + + override fun isOpen(): Boolean = listenerJob.isActive + + + override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) { + val packet = DatagramPacket( + data, + data.size, + socket.remoteSocketAddress + ) + socket.send(packet) + } + +} \ No newline at end of file diff --git a/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt index 7f906d3..a7a698e 100644 --- a/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt +++ b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt @@ -1,6 +1,7 @@ package space.kscience.controls.ports import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.SocketOptions import io.ktor.network.sockets.aSocket import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openWriteChannel @@ -23,12 +24,13 @@ public class KtorTcpPort internal constructor( public val host: String, public val port: Int, coroutineContext: CoroutineContext = context.coroutineContext, + socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {} ) : AbstractPort(context, coroutineContext), Closeable { override fun toString(): String = "port[tcp:$host:$port]" private val futureSocket = scope.async { - aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port) + aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions) } private val writeChannel = scope.async { @@ -64,8 +66,9 @@ public class KtorTcpPort internal constructor( host: String, port: Int, coroutineContext: CoroutineContext = context.coroutineContext, + socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {} ): KtorTcpPort { - return KtorTcpPort(context, host, port, coroutineContext) + return KtorTcpPort(context, host, port, coroutineContext, socketOptions) } override fun build(context: Context, meta: Meta): Port { diff --git a/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt index 8b8446c..1f30914 100644 --- a/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt +++ b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt @@ -1,17 +1,12 @@ package space.kscience.controls.ports import io.ktor.network.selector.ActorSelectorManager -import io.ktor.network.sockets.InetSocketAddress -import io.ktor.network.sockets.aSocket -import io.ktor.network.sockets.openReadChannel -import io.ktor.network.sockets.openWriteChannel +import io.ktor.network.sockets.* +import io.ktor.utils.io.ByteWriteChannel import io.ktor.utils.io.consumeEachBufferRange import io.ktor.utils.io.core.Closeable import io.ktor.utils.io.writeAvailable -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import space.kscience.dataforge.context.Context import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.int @@ -26,6 +21,7 @@ public class KtorUdpPort internal constructor( public val localPort: Int? = null, public val localHost: String = "localhost", coroutineContext: CoroutineContext = context.coroutineContext, + socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {} ) : AbstractPort(context, coroutineContext), Closeable { override fun toString(): String = "port[udp:$remoteHost:$remotePort]" @@ -33,11 +29,12 @@ public class KtorUdpPort internal constructor( private val futureSocket = scope.async { aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect( remoteAddress = InetSocketAddress(remoteHost, remotePort), - localAddress = localPort?.let { InetSocketAddress(localHost, localPort) } + localAddress = localPort?.let { InetSocketAddress(localHost, localPort) }, + configure = socketOptions ) } - private val writeChannel = scope.async { + private val writeChannel: Deferred = scope.async { futureSocket.await().openWriteChannel(true) } @@ -72,9 +69,16 @@ public class KtorUdpPort internal constructor( localPort: Int? = null, localHost: String = "localhost", coroutineContext: CoroutineContext = context.coroutineContext, - ): KtorUdpPort { - return KtorUdpPort(context, remoteHost, remotePort, localPort, localHost, coroutineContext) - } + socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {} + ): KtorUdpPort = KtorUdpPort( + context = context, + remoteHost = remoteHost, + remotePort = remotePort, + localPort = localPort, + localHost = localHost, + coroutineContext = coroutineContext, + socketOptions = socketOptions + ) override fun build(context: Context, meta: Meta): Port { val remoteHost by meta.string { error("Remote host is not specified") } diff --git a/controls-serial/build.gradle.kts b/controls-serial/build.gradle.kts index a9afc41..aa950e1 100644 --- a/controls-serial/build.gradle.kts +++ b/controls-serial/build.gradle.kts @@ -9,7 +9,7 @@ description = "Implementation of direct serial port communication with JSerialCo dependencies{ api(project(":controls-core")) - implementation("com.fazecast:jSerialComm:2.10.3") + implementation("com.fazecast:jSerialComm:2.10.4") } readme{ diff --git a/controls-serial/src/main/kotlin/space/kscience/controls/serial/JSerialCommPort.kt b/controls-serial/src/main/kotlin/space/kscience/controls/serial/JSerialCommPort.kt index 3e0601c..8a2caab 100644 --- a/controls-serial/src/main/kotlin/space/kscience/controls/serial/JSerialCommPort.kt +++ b/controls-serial/src/main/kotlin/space/kscience/controls/serial/JSerialCommPort.kt @@ -28,7 +28,7 @@ public class JSerialCommPort( override fun getListeningEvents(): Int = SerialPort.LISTENING_EVENT_DATA_AVAILABLE override fun serialEvent(event: SerialPortEvent) { - if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE) { + if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE && event.receivedData != null) { scope.launch { receive(event.receivedData) } } }