add duplicating local filter for RSocketMagixEndpoint

This commit is contained in:
Alexander Nozik 2021-11-13 13:25:06 +03:00
parent 039e0d083b
commit df6defd637
2 changed files with 5 additions and 5 deletions

View File

@ -20,6 +20,7 @@ import kotlinx.serialization.encodeToString
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
@ -38,7 +39,7 @@ public class RSocketMagixEndpoint<T>(
val flow = rSocket.requestStream(payload) val flow = rSocket.requestStream(payload)
return flow.map { return flow.map {
MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText())
}.flowOn(coroutineContext[CoroutineDispatcher]?:Dispatchers.Unconfined) }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
} }
override suspend fun broadcast(message: MagixMessage<T>) { override suspend fun broadcast(message: MagixMessage<T>) {

View File

@ -21,7 +21,7 @@ public class ZmqMagixEndpoint<T>(
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
private val coroutineContext: CoroutineContext = Dispatchers.IO private val coroutineContext: CoroutineContext = Dispatchers.IO,
) : MagixEndpoint<T>, AutoCloseable { ) : MagixEndpoint<T>, AutoCloseable {
private val zmqContext by lazy { ZContext() } private val zmqContext by lazy { ZContext() }
@ -54,9 +54,8 @@ public class ZmqMagixEndpoint<T>(
} }
} }
} }
}.filter(filter).flowOn( }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.IO)
coroutineContext[CoroutineDispatcher] ?: Dispatchers.IO //should be flown on IO because of blocking calls
) //should be flown on IO because of blocking calls
} }
private val publishSocket by lazy { private val publishSocket by lazy {