diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPort.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt similarity index 55% rename from controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPort.kt rename to controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt index 77fec44..79651d5 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPort.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt @@ -10,6 +10,8 @@ import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.string import java.net.InetSocketAddress import java.nio.ByteBuffer +import java.nio.channels.ByteChannel +import java.nio.channels.DatagramChannel import java.nio.channels.SocketChannel import kotlin.coroutines.CoroutineContext @@ -21,19 +23,17 @@ internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray { return response } -public class TcpPort private constructor( +/** + * A port based on nio [ByteChannel] + */ +public class ChannelPort ( context: Context, - public val host: String, - public val port: Int, coroutineContext: CoroutineContext = context.coroutineContext, + channelBuilder: suspend () -> ByteChannel, ) : AbstractPort(context, coroutineContext), AutoCloseable { - override fun toString(): String = "port[tcp:$host:$port]" - - private val futureChannel: Deferred = this.scope.async(Dispatchers.IO) { - SocketChannel.open(InetSocketAddress(host, port)).apply { - configureBlocking(false) - } + private val futureChannel: Deferred = this.scope.async(Dispatchers.IO) { + channelBuilder() } /** @@ -58,7 +58,7 @@ public class TcpPort private constructor( } } - override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO){ + override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) { futureChannel.await().write(ByteBuffer.wrap(data)) } @@ -72,24 +72,56 @@ public class TcpPort private constructor( } super.close() } +} - public companion object : PortFactory { +/** + * A [PortFactory] for TCP connections + */ +public object TcpPort : PortFactory { - override val type: String = "tcp" + override val type: String = "tcp" - public fun open( - context: Context, - host: String, - port: Int, - coroutineContext: CoroutineContext = context.coroutineContext, - ): TcpPort { - return TcpPort(context, host, port, coroutineContext) - } - - override fun build(context: Context, meta: Meta): Port { - val host = meta["host"].string ?: "localhost" - val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta") - return open(context, host, port) + public fun open( + context: Context, + host: String, + port: Int, + coroutineContext: CoroutineContext = context.coroutineContext, + ): ChannelPort = ChannelPort(context,coroutineContext){ + SocketChannel.open(InetSocketAddress(host, port)).apply { + configureBlocking(false) } } + + override fun build(context: Context, meta: Meta): ChannelPort { + val host = meta["host"].string ?: "localhost" + val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta") + return open(context, host, port) + } +} + + +/** + * A [PortFactory] for UDP connections + */ +public object UdpPort : PortFactory { + + override val type: String = "udp" + + public fun open( + context: Context, + host: String, + port: Int, + coroutineContext: CoroutineContext = context.coroutineContext, + ): ChannelPort = ChannelPort(context,coroutineContext){ + DatagramChannel.open().apply { + bind(InetSocketAddress(host, port)) + configureBlocking(false) + } + } + + override fun build(context: Context, meta: Meta): ChannelPort { + val host = meta["host"].string ?: "localhost" + val port = meta["port"].int ?: error("Port value for UDP port is not defined in $meta") + return open(context, host, port) + } } \ No newline at end of file diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/JvmPortsPlugin.kt similarity index 51% rename from controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt rename to controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/JvmPortsPlugin.kt index 592d0c3..d9d87e2 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/JvmPortsPlugin.kt @@ -6,21 +6,29 @@ import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.parseAsName -public class TcpPortPlugin : AbstractPlugin() { +/** + * A plugin for loading JVM nio-based ports + */ +public class JvmPortsPlugin : AbstractPlugin() { + public val ports: Ports by require(Ports) override val tag: PluginTag get() = Companion.tag override fun content(target: String): Map = when(target){ - PortFactory.TYPE -> mapOf(Name.EMPTY to TcpPort) + PortFactory.TYPE -> mapOf( + TcpPort.type.parseAsName() to TcpPort, + UdpPort.type.parseAsName() to UdpPort + ) else -> emptyMap() } - public companion object : PluginFactory { + public companion object : PluginFactory { - override val tag: PluginTag = PluginTag("controls.ports.tcp", group = PluginTag.DATAFORGE_GROUP) + override val tag: PluginTag = PluginTag("controls.ports.jvm", group = PluginTag.DATAFORGE_GROUP) - override fun build(context: Context, meta: Meta): TcpPortPlugin = TcpPortPlugin() + override fun build(context: Context, meta: Meta): JvmPortsPlugin = JvmPortsPlugin() }