From c28944e10f5861c8f69390a8589e12a5b21556d4 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 23 Jul 2023 14:01:47 +0300 Subject: [PATCH] Refactor connection infrastructure --- .../opcua/client/OpcUaDeviceBySpec.kt | 18 +++--- .../controls/opcua/client/miloClient.kt | 14 ++--- .../controls/opcua/server/DeviceNameSpace.kt | 30 ++-------- .../space/kscience/controls/demo/echo/main.kt | 4 +- .../kscience/controls/demo/MassDevice.kt | 27 ++++----- .../pimotionmaster/fxDeviceProperties.kt | 14 ++--- .../kscience/magix/api/MagixFlowPlugin.kt | 22 +++++++- .../kscience/magix/connections/magixPortal.kt | 30 ++++++++++ .../magix/rsocket/RSocketMagixEndpoint.kt | 8 ++- .../rsocket/RSocketStreamMagixEndpoint.kt | 33 +++++------ .../space/kscience/magix/rsocket/withTcp.kt | 4 +- .../magix/server/RSocketMagixFlowPlugin.kt | 56 ++++++++++++++----- .../kscience/magix/server/magixModule.kt | 7 ++- .../kscince/magix/zmq/ZmqMagixEndpoint.kt | 1 - .../kscince/magix/zmq/ZmqMagixFlowPlugin.kt | 13 +++-- 15 files changed, 172 insertions(+), 109 deletions(-) create mode 100644 magix/magix-api/src/commonMain/kotlin/space/kscience/magix/connections/magixPortal.kt diff --git a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDeviceBySpec.kt b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDeviceBySpec.kt index 821b693..eb9b688 100644 --- a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDeviceBySpec.kt +++ b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDeviceBySpec.kt @@ -1,7 +1,7 @@ package space.kscience.controls.opcua.client import org.eclipse.milo.opcua.sdk.client.OpcUaClient -import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider +import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy import space.kscience.controls.api.Device @@ -12,12 +12,12 @@ import space.kscience.dataforge.context.Global import space.kscience.dataforge.meta.* -public sealed class MiloIdentity: Scheme() +public sealed class MiloIdentity : Scheme() public class MiloUsername : MiloIdentity() { - public var username: String by string{ error("Username not defined") } - public var password: String by string{ error("Password not defined") } + public var username: String by string { error("Username not defined") } + public var password: String by string { error("Password not defined") } public companion object : SchemeSpec(::MiloUsername) } @@ -35,6 +35,12 @@ public class MiloConfiguration : Scheme() { public var securityPolicy: SecurityPolicy by enum(SecurityPolicy.None) + internal fun configureClient(builder: OpcUaClientConfigBuilder) { + username?.let { + builder.setIdentityProvider(UsernameProvider(it.username, it.password)) + } + } + public companion object : SchemeSpec(::MiloConfiguration) } @@ -51,9 +57,7 @@ public open class OpcUaDeviceBySpec( context.createOpcUaClient( config.endpointUrl, securityPolicy = config.securityPolicy, - identityProvider = config.username?.let { - UsernameProvider(it.username,it.password) - } ?: AnonymousProvider() + opcClientConfig = { config.configureClient(this) } ).apply { connect().get() } diff --git a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/miloClient.kt b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/miloClient.kt index face6cc..8149bd9 100644 --- a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/miloClient.kt +++ b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/miloClient.kt @@ -3,7 +3,6 @@ package space.kscience.controls.opcua.client import org.eclipse.milo.opcua.sdk.client.OpcUaClient import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider -import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy @@ -18,14 +17,14 @@ import java.nio.file.Path import java.nio.file.Paths import java.util.* -internal fun T?.toOptional(): Optional = if(this == null) Optional.empty() else Optional.of(this) +internal fun T?.toOptional(): Optional = Optional.ofNullable(this) internal fun Context.createOpcUaClient( endpointUrl: String, //"opc.tcp://localhost:12686/milo" securityPolicy: SecurityPolicy = SecurityPolicy.Basic256Sha256, - identityProvider: IdentityProvider = AnonymousProvider(), - endpointFilter: (EndpointDescription?) -> Boolean = { securityPolicy.uri == it?.securityPolicyUri } + endpointFilter: (EndpointDescription?) -> Boolean = { securityPolicy.uri == it?.securityPolicyUri }, + opcClientConfig: OpcUaClientConfigBuilder.() -> Unit, ): OpcUaClient { val securityTempDir: Path = Paths.get(System.getProperty("java.io.tmpdir"), "client", "security") @@ -47,14 +46,15 @@ internal fun Context.createOpcUaClient( } ) { configBuilder: OpcUaClientConfigBuilder -> configBuilder - .setApplicationName(LocalizedText.english("Controls.kt")) - .setApplicationUri("urn:ru.mipt:npm:controls:opcua") + .setApplicationName(LocalizedText.english("Controls-kt")) + .setApplicationUri("urn:space.kscience:controls:opcua") // .setKeyPair(loader.getClientKeyPair()) // .setCertificate(loader.getClientCertificate()) // .setCertificateChain(loader.getClientCertificateChain()) .setCertificateValidator(certificateValidator) - .setIdentityProvider(identityProvider) + .setIdentityProvider(AnonymousProvider()) .setRequestTimeout(uint(5000)) + .apply(opcClientConfig) .build() } // .apply { diff --git a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/server/DeviceNameSpace.kt b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/server/DeviceNameSpace.kt index 6b8e44c..010c2c0 100644 --- a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/server/DeviceNameSpace.kt +++ b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/server/DeviceNameSpace.kt @@ -12,6 +12,7 @@ import org.eclipse.milo.opcua.sdk.server.api.ManagedNamespaceWithLifecycle import org.eclipse.milo.opcua.sdk.server.api.MonitoredItem import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode import org.eclipse.milo.opcua.sdk.server.nodes.UaNode +import org.eclipse.milo.opcua.sdk.server.nodes.UaNodeContext import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel import org.eclipse.milo.opcua.stack.core.AttributeId @@ -27,7 +28,6 @@ import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MetaSerializer import space.kscience.dataforge.meta.ValueType import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.plus @@ -50,25 +50,7 @@ public class DeviceNameSpace( lifecycleManager.addLifecycle(subscription) lifecycleManager.addStartupTask { - deviceManager.devices.forEach { (deviceName, device) -> - val tokenAsString = deviceName.toString() - val deviceFolder = UaFolderNode( - this.nodeContext, - newNodeId(tokenAsString), - newQualifiedName(tokenAsString), - LocalizedText.english(tokenAsString) - ) - deviceFolder.addReference( - Reference( - deviceFolder.nodeId, - Identifiers.Organizes, - Identifiers.ObjectsFolder.expanded(), - false - ) - ) - deviceFolder.registerDeviceNodes(deviceName.asName(), device) - this.nodeManager.addNode(deviceFolder) - } + nodeContext.registerHub(deviceManager, Name.EMPTY) } lifecycleManager.addLifecycle(object : Lifecycle { @@ -88,7 +70,7 @@ public class DeviceNameSpace( val node: UaVariableNode = UaVariableNode.UaVariableNodeBuilder(nodeContext).apply { - //for now use DF path as id + //for now, use DF paths as ids nodeId = newNodeId("${deviceName.tokens.joinToString("/")}/$propertyName") when { descriptor.readable && descriptor.writable -> { @@ -161,15 +143,15 @@ public class DeviceNameSpace( } //recursively add sub-devices if (device is DeviceHub) { - registerHub(device, deviceName) + nodeContext.registerHub(device, deviceName) } } - private fun UaNode.registerHub(hub: DeviceHub, namePrefix: Name) { + private fun UaNodeContext.registerHub(hub: DeviceHub, namePrefix: Name) { hub.devices.forEach { (deviceName, device) -> val tokenAsString = deviceName.toString() val deviceFolder = UaFolderNode( - this.nodeContext, + this, newNodeId(tokenAsString), newQualifiedName(tokenAsString), LocalizedText.english(tokenAsString) diff --git a/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt b/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt index 3dbb43d..f172b9a 100644 --- a/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt +++ b/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt @@ -60,14 +60,14 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) { @OptIn(ExperimentalTime::class) suspend fun main(): Unit = coroutineScope { launch(Dispatchers.Default) { - val server = startMagixServer(MagixFlowPlugin { _, flow -> + val server = startMagixServer(MagixFlowPlugin { _, flow, send -> val logger = LoggerFactory.getLogger("echo") //echo each message flow.onEach { message -> if (message.parentId == null) { val m = message.copy(origin = "loop", parentId = message.id, id = message.id + ".response") logger.info(m.toString()) - flow.emit(m) + send(m) } }.launchIn(this) }) diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt index 6370d8d..6dae782 100644 --- a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.datetime.Clock -import kotlinx.datetime.Instant import space.kscience.controls.client.connectToMagix import space.kscience.controls.client.controlsMagixFormat import space.kscience.controls.manager.DeviceManager @@ -21,7 +20,7 @@ import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.int import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.subscribe -import space.kscience.magix.rsocket.rSocketStreamWithTcp +import space.kscience.magix.rsocket.rSocketWithWebSockets import space.kscience.magix.server.RSocketMagixFlowPlugin import space.kscience.magix.server.startMagixServer import space.kscience.plotly.Plotly @@ -31,8 +30,10 @@ import space.kscience.plotly.plot import space.kscience.plotly.server.PlotlyUpdateMode import space.kscience.plotly.server.serve import space.kscience.plotly.server.show +import space.kscince.magix.zmq.ZmqMagixFlowPlugin import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random +import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds @@ -48,7 +49,7 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec(MassDe val value by doubleProperty { randomValue } override suspend fun MassDevice.onOpen() { - doRecurring(100.milliseconds) { + doRecurring(50.milliseconds) { read(value) } } @@ -60,13 +61,13 @@ fun main() { context.startMagixServer( RSocketMagixFlowPlugin(), -// ZmqMagixFlowPlugin() + ZmqMagixFlowPlugin() ) val numDevices = 100 - context.launch(Dispatchers.IO) { - repeat(numDevices) { + repeat(numDevices) { + context.launch(Dispatchers.IO) { val deviceContext = Context("Device${it}") { plugin(DeviceManager) } @@ -76,7 +77,7 @@ fun main() { deviceManager.install("device$it", MassDevice) val endpointId = "device$it" - val deviceEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost") + val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") deviceManager.connectToMagix(deviceEndpoint, endpointId) } } @@ -90,21 +91,21 @@ fun main() { title = "Latest event" } bar { - launch(Dispatchers.Default){ - val monitorEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost") + launch(Dispatchers.IO) { + val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") - val latest = ConcurrentHashMap() + val latest = ConcurrentHashMap() monitorEndpoint.subscribe(controlsMagixFormat).onEach { (magixMessage, payload) -> - latest[magixMessage.origin] = payload.time ?: Clock.System.now() + latest[magixMessage.origin] = Clock.System.now() - payload.time!! }.launchIn(this) while (isActive) { delay(200) - val now = Clock.System.now() val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap() + latest.clear() x.numbers = sorted.keys - y.numbers = sorted.values.map { now.minus(it).inWholeMilliseconds / 1000.0 } + y.numbers = sorted.values.map { it.inWholeMilliseconds / 1000.0 + 0.0001 } } } } diff --git a/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt index 93f9f7c..0328631 100644 --- a/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt +++ b/demo/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/fxDeviceProperties.kt @@ -40,16 +40,12 @@ fun D.fxProperty(spec: WritableDevicePropertySpec): init { //Read incoming changes onPropertyChange(spec) { - if (it != null) { - runLater { - try { - set(it) - } catch (ex: Throwable) { - logger.info { "Failed to set property $name to $it" } - } + runLater { + try { + set(it) + } catch (ex: Throwable) { + logger.info { "Failed to set property $name to $it" } } - } else { - invalidated() } } diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt index 8cf9ccd..83c95cc 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFlowPlugin.kt @@ -2,8 +2,28 @@ package space.kscience.magix.api import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +/** + * A plugin that could be inserted into basic loop implementation. + */ public fun interface MagixFlowPlugin { - public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job + + /** + * Attach a [Job] to magix loop. + * Receive messages from [receive]. + * Send messages via [sendMessage] + */ + public fun start( + scope: CoroutineScope, + receive: Flow, + sendMessage: suspend (MagixMessage) -> Unit, + ): Job + + /** + * Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins. + */ + public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job = + start(scope, magixFlow) { magixFlow.emit(it) } } \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/connections/magixPortal.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/connections/magixPortal.kt new file mode 100644 index 0000000..9ede63f --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/connections/magixPortal.kt @@ -0,0 +1,30 @@ +package space.kscience.magix.connections + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixMessageFilter + +/** + * Create a gateway between two magix endpoints using filters for forward and backward message passing. + * Portal is useful to create segmented magix loops: + * * limit the load on given loop segment by filtering some messages; + * * use different loop implementations. + */ +public fun CoroutineScope.launchMagixPortal( + firstEndpoint: MagixEndpoint, + secondEndpoint: MagixEndpoint, + forwardFilter: MagixMessageFilter = MagixMessageFilter.ALL, + backwardFilter: MagixMessageFilter = MagixMessageFilter.ALL, +): Job = launch { + firstEndpoint.subscribe(forwardFilter).onEach { + secondEndpoint.broadcast(it) + }.launchIn(this) + + secondEndpoint.subscribe(backwardFilter).onEach { + firstEndpoint.broadcast(it) + }.launchIn(this) +} diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt index 19cc8f8..d3f5a2d 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt @@ -10,7 +10,10 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport import io.rsocket.kotlin.ktor.client.rSocket import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.data -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map @@ -18,7 +21,6 @@ import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.filter -import kotlin.coroutines.coroutineContext public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, Closeable { @@ -34,7 +36,7 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, }.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) } - override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { + override suspend fun broadcast(message: MagixMessage): Unit { val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) } diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt index e4eae79..025036e 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt @@ -10,20 +10,16 @@ import io.rsocket.kotlin.ktor.client.rSocket import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.data -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.map import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.filter -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.coroutineContext /** * RSocket endpoint based on an established channel. This way it works a lot faster than [RSocketMagixEndpoint] @@ -33,11 +29,10 @@ import kotlin.coroutines.coroutineContext */ public class RSocketStreamMagixEndpoint( private val rSocket: RSocket, - private val coroutineContext: CoroutineContext, public val streamFilter: MagixMessageFilter = MagixMessageFilter(), ) : MagixEndpoint, Closeable { - private val output: MutableSharedFlow = MutableSharedFlow() + private val output: Channel = Channel() private val input: Flow by lazy { rSocket.requestChannel( @@ -49,24 +44,22 @@ public class RSocketStreamMagixEndpoint( ) ) }, - output.map { message -> - buildPayload { - data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) - } - }.flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) + output.consumeAsFlow() ) } override fun subscribe( filter: MagixMessageFilter, - ): Flow { - return input.map { - MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()) - }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) - } + ): Flow = input.map { + MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()) + }.filter(filter) override suspend fun broadcast(message: MagixMessage): Unit { - output.emit(message) + output.send( + buildPayload { + data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) + } + ) } override fun close() { @@ -95,5 +88,5 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets( client.close() } - return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter) + return RSocketStreamMagixEndpoint(rSocket, filter) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt index 9dc0abd..02a6c9b 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt @@ -20,6 +20,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( val transport = TcpClientTransport( hostname = host, port = port, + context = coroutineContext, configure = tcpConfig ) val rSocket = buildConnector(rSocketConfig).connect(transport) @@ -38,9 +39,10 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp( val transport = TcpClientTransport( hostname = host, port = port, + context = coroutineContext, configure = tcpConfig ) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter) + return RSocketStreamMagixEndpoint(rSocket, filter) } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt index 3116513..84c9c75 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt @@ -1,8 +1,10 @@ package space.kscience.magix.server +import io.ktor.network.sockets.SocketOptions import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.RSocketRequestHandler import io.rsocket.kotlin.core.RSocketServer +import io.rsocket.kotlin.core.RSocketServerBuilder import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.data @@ -10,18 +12,32 @@ import io.rsocket.kotlin.transport.ktor.tcp.TcpServer import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach import kotlinx.serialization.encodeToString import space.kscience.magix.api.* import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT /** - * Raw TCP magix server + * Raw TCP magix server plugin */ -public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_PORT): MagixFlowPlugin { - override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job { - val tcpTransport = TcpServerTransport(port = port) - val rSocketJob: TcpServer = RSocketServer().bindIn(scope, tcpTransport, acceptor(scope, magixFlow)) +public class RSocketMagixFlowPlugin( + private val serverHost: String = "0.0.0.0", + private val serverPort: Int = DEFAULT_MAGIX_RAW_PORT, + private val transportConfiguration: SocketOptions.AcceptorOptions.() -> Unit = {}, + private val rsocketConfiguration: RSocketServerBuilder.() -> Unit = {}, +) : MagixFlowPlugin { + + override fun start( + scope: CoroutineScope, + receive: Flow, + sendMessage: suspend (MagixMessage) -> Unit, + ): Job { + val tcpTransport = TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration) + val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration) + .bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage)) scope.coroutineContext[Job]?.invokeOnCompletion { rSocketJob.handlerJob.cancel() @@ -30,40 +46,50 @@ public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_POR return rSocketJob.handlerJob } - public companion object{ + public companion object { public fun acceptor( coroutineScope: CoroutineScope, - magixFlow: MutableSharedFlow, + receive: Flow, + sendMessage: suspend (MagixMessage) -> Unit, ): ConnectionAcceptor = ConnectionAcceptor { RSocketRequestHandler(coroutineScope.coroutineContext) { //handler for request/stream requestStream { request: Payload -> - val filter = MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) - magixFlow.filter(filter).map { message -> + val filter = MagixEndpoint.magixJson.decodeFromString( + MagixMessageFilter.serializer(), + request.data.readText() + ) + receive.filter(filter).map { message -> val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message) buildPayload { data(string) } } } //single send fireAndForget { request: Payload -> - val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) - magixFlow.emit(message) + val message = + MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) + sendMessage(message) } // bidirectional connection, used for streaming connection requestChannel { request: Payload, input: Flow -> input.onEach { - magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())) + sendMessage( + MagixEndpoint.magixJson.decodeFromString( + MagixMessage.serializer(), + it.data.readText() + ) + ) }.launchIn(this) val filterText = request.data.readText() - val filter = if(filterText.isNotBlank()){ + val filter = if (filterText.isNotBlank()) { MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) } else { MagixMessageFilter() } - magixFlow.filter(filter).map { message -> + receive.filter(filter).map { message -> val string = MagixEndpoint.magixJson.encodeToString(message) buildPayload { data(string) } } diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt index e5cb6cb..d257bb7 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt @@ -104,8 +104,11 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow, r val message = call.receive() magixFlow.emit(message) } - //rSocket server. Filter from Payload - rSocket("rsocket", acceptor = RSocketMagixFlowPlugin.acceptor( application, magixFlow)) + //rSocket WS server. Filter from Payload + rSocket( + "rsocket", + acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) } + ) } } } diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt index e878ce9..23715a7 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt @@ -23,7 +23,6 @@ public class ZmqMagixEndpoint( ) : MagixEndpoint, AutoCloseable { private val zmqContext by lazy { ZContext() } - @OptIn(ExperimentalCoroutinesApi::class) override fun subscribe(filter: MagixMessageFilter): Flow { val socket = zmqContext.createSocket(SocketType.SUB) socket.connect("$protocol://$host:$pubPort") diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt index e66d326..fa4f54a 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt @@ -1,7 +1,7 @@ package space.kscince.magix.zmq import kotlinx.coroutines.* -import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.decodeFromString @@ -19,7 +19,12 @@ public class ZmqMagixFlowPlugin( public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, ) : MagixFlowPlugin { - override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow): Job = + + override fun start( + scope: CoroutineScope, + receive: Flow, + sendMessage: suspend (MagixMessage) -> Unit, + ): Job = scope.launch(Dispatchers.IO) { val logger = LoggerFactory.getLogger("magix-server-zmq") @@ -27,7 +32,7 @@ public class ZmqMagixFlowPlugin( //launch the publishing job val pubSocket = context.createSocket(SocketType.PUB) pubSocket.bind("$localHost:$zmqPubSocketPort") - magixFlow.onEach { message -> + receive.onEach { message -> val string = MagixEndpoint.magixJson.encodeToString(message) pubSocket.send(string) logger.trace("Published: $string") @@ -43,7 +48,7 @@ public class ZmqMagixFlowPlugin( if (string != null) { logger.trace("Received: $string") val message = MagixEndpoint.magixJson.decodeFromString(string) - magixFlow.emit(message) + sendMessage(message) } } }