Refactor connection infrastructure

This commit is contained in:
Alexander Nozik 2023-07-23 14:01:47 +03:00
parent 4d4a9fba1c
commit c28944e10f
15 changed files with 172 additions and 109 deletions

View File

@ -1,7 +1,7 @@
package space.kscience.controls.opcua.client
import org.eclipse.milo.opcua.sdk.client.OpcUaClient
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy
import space.kscience.controls.api.Device
@ -12,12 +12,12 @@ import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.*
public sealed class MiloIdentity: Scheme()
public sealed class MiloIdentity : Scheme()
public class MiloUsername : MiloIdentity() {
public var username: String by string{ error("Username not defined") }
public var password: String by string{ error("Password not defined") }
public var username: String by string { error("Username not defined") }
public var password: String by string { error("Password not defined") }
public companion object : SchemeSpec<MiloUsername>(::MiloUsername)
}
@ -35,6 +35,12 @@ public class MiloConfiguration : Scheme() {
public var securityPolicy: SecurityPolicy by enum(SecurityPolicy.None)
internal fun configureClient(builder: OpcUaClientConfigBuilder) {
username?.let {
builder.setIdentityProvider(UsernameProvider(it.username, it.password))
}
}
public companion object : SchemeSpec<MiloConfiguration>(::MiloConfiguration)
}
@ -51,9 +57,7 @@ public open class OpcUaDeviceBySpec<D : Device>(
context.createOpcUaClient(
config.endpointUrl,
securityPolicy = config.securityPolicy,
identityProvider = config.username?.let {
UsernameProvider(it.username,it.password)
} ?: AnonymousProvider()
opcClientConfig = { config.configureClient(this) }
).apply {
connect().get()
}

View File

@ -3,7 +3,6 @@ package space.kscience.controls.opcua.client
import org.eclipse.milo.opcua.sdk.client.OpcUaClient
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider
import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator
import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy
@ -18,14 +17,14 @@ import java.nio.file.Path
import java.nio.file.Paths
import java.util.*
internal fun <T:Any> T?.toOptional(): Optional<T> = if(this == null) Optional.empty() else Optional.of(this)
internal fun <T : Any> T?.toOptional(): Optional<T> = Optional.ofNullable(this)
internal fun Context.createOpcUaClient(
endpointUrl: String, //"opc.tcp://localhost:12686/milo"
securityPolicy: SecurityPolicy = SecurityPolicy.Basic256Sha256,
identityProvider: IdentityProvider = AnonymousProvider(),
endpointFilter: (EndpointDescription?) -> Boolean = { securityPolicy.uri == it?.securityPolicyUri }
endpointFilter: (EndpointDescription?) -> Boolean = { securityPolicy.uri == it?.securityPolicyUri },
opcClientConfig: OpcUaClientConfigBuilder.() -> Unit,
): OpcUaClient {
val securityTempDir: Path = Paths.get(System.getProperty("java.io.tmpdir"), "client", "security")
@ -47,14 +46,15 @@ internal fun Context.createOpcUaClient(
}
) { configBuilder: OpcUaClientConfigBuilder ->
configBuilder
.setApplicationName(LocalizedText.english("Controls.kt"))
.setApplicationUri("urn:ru.mipt:npm:controls:opcua")
.setApplicationName(LocalizedText.english("Controls-kt"))
.setApplicationUri("urn:space.kscience:controls:opcua")
// .setKeyPair(loader.getClientKeyPair())
// .setCertificate(loader.getClientCertificate())
// .setCertificateChain(loader.getClientCertificateChain())
.setCertificateValidator(certificateValidator)
.setIdentityProvider(identityProvider)
.setIdentityProvider(AnonymousProvider())
.setRequestTimeout(uint(5000))
.apply(opcClientConfig)
.build()
}
// .apply {

View File

@ -12,6 +12,7 @@ import org.eclipse.milo.opcua.sdk.server.api.ManagedNamespaceWithLifecycle
import org.eclipse.milo.opcua.sdk.server.api.MonitoredItem
import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode
import org.eclipse.milo.opcua.sdk.server.nodes.UaNode
import org.eclipse.milo.opcua.sdk.server.nodes.UaNodeContext
import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode
import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel
import org.eclipse.milo.opcua.stack.core.AttributeId
@ -27,7 +28,6 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaSerializer
import space.kscience.dataforge.meta.ValueType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
@ -50,25 +50,7 @@ public class DeviceNameSpace(
lifecycleManager.addLifecycle(subscription)
lifecycleManager.addStartupTask {
deviceManager.devices.forEach { (deviceName, device) ->
val tokenAsString = deviceName.toString()
val deviceFolder = UaFolderNode(
this.nodeContext,
newNodeId(tokenAsString),
newQualifiedName(tokenAsString),
LocalizedText.english(tokenAsString)
)
deviceFolder.addReference(
Reference(
deviceFolder.nodeId,
Identifiers.Organizes,
Identifiers.ObjectsFolder.expanded(),
false
)
)
deviceFolder.registerDeviceNodes(deviceName.asName(), device)
this.nodeManager.addNode(deviceFolder)
}
nodeContext.registerHub(deviceManager, Name.EMPTY)
}
lifecycleManager.addLifecycle(object : Lifecycle {
@ -88,7 +70,7 @@ public class DeviceNameSpace(
val node: UaVariableNode = UaVariableNode.UaVariableNodeBuilder(nodeContext).apply {
//for now use DF path as id
//for now, use DF paths as ids
nodeId = newNodeId("${deviceName.tokens.joinToString("/")}/$propertyName")
when {
descriptor.readable && descriptor.writable -> {
@ -161,15 +143,15 @@ public class DeviceNameSpace(
}
//recursively add sub-devices
if (device is DeviceHub) {
registerHub(device, deviceName)
nodeContext.registerHub(device, deviceName)
}
}
private fun UaNode.registerHub(hub: DeviceHub, namePrefix: Name) {
private fun UaNodeContext.registerHub(hub: DeviceHub, namePrefix: Name) {
hub.devices.forEach { (deviceName, device) ->
val tokenAsString = deviceName.toString()
val deviceFolder = UaFolderNode(
this.nodeContext,
this,
newNodeId(tokenAsString),
newQualifiedName(tokenAsString),
LocalizedText.english(tokenAsString)

View File

@ -60,14 +60,14 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) {
@OptIn(ExperimentalTime::class)
suspend fun main(): Unit = coroutineScope {
launch(Dispatchers.Default) {
val server = startMagixServer(MagixFlowPlugin { _, flow ->
val server = startMagixServer(MagixFlowPlugin { _, flow, send ->
val logger = LoggerFactory.getLogger("echo")
//echo each message
flow.onEach { message ->
if (message.parentId == null) {
val m = message.copy(origin = "loop", parentId = message.id, id = message.id + ".response")
logger.info(m.toString())
flow.emit(m)
send(m)
}
}.launchIn(this)
})

View File

@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.controlsMagixFormat
import space.kscience.controls.manager.DeviceManager
@ -21,7 +20,7 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketStreamWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.startMagixServer
import space.kscience.plotly.Plotly
@ -31,8 +30,10 @@ import space.kscience.plotly.plot
import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve
import space.kscience.plotly.server.show
import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
@ -48,7 +49,7 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
val value by doubleProperty { randomValue }
override suspend fun MassDevice.onOpen() {
doRecurring(100.milliseconds) {
doRecurring(50.milliseconds) {
read(value)
}
}
@ -60,13 +61,13 @@ fun main() {
context.startMagixServer(
RSocketMagixFlowPlugin(),
// ZmqMagixFlowPlugin()
ZmqMagixFlowPlugin()
)
val numDevices = 100
context.launch(Dispatchers.IO) {
repeat(numDevices) {
repeat(numDevices) {
context.launch(Dispatchers.IO) {
val deviceContext = Context("Device${it}") {
plugin(DeviceManager)
}
@ -76,7 +77,7 @@ fun main() {
deviceManager.install("device$it", MassDevice)
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost")
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceManager.connectToMagix(deviceEndpoint, endpointId)
}
}
@ -90,21 +91,21 @@ fun main() {
title = "Latest event"
}
bar {
launch(Dispatchers.Default){
val monitorEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost")
launch(Dispatchers.IO) {
val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
val latest = ConcurrentHashMap<String, Instant>()
val latest = ConcurrentHashMap<String, Duration>()
monitorEndpoint.subscribe(controlsMagixFormat).onEach { (magixMessage, payload) ->
latest[magixMessage.origin] = payload.time ?: Clock.System.now()
latest[magixMessage.origin] = Clock.System.now() - payload.time!!
}.launchIn(this)
while (isActive) {
delay(200)
val now = Clock.System.now()
val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
latest.clear()
x.numbers = sorted.keys
y.numbers = sorted.values.map { now.minus(it).inWholeMilliseconds / 1000.0 }
y.numbers = sorted.values.map { it.inWholeMilliseconds / 1000.0 + 0.0001 }
}
}
}

View File

@ -40,16 +40,12 @@ fun <D : Device, T : Any> D.fxProperty(spec: WritableDevicePropertySpec<D, T>):
init {
//Read incoming changes
onPropertyChange(spec) {
if (it != null) {
runLater {
try {
set(it)
} catch (ex: Throwable) {
logger.info { "Failed to set property $name to $it" }
}
runLater {
try {
set(it)
} catch (ex: Throwable) {
logger.info { "Failed to set property $name to $it" }
}
} else {
invalidated()
}
}

View File

@ -2,8 +2,28 @@ package space.kscience.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
/**
* A plugin that could be inserted into basic loop implementation.
*/
public fun interface MagixFlowPlugin {
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job
/**
* Attach a [Job] to magix loop.
* Receive messages from [receive].
* Send messages via [sendMessage]
*/
public fun start(
scope: CoroutineScope,
receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit,
): Job
/**
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
*/
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
start(scope, magixFlow) { magixFlow.emit(it) }
}

View File

@ -0,0 +1,30 @@
package space.kscience.magix.connections
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessageFilter
/**
* Create a gateway between two magix endpoints using filters for forward and backward message passing.
* Portal is useful to create segmented magix loops:
* * limit the load on given loop segment by filtering some messages;
* * use different loop implementations.
*/
public fun CoroutineScope.launchMagixPortal(
firstEndpoint: MagixEndpoint,
secondEndpoint: MagixEndpoint,
forwardFilter: MagixMessageFilter = MagixMessageFilter.ALL,
backwardFilter: MagixMessageFilter = MagixMessageFilter.ALL,
): Job = launch {
firstEndpoint.subscribe(forwardFilter).onEach {
secondEndpoint.broadcast(it)
}.launchIn(this)
secondEndpoint.subscribe(backwardFilter).onEach {
firstEndpoint.broadcast(it)
}.launchIn(this)
}

View File

@ -10,7 +10,10 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport
import io.rsocket.kotlin.ktor.client.rSocket
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
@ -18,7 +21,6 @@ import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.filter
import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, Closeable {
@ -34,7 +36,7 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint,
}.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
}
override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) {
override suspend fun broadcast(message: MagixMessage): Unit {
val payload = buildPayload {
data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message))
}

View File

@ -10,20 +10,16 @@ import io.rsocket.kotlin.ktor.client.rSocket
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.map
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.filter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
* RSocket endpoint based on an established channel. This way it works a lot faster than [RSocketMagixEndpoint]
@ -33,11 +29,10 @@ import kotlin.coroutines.coroutineContext
*/
public class RSocketStreamMagixEndpoint(
private val rSocket: RSocket,
private val coroutineContext: CoroutineContext,
public val streamFilter: MagixMessageFilter = MagixMessageFilter(),
) : MagixEndpoint, Closeable {
private val output: MutableSharedFlow<MagixMessage> = MutableSharedFlow()
private val output: Channel<Payload> = Channel()
private val input: Flow<Payload> by lazy {
rSocket.requestChannel(
@ -49,24 +44,22 @@ public class RSocketStreamMagixEndpoint(
)
)
},
output.map { message ->
buildPayload {
data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message))
}
}.flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
output.consumeAsFlow()
)
}
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> {
return input.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
}.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
}
): Flow<MagixMessage> = input.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
}.filter(filter)
override suspend fun broadcast(message: MagixMessage): Unit {
output.emit(message)
output.send(
buildPayload {
data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message))
}
)
}
override fun close() {
@ -95,5 +88,5 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets(
client.close()
}
return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter)
return RSocketStreamMagixEndpoint(rSocket, filter)
}

View File

@ -20,6 +20,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
val transport = TcpClientTransport(
hostname = host,
port = port,
context = coroutineContext,
configure = tcpConfig
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
@ -38,9 +39,10 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp(
val transport = TcpClientTransport(
hostname = host,
port = port,
context = coroutineContext,
configure = tcpConfig
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter)
return RSocketStreamMagixEndpoint(rSocket, filter)
}

View File

@ -1,8 +1,10 @@
package space.kscience.magix.server
import io.ktor.network.sockets.SocketOptions
import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.core.RSocketServerBuilder
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
@ -10,18 +12,32 @@ import io.rsocket.kotlin.transport.ktor.tcp.TcpServer
import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.encodeToString
import space.kscience.magix.api.*
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
/**
* Raw TCP magix server
* Raw TCP magix server plugin
*/
public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_PORT): MagixFlowPlugin {
override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job {
val tcpTransport = TcpServerTransport(port = port)
val rSocketJob: TcpServer = RSocketServer().bindIn(scope, tcpTransport, acceptor(scope, magixFlow))
public class RSocketMagixFlowPlugin(
private val serverHost: String = "0.0.0.0",
private val serverPort: Int = DEFAULT_MAGIX_RAW_PORT,
private val transportConfiguration: SocketOptions.AcceptorOptions.() -> Unit = {},
private val rsocketConfiguration: RSocketServerBuilder.() -> Unit = {},
) : MagixFlowPlugin {
override fun start(
scope: CoroutineScope,
receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit,
): Job {
val tcpTransport = TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration)
val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration)
.bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage))
scope.coroutineContext[Job]?.invokeOnCompletion {
rSocketJob.handlerJob.cancel()
@ -30,40 +46,50 @@ public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_POR
return rSocketJob.handlerJob
}
public companion object{
public companion object {
public fun acceptor(
coroutineScope: CoroutineScope,
magixFlow: MutableSharedFlow<MagixMessage>,
receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit,
): ConnectionAcceptor = ConnectionAcceptor {
RSocketRequestHandler(coroutineScope.coroutineContext) {
//handler for request/stream
requestStream { request: Payload ->
val filter = MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message ->
val filter = MagixEndpoint.magixJson.decodeFromString(
MagixMessageFilter.serializer(),
request.data.readText()
)
receive.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) }
}
}
//single send
fireAndForget { request: Payload ->
val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
magixFlow.emit(message)
val message =
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
sendMessage(message)
}
// bidirectional connection, used for streaming connection
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach {
magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()))
sendMessage(
MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
it.data.readText()
)
)
}.launchIn(this)
val filterText = request.data.readText()
val filter = if(filterText.isNotBlank()){
val filter = if (filterText.isNotBlank()) {
MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)
} else {
MagixMessageFilter()
}
magixFlow.filter(filter).map { message ->
receive.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
buildPayload { data(string) }
}

View File

@ -104,8 +104,11 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
val message = call.receive<MagixMessage>()
magixFlow.emit(message)
}
//rSocket server. Filter from Payload
rSocket("rsocket", acceptor = RSocketMagixFlowPlugin.acceptor( application, magixFlow))
//rSocket WS server. Filter from Payload
rSocket(
"rsocket",
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) }
)
}
}
}

View File

@ -23,7 +23,6 @@ public class ZmqMagixEndpoint(
) : MagixEndpoint, AutoCloseable {
private val zmqContext by lazy { ZContext() }
@OptIn(ExperimentalCoroutinesApi::class)
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> {
val socket = zmqContext.createSocket(SocketType.SUB)
socket.connect("$protocol://$host:$pubPort")

View File

@ -1,7 +1,7 @@
package space.kscince.magix.zmq
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.decodeFromString
@ -19,7 +19,12 @@ public class ZmqMagixFlowPlugin(
public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
) : MagixFlowPlugin {
override fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
override fun start(
scope: CoroutineScope,
receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit,
): Job =
scope.launch(Dispatchers.IO) {
val logger = LoggerFactory.getLogger("magix-server-zmq")
@ -27,7 +32,7 @@ public class ZmqMagixFlowPlugin(
//launch the publishing job
val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort")
magixFlow.onEach { message ->
receive.onEach { message ->
val string = MagixEndpoint.magixJson.encodeToString(message)
pubSocket.send(string)
logger.trace("Published: $string")
@ -43,7 +48,7 @@ public class ZmqMagixFlowPlugin(
if (string != null) {
logger.trace("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
magixFlow.emit(message)
sendMessage(message)
}
}
}