From 405bcd6ba31a0ee55e250419a1686bfe516476ba Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 23 Jul 2023 14:29:26 +0300 Subject: [PATCH] Add ZMQ context customization --- .../kscince/magix/zmq/ZmqMagixEndpoint.kt | 3 +- .../kscince/magix/zmq/ZmqMagixFlowPlugin.kt | 47 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt index 23715a7..376208d 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt @@ -20,8 +20,9 @@ public class ZmqMagixEndpoint( private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val coroutineContext: CoroutineContext = Dispatchers.IO, + private val zmqContext: ZContext = ZContext() + ) : MagixEndpoint, AutoCloseable { - private val zmqContext by lazy { ZContext() } override fun subscribe(filter: MagixMessageFilter): Flow { val socket = zmqContext.createSocket(SocketType.SUB) diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt index fa4f54a..c4ec16a 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt @@ -24,35 +24,34 @@ public class ZmqMagixFlowPlugin( scope: CoroutineScope, receive: Flow, sendMessage: suspend (MagixMessage) -> Unit, - ): Job = - scope.launch(Dispatchers.IO) { - val logger = LoggerFactory.getLogger("magix-server-zmq") + ): Job = scope.launch(Dispatchers.IO) { + val logger = LoggerFactory.getLogger("magix-server-zmq") - ZContext().use { context -> - //launch the publishing job - val pubSocket = context.createSocket(SocketType.PUB) - pubSocket.bind("$localHost:$zmqPubSocketPort") - receive.onEach { message -> - val string = MagixEndpoint.magixJson.encodeToString(message) - pubSocket.send(string) - logger.trace("Published: $string") - }.launchIn(this) + ZContext().use { context -> + //launch the publishing job + val pubSocket = context.createSocket(SocketType.PUB) + pubSocket.bind("$localHost:$zmqPubSocketPort") + receive.onEach { message -> + val string = MagixEndpoint.magixJson.encodeToString(message) + pubSocket.send(string) + logger.trace("Published: $string") + }.launchIn(this) - //launch pulling job - val pullSocket = context.createSocket(SocketType.PULL) - pullSocket.bind("$localHost:$zmqPullSocketPort") - pullSocket.receiveTimeOut = 500 - //suspending loop while pulling is active - while (isActive) { - val string: String? = pullSocket.recvStr() - if (string != null) { - logger.trace("Received: $string") - val message = MagixEndpoint.magixJson.decodeFromString(string) - sendMessage(message) - } + //launch pulling job + val pullSocket = context.createSocket(SocketType.PULL) + pullSocket.bind("$localHost:$zmqPullSocketPort") + pullSocket.receiveTimeOut = 500 + //suspending loop while pulling is active + while (isActive) { + val string: String? = pullSocket.recvStr() + if (string != null) { + logger.trace("Received: $string") + val message = MagixEndpoint.magixJson.decodeFromString(string) + sendMessage(message) } } } + } } \ No newline at end of file