Generalize nio ports architecture. Add Udp(datagram) port

This commit is contained in:
Alexander Nozik 2023-05-18 09:30:26 +03:00
parent 5e91ec9a97
commit a6bf9b8db6
2 changed files with 70 additions and 30 deletions

View File

@ -10,6 +10,8 @@ import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.ByteChannel
import java.nio.channels.DatagramChannel
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -21,19 +23,17 @@ internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
return response return response
} }
public class TcpPort private constructor( /**
* A port based on nio [ByteChannel]
*/
public class ChannelPort (
context: Context, context: Context,
public val host: String,
public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
channelBuilder: suspend () -> ByteChannel,
) : AbstractPort(context, coroutineContext), AutoCloseable { ) : AbstractPort(context, coroutineContext), AutoCloseable {
override fun toString(): String = "port[tcp:$host:$port]" private val futureChannel: Deferred<ByteChannel> = this.scope.async(Dispatchers.IO) {
channelBuilder()
private val futureChannel: Deferred<SocketChannel> = this.scope.async(Dispatchers.IO) {
SocketChannel.open(InetSocketAddress(host, port)).apply {
configureBlocking(false)
}
} }
/** /**
@ -58,7 +58,7 @@ public class TcpPort private constructor(
} }
} }
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO){ override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
futureChannel.await().write(ByteBuffer.wrap(data)) futureChannel.await().write(ByteBuffer.wrap(data))
} }
@ -72,24 +72,56 @@ public class TcpPort private constructor(
} }
super.close() super.close()
} }
}
public companion object : PortFactory { /**
* A [PortFactory] for TCP connections
*/
public object TcpPort : PortFactory {
override val type: String = "tcp" override val type: String = "tcp"
public fun open( public fun open(
context: Context, context: Context,
host: String, host: String,
port: Int, port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
): TcpPort { ): ChannelPort = ChannelPort(context,coroutineContext){
return TcpPort(context, host, port, coroutineContext) SocketChannel.open(InetSocketAddress(host, port)).apply {
} configureBlocking(false)
override fun build(context: Context, meta: Meta): Port {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return open(context, host, port)
} }
} }
override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return open(context, host, port)
}
}
/**
* A [PortFactory] for UDP connections
*/
public object UdpPort : PortFactory {
override val type: String = "udp"
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = ChannelPort(context,coroutineContext){
DatagramChannel.open().apply {
bind(InetSocketAddress(host, port))
configureBlocking(false)
}
}
override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for UDP port is not defined in $meta")
return open(context, host, port)
}
} }

View File

@ -6,21 +6,29 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName
public class TcpPortPlugin : AbstractPlugin() { /**
* A plugin for loading JVM nio-based ports
*/
public class JvmPortsPlugin : AbstractPlugin() {
public val ports: Ports by require(Ports)
override val tag: PluginTag get() = Companion.tag override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when(target){ override fun content(target: String): Map<Name, Any> = when(target){
PortFactory.TYPE -> mapOf(Name.EMPTY to TcpPort) PortFactory.TYPE -> mapOf(
TcpPort.type.parseAsName() to TcpPort,
UdpPort.type.parseAsName() to UdpPort
)
else -> emptyMap() else -> emptyMap()
} }
public companion object : PluginFactory<TcpPortPlugin> { public companion object : PluginFactory<JvmPortsPlugin> {
override val tag: PluginTag = PluginTag("controls.ports.tcp", group = PluginTag.DATAFORGE_GROUP) override val tag: PluginTag = PluginTag("controls.ports.jvm", group = PluginTag.DATAFORGE_GROUP)
override fun build(context: Context, meta: Meta): TcpPortPlugin = TcpPortPlugin() override fun build(context: Context, meta: Meta): JvmPortsPlugin = JvmPortsPlugin()
} }