Add ZMQ context customization

This commit is contained in:
Alexander Nozik 2023-07-23 14:29:26 +03:00
parent 6f5270ee37
commit 405bcd6ba3
2 changed files with 25 additions and 25 deletions

View File

@ -20,8 +20,9 @@ public class ZmqMagixEndpoint(
private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
private val coroutineContext: CoroutineContext = Dispatchers.IO, private val coroutineContext: CoroutineContext = Dispatchers.IO,
private val zmqContext: ZContext = ZContext()
) : MagixEndpoint, AutoCloseable { ) : MagixEndpoint, AutoCloseable {
private val zmqContext by lazy { ZContext() }
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> { override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> {
val socket = zmqContext.createSocket(SocketType.SUB) val socket = zmqContext.createSocket(SocketType.SUB)

View File

@ -24,35 +24,34 @@ public class ZmqMagixFlowPlugin(
scope: CoroutineScope, scope: CoroutineScope,
receive: Flow<MagixMessage>, receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit, sendMessage: suspend (MagixMessage) -> Unit,
): Job = ): Job = scope.launch(Dispatchers.IO) {
scope.launch(Dispatchers.IO) { val logger = LoggerFactory.getLogger("magix-server-zmq")
val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context -> ZContext().use { context ->
//launch the publishing job //launch the publishing job
val pubSocket = context.createSocket(SocketType.PUB) val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort") pubSocket.bind("$localHost:$zmqPubSocketPort")
receive.onEach { message -> receive.onEach { message ->
val string = MagixEndpoint.magixJson.encodeToString(message) val string = MagixEndpoint.magixJson.encodeToString(message)
pubSocket.send(string) pubSocket.send(string)
logger.trace("Published: $string") logger.trace("Published: $string")
}.launchIn(this) }.launchIn(this)
//launch pulling job //launch pulling job
val pullSocket = context.createSocket(SocketType.PULL) val pullSocket = context.createSocket(SocketType.PULL)
pullSocket.bind("$localHost:$zmqPullSocketPort") pullSocket.bind("$localHost:$zmqPullSocketPort")
pullSocket.receiveTimeOut = 500 pullSocket.receiveTimeOut = 500
//suspending loop while pulling is active //suspending loop while pulling is active
while (isActive) { while (isActive) {
val string: String? = pullSocket.recvStr() val string: String? = pullSocket.recvStr()
if (string != null) { if (string != null) {
logger.trace("Received: $string") logger.trace("Received: $string")
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string) val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
sendMessage(message) sendMessage(message)
}
} }
} }
} }
}
} }