From 3cbabd5d4bd372461f11a0dd5879f142b7881f1a Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 21 Jun 2021 15:37:36 +0300 Subject: [PATCH] ZMQ client implementation --- .../npm/magix/rsocket/RSocketMagixEndpoint.kt | 6 +- .../ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt | 59 +++++++++++++++++-- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index 2753a0e..8589a6a 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -22,14 +22,15 @@ import kotlin.coroutines.coroutineContext public class RSocketMagixEndpoint( private val coroutineContext: CoroutineContext, - private val payloadSerializer: KSerializer, + payloadSerializer: KSerializer, private val rSocket: RSocket, ) : MagixEndpoint { + private val serializer = MagixMessage.serializer(payloadSerializer) + override fun subscribe( filter: MagixMessageFilter, ): Flow> { - val serializer = MagixMessage.serializer(payloadSerializer) val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } @@ -37,7 +38,6 @@ public class RSocketMagixEndpoint( override suspend fun broadcast(message: MagixMessage) { withContext(coroutineContext) { - val serializer = MagixMessage.serializer(payloadSerializer) val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) } rSocket.fireAndForget(payload) } diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index ca02a78..8a272df 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -1,22 +1,69 @@ package ru.mipt.npm.magix.zmq +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer +import kotlinx.serialization.encodeToString +import org.zeromq.SocketType +import org.zeromq.ZContext +import org.zeromq.ZMQ +import org.zeromq.ZMQException import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter import kotlin.coroutines.CoroutineContext -class ZmqMagixEndpoint( +public class ZmqMagixEndpoint( private val coroutineContext: CoroutineContext, - private val payloadSerializer: KSerializer, -) : MagixEndpoint { + payloadSerializer: KSerializer, + private val address: String, +) : MagixEndpoint, AutoCloseable { + private val zmqContext = ZContext() + + private val serializer = MagixMessage.serializer(payloadSerializer) override fun subscribe(filter: MagixMessageFilter): Flow> { - TODO("Not yet implemented") + val socket = zmqContext.createSocket(SocketType.XSUB) + socket.bind(address) + + val topic = MagixEndpoint.magixJson.encodeToString(filter) + socket.subscribe(topic) + + return channelFlow { + var activeFlag = true + invokeOnClose { + activeFlag = false + socket.close() + } + while (activeFlag) { + try { + val string = socket.recvStr() + val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) + send(message) + } catch (t: Throwable) { + socket.close() + if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) { + activeFlag = false + } else { + zmqContext.close() + } + } + } + } } - override suspend fun broadcast(message: MagixMessage) { - TODO("Not yet implemented") + private val publishSocket = zmqContext.createSocket(SocketType.XPUB).apply { + bind(address) + } + + override suspend fun broadcast(message: MagixMessage): Unit = withContext(Dispatchers.IO) { + val string = MagixEndpoint.magixJson.encodeToString(serializer, message) + publishSocket.send(string) + } + + override fun close() { + zmqContext.close() } } \ No newline at end of file