diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt index 6ddc1ab..04d1eb9 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt @@ -4,10 +4,7 @@ import kotlinx.coroutines.* import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.error import space.kscience.dataforge.context.logger -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.get -import space.kscience.dataforge.meta.int -import space.kscience.dataforge.meta.string +import space.kscience.dataforge.meta.* import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.ByteChannel @@ -26,7 +23,7 @@ public fun ByteBuffer.toArray(limit: Int = limit()): ByteArray { /** * A port based on nio [ByteChannel] */ -public class ChannelPort ( +public class ChannelPort( context: Context, coroutineContext: CoroutineContext = context.coroutineContext, channelBuilder: suspend () -> ByteChannel, @@ -86,7 +83,7 @@ public object TcpPort : PortFactory { host: String, port: Int, coroutineContext: CoroutineContext = context.coroutineContext, - ): ChannelPort = ChannelPort(context,coroutineContext){ + ): ChannelPort = ChannelPort(context, coroutineContext) { SocketChannel.open(InetSocketAddress(host, port)) } @@ -105,20 +102,30 @@ public object UdpPort : PortFactory { override val type: String = "udp" + /** + * Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages. + */ public fun open( context: Context, - host: String, - port: Int, + remoteHost: String, + remotePort: Int, + localPort: Int? = null, + localHost: String = "localhost", coroutineContext: CoroutineContext = context.coroutineContext, - ): ChannelPort = ChannelPort(context,coroutineContext){ + ): ChannelPort = ChannelPort(context, coroutineContext) { DatagramChannel.open().apply { - connect(InetSocketAddress(host, port)) + //bind the channel to a local port to receive messages + localPort?.let { bind(InetSocketAddress(localHost, localPort)) } + //connect to remote port to send messages + connect(InetSocketAddress(remoteHost, remotePort)) } } override fun build(context: Context, meta: Meta): ChannelPort { - val host = meta["host"].string ?: "localhost" - val port = meta["port"].int ?: error("Port value for UDP port is not defined in $meta") - return open(context, host, port) + val remoteHost by meta.string { error("Remote host is not specified") } + val remotePort by meta.number { error("Remote port is not specified") } + val localHost: String? by meta.string() + val localPort: Int? by meta.int() + return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost") } } \ No newline at end of file diff --git a/controls-ktor-tcp/README.md b/controls-ports-ktor/README.md similarity index 100% rename from controls-ktor-tcp/README.md rename to controls-ports-ktor/README.md diff --git a/controls-ktor-tcp/build.gradle.kts b/controls-ports-ktor/build.gradle.kts similarity index 100% rename from controls-ktor-tcp/build.gradle.kts rename to controls-ports-ktor/build.gradle.kts diff --git a/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorPortsPlugin.kt similarity index 51% rename from controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt rename to controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorPortsPlugin.kt index 079f457..9c6dbba 100644 --- a/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt +++ b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorPortsPlugin.kt @@ -6,21 +6,22 @@ import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName -public class KtorTcpPortPlugin : AbstractPlugin() { +public class KtorPortsPlugin : AbstractPlugin() { override val tag: PluginTag get() = Companion.tag - override fun content(target: String): Map = when(target){ - PortFactory.TYPE -> mapOf(Name.EMPTY to KtorTcpPort) + override fun content(target: String): Map = when (target) { + PortFactory.TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort) else -> emptyMap() } - public companion object : PluginFactory { + public companion object : PluginFactory { - override val tag: PluginTag = PluginTag("controls.ports.serial", group = PluginTag.DATAFORGE_GROUP) + override val tag: PluginTag = PluginTag("controls.ports.ktor", group = PluginTag.DATAFORGE_GROUP) - override fun build(context: Context, meta: Meta): KtorTcpPortPlugin = KtorTcpPortPlugin() + override fun build(context: Context, meta: Meta): KtorPortsPlugin = KtorPortsPlugin() } diff --git a/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt similarity index 100% rename from controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt rename to controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorTcpPort.kt diff --git a/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt new file mode 100644 index 0000000..8b8446c --- /dev/null +++ b/controls-ports-ktor/src/main/kotlin/space/kscience/controls/ports/KtorUdpPort.kt @@ -0,0 +1,87 @@ +package space.kscience.controls.ports + +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.InetSocketAddress +import io.ktor.network.sockets.aSocket +import io.ktor.network.sockets.openReadChannel +import io.ktor.network.sockets.openWriteChannel +import io.ktor.utils.io.consumeEachBufferRange +import io.ktor.utils.io.core.Closeable +import io.ktor.utils.io.writeAvailable +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.int +import space.kscience.dataforge.meta.number +import space.kscience.dataforge.meta.string +import kotlin.coroutines.CoroutineContext + +public class KtorUdpPort internal constructor( + context: Context, + public val remoteHost: String, + public val remotePort: Int, + public val localPort: Int? = null, + public val localHost: String = "localhost", + coroutineContext: CoroutineContext = context.coroutineContext, +) : AbstractPort(context, coroutineContext), Closeable { + + override fun toString(): String = "port[udp:$remoteHost:$remotePort]" + + private val futureSocket = scope.async { + aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect( + remoteAddress = InetSocketAddress(remoteHost, remotePort), + localAddress = localPort?.let { InetSocketAddress(localHost, localPort) } + ) + } + + private val writeChannel = scope.async { + futureSocket.await().openWriteChannel(true) + } + + private val listenerJob = scope.launch { + val input = futureSocket.await().openReadChannel() + input.consumeEachBufferRange { buffer, _ -> + val array = ByteArray(buffer.remaining()) + buffer.get(array) + receive(array) + isActive + } + } + + override suspend fun write(data: ByteArray) { + writeChannel.await().writeAvailable(data) + } + + override fun close() { + listenerJob.cancel() + futureSocket.cancel() + super.close() + } + + public companion object : PortFactory { + + override val type: String = "udp" + + public fun open( + context: Context, + remoteHost: String, + remotePort: Int, + localPort: Int? = null, + localHost: String = "localhost", + coroutineContext: CoroutineContext = context.coroutineContext, + ): KtorUdpPort { + return KtorUdpPort(context, remoteHost, remotePort, localPort, localHost, coroutineContext) + } + + override fun build(context: Context, meta: Meta): Port { + val remoteHost by meta.string { error("Remote host is not specified") } + val remotePort by meta.number { error("Remote port is not specified") } + val localHost: String? by meta.string() + val localPort: Int? by meta.int() + return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost") + } + } +} \ No newline at end of file diff --git a/controls-server/build.gradle.kts b/controls-server/build.gradle.kts index d661459..2bf9b01 100644 --- a/controls-server/build.gradle.kts +++ b/controls-server/build.gradle.kts @@ -12,7 +12,7 @@ val ktorVersion: String by rootProject.extra dependencies { implementation(projects.controlsCore) - implementation(projects.controlsKtorTcp) + implementation(projects.controlsPortsKtor) implementation(projects.magix.magixServer) implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-server-websockets:$ktorVersion") diff --git a/demo/mks-pdr900/build.gradle.kts b/demo/mks-pdr900/build.gradle.kts index bae6263..dfa7543 100644 --- a/demo/mks-pdr900/build.gradle.kts +++ b/demo/mks-pdr900/build.gradle.kts @@ -17,5 +17,5 @@ val ktorVersion: String by rootProject.extra val dataforgeVersion: String by extra dependencies { - implementation(projects.controlsKtorTcp) + implementation(projects.controlsPortsKtor) } diff --git a/demo/motors/build.gradle.kts b/demo/motors/build.gradle.kts index b60ebb9..0acea4a 100644 --- a/demo/motors/build.gradle.kts +++ b/demo/motors/build.gradle.kts @@ -23,7 +23,7 @@ val ktorVersion: String by rootProject.extra val dataforgeVersion: String by extra dependencies { - implementation(project(":controls-ktor-tcp")) + implementation(project(":controls-ports-ktor")) implementation(project(":controls-magix-client")) implementation("no.tornado:tornadofx:1.7.20") } diff --git a/settings.gradle.kts b/settings.gradle.kts index 72a29bd..4fee116 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -41,7 +41,7 @@ dependencyResolutionManagement { include( ":controls-core", - ":controls-ktor-tcp", + ":controls-ports-ktor", ":controls-serial", ":controls-pi", ":controls-server",