ZMQ client implementation

This commit is contained in:
Alexander Nozik 2021-06-21 15:37:36 +03:00
parent 5e9cbab94d
commit 3cbabd5d4b
2 changed files with 56 additions and 9 deletions

View File

@ -22,14 +22,15 @@ import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint<T>(
private val coroutineContext: CoroutineContext,
private val payloadSerializer: KSerializer<T>,
payloadSerializer: KSerializer<T>,
private val rSocket: RSocket,
) : MagixEndpoint<T> {
private val serializer = MagixMessage.serializer(payloadSerializer)
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
val flow = rSocket.requestStream(payload)
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) }
@ -37,7 +38,6 @@ public class RSocketMagixEndpoint<T>(
override suspend fun broadcast(message: MagixMessage<T>) {
withContext(coroutineContext) {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) }
rSocket.fireAndForget(payload)
}

View File

@ -1,22 +1,69 @@
package ru.mipt.npm.magix.zmq
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import org.zeromq.SocketType
import org.zeromq.ZContext
import org.zeromq.ZMQ
import org.zeromq.ZMQException
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import kotlin.coroutines.CoroutineContext
class ZmqMagixEndpoint<T>(
public class ZmqMagixEndpoint<T>(
private val coroutineContext: CoroutineContext,
private val payloadSerializer: KSerializer<T>,
) : MagixEndpoint<T> {
payloadSerializer: KSerializer<T>,
private val address: String,
) : MagixEndpoint<T>, AutoCloseable {
private val zmqContext = ZContext()
private val serializer = MagixMessage.serializer(payloadSerializer)
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage<T>> {
TODO("Not yet implemented")
val socket = zmqContext.createSocket(SocketType.XSUB)
socket.bind(address)
val topic = MagixEndpoint.magixJson.encodeToString(filter)
socket.subscribe(topic)
return channelFlow {
var activeFlag = true
invokeOnClose {
activeFlag = false
socket.close()
}
while (activeFlag) {
try {
val string = socket.recvStr()
val message = MagixEndpoint.magixJson.decodeFromString(serializer, string)
send(message)
} catch (t: Throwable) {
socket.close()
if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) {
activeFlag = false
} else {
zmqContext.close()
}
}
}
}
}
override suspend fun broadcast(message: MagixMessage<T>) {
TODO("Not yet implemented")
private val publishSocket = zmqContext.createSocket(SocketType.XPUB).apply {
bind(address)
}
override suspend fun broadcast(message: MagixMessage<T>): Unit = withContext(Dispatchers.IO) {
val string = MagixEndpoint.magixJson.encodeToString(serializer, message)
publishSocket.send(string)
}
override fun close() {
zmqContext.close()
}
}