Update UDP ports design. Rename ktor ports module.

This commit is contained in:
Alexander Nozik 2023-06-04 11:23:26 +03:00
parent 76cec0469f
commit 2aa26ea802
10 changed files with 118 additions and 23 deletions

View File

@ -4,10 +4,7 @@ import kotlinx.coroutines.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.meta.*
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.ByteChannel
@ -105,20 +102,30 @@ public object UdpPort : PortFactory {
override val type: String = "udp"
/**
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
*/
public fun open(
context: Context,
host: String,
port: Int,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = ChannelPort(context, coroutineContext) {
DatagramChannel.open().apply {
connect(InetSocketAddress(host, port))
//bind the channel to a local port to receive messages
localPort?.let { bind(InetSocketAddress(localHost, localPort)) }
//connect to remote port to send messages
connect(InetSocketAddress(remoteHost, remotePort))
}
}
override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for UDP port is not defined in $meta")
return open(context, host, port)
val remoteHost by meta.string { error("Remote host is not specified") }
val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string()
val localPort: Int? by meta.int()
return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
}
}

View File

@ -6,21 +6,22 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
public class KtorTcpPortPlugin : AbstractPlugin() {
public class KtorPortsPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when (target) {
PortFactory.TYPE -> mapOf(Name.EMPTY to KtorTcpPort)
PortFactory.TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort)
else -> emptyMap()
}
public companion object : PluginFactory<KtorTcpPortPlugin> {
public companion object : PluginFactory<KtorPortsPlugin> {
override val tag: PluginTag = PluginTag("controls.ports.serial", group = PluginTag.DATAFORGE_GROUP)
override val tag: PluginTag = PluginTag("controls.ports.ktor", group = PluginTag.DATAFORGE_GROUP)
override fun build(context: Context, meta: Meta): KtorTcpPortPlugin = KtorTcpPortPlugin()
override fun build(context: Context, meta: Meta): KtorPortsPlugin = KtorPortsPlugin()
}

View File

@ -0,0 +1,87 @@
package space.kscience.controls.ports
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.number
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
public class KtorUdpPort internal constructor(
context: Context,
public val remoteHost: String,
public val remotePort: Int,
public val localPort: Int? = null,
public val localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, coroutineContext), Closeable {
override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect(
remoteAddress = InetSocketAddress(remoteHost, remotePort),
localAddress = localPort?.let { InetSocketAddress(localHost, localPort) }
)
}
private val writeChannel = scope.async {
futureSocket.await().openWriteChannel(true)
}
private val listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, _ ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
isActive
}
}
override suspend fun write(data: ByteArray) {
writeChannel.await().writeAvailable(data)
}
override fun close() {
listenerJob.cancel()
futureSocket.cancel()
super.close()
}
public companion object : PortFactory {
override val type: String = "udp"
public fun open(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext,
): KtorUdpPort {
return KtorUdpPort(context, remoteHost, remotePort, localPort, localHost, coroutineContext)
}
override fun build(context: Context, meta: Meta): Port {
val remoteHost by meta.string { error("Remote host is not specified") }
val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string()
val localPort: Int? by meta.int()
return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
}
}
}

View File

@ -12,7 +12,7 @@ val ktorVersion: String by rootProject.extra
dependencies {
implementation(projects.controlsCore)
implementation(projects.controlsKtorTcp)
implementation(projects.controlsPortsKtor)
implementation(projects.magix.magixServer)
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-server-websockets:$ktorVersion")

View File

@ -17,5 +17,5 @@ val ktorVersion: String by rootProject.extra
val dataforgeVersion: String by extra
dependencies {
implementation(projects.controlsKtorTcp)
implementation(projects.controlsPortsKtor)
}

View File

@ -23,7 +23,7 @@ val ktorVersion: String by rootProject.extra
val dataforgeVersion: String by extra
dependencies {
implementation(project(":controls-ktor-tcp"))
implementation(project(":controls-ports-ktor"))
implementation(project(":controls-magix-client"))
implementation("no.tornado:tornadofx:1.7.20")
}

View File

@ -41,7 +41,7 @@ dependencyResolutionManagement {
include(
":controls-core",
":controls-ktor-tcp",
":controls-ports-ktor",
":controls-serial",
":controls-pi",
":controls-server",