Minor refactoring to adopt SharedFlow

This commit is contained in:
Alexander Nozik 2020-11-06 16:28:21 +03:00
parent 78ee05371b
commit dcf08d4426
7 changed files with 42 additions and 32 deletions

View File

@ -6,7 +6,7 @@ plugins {
val dataforgeVersion: String by extra("0.2.0-dev-4") val dataforgeVersion: String by extra("0.2.0-dev-4")
val ktorVersion: String by extra("1.4.1") val ktorVersion: String by extra("1.4.1")
val rsocketVersion by extra("0.11.0-SNAPSHOT") val rsocketVersion by extra("0.11.1")
allprojects { allprojects {
repositories { repositories {
@ -17,7 +17,7 @@ allprojects {
maven("https://maven.pkg.github.com/altavir/kotlin-logging/") maven("https://maven.pkg.github.com/altavir/kotlin-logging/")
maven("https://dl.bintray.com/rsocket-admin/RSocket") maven("https://dl.bintray.com/rsocket-admin/RSocket")
maven("https://maven.pkg.github.com/altavir/ktor-client-sse") maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
maven("https://oss.jfrog.org/oss-snapshot-local") // maven("https://oss.jfrog.org/oss-snapshot-local")
} }
group = "hep.dataforge" group = "hep.dataforge"

View File

@ -7,7 +7,7 @@ val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra
kscience { kscience {
useCoroutines() useCoroutines("1.4.1")
useSerialization{ useSerialization{
json() json()
} }

View File

@ -14,10 +14,10 @@ import hep.dataforge.names.Name
import hep.dataforge.names.NameToken import hep.dataforge.names.NameToken
import hep.dataforge.names.toName import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.isActive import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -27,20 +27,18 @@ public class HubController(
public val scope: CoroutineScope, public val scope: CoroutineScope,
) : Consumer, Responder { ) : Consumer, Responder {
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED) private val messageOutbox = MutableSharedFlow<DeviceMessage>()
public fun messageOutput(): Flow<DeviceMessage> = messageOutbox.consumeAsFlow() private val envelopeOutbox = MutableSharedFlow<Envelope>()
public fun envelopeOutput(): Flow<Envelope> = envelopeOutbox.consumeAsFlow() public val messageOutput: SharedFlow<DeviceMessage> get() = messageOutbox
private val packJob = scope.launch { public val envelopeOutput: SharedFlow<Envelope> get() = envelopeOutbox
while (isActive) {
val message = messageOutbox.receive() private val packJob = messageOutbox.onEach { message ->
envelopeOutbox.send(message.toEnvelope()) envelopeOutbox.emit(message.toEnvelope())
} }.launchIn(scope)
}
private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (name, device) -> private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (name, device) ->
object : DeviceListener { object : DeviceListener {
@ -53,8 +51,7 @@ public class HubController(
key = propertyName, key = propertyName,
value = value value = value
) )
messageOutbox.emit(change)
messageOutbox.send(change)
} }
} }
}.also { }.also {
@ -74,7 +71,8 @@ public class HubController(
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
if (request.data == null) { if (request.data == null) {
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)).toEnvelope() DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
.toEnvelope()
} else { } else {
DeviceController.respond(device, targetName.toString(), request) DeviceController.respond(device, targetName.toString(), request)
} }

View File

@ -172,7 +172,7 @@ public fun Application.deviceModule(
try { try {
application.log.debug("Opened server socket for ${call.request.queryParameters}") application.log.debug("Opened server socket for ${call.request.queryParameters}")
manager.controller.envelopeOutput().collect { manager.controller.envelopeOutput.collect {
outgoing.send(it.toFrame()) outgoing.send(it.toFrame())
} }

View File

@ -6,12 +6,13 @@ import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.magix.api.MagixEndpoint import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixMessage import hep.dataforge.magix.api.MagixMessage
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
public const val DATAFORGE_FORMAT: String = "dataforge" public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge"
private fun generateId(request: MagixMessage<DeviceMessage>): String = if (request.id != null) { private fun generateId(request: MagixMessage<DeviceMessage>): String = if (request.id != null) {
"${request.id}.response" "${request.id}.response"
@ -24,29 +25,33 @@ private fun generateId(request: MagixMessage<DeviceMessage>): String = if (reque
*/ */
public fun DeviceManager.launchMagixClient( public fun DeviceManager.launchMagixClient(
endpoint: MagixEndpoint, endpoint: MagixEndpoint,
endpointID: String = "dataforge", endpointID: String = DATAFORGE_MAGIX_FORMAT,
): Job = context.launch { ): Job = context.launch {
endpoint.subscribe(DeviceMessage.serializer()).onEach { request -> endpoint.subscribe(DeviceMessage.serializer()).onEach { request ->
//TODO analyze action //TODO analyze action
val responsePayload = respondMessage(request.payload) val responsePayload = respondMessage(request.payload)
val response = MagixMessage( val response = MagixMessage(
format = DATAFORGE_FORMAT, format = DATAFORGE_MAGIX_FORMAT,
id = generateId(request), id = generateId(request),
parentId = request.id, parentId = request.id,
origin = endpointID, origin = endpointID,
payload = responsePayload payload = responsePayload
) )
endpoint.broadcast(DeviceMessage.serializer(), response) endpoint.broadcast(DeviceMessage.serializer(), response)
}.catch { error ->
logger.error(error){"Error while responding to message"}
}.launchIn(endpoint.scope) }.launchIn(endpoint.scope)
controller.messageOutput().onEach { payload -> controller.messageOutput.onEach { payload ->
MagixMessage( MagixMessage(
format = DATAFORGE_FORMAT, format = DATAFORGE_MAGIX_FORMAT,
id = "df[${payload.hashCode()}]", id = "df[${payload.hashCode()}]",
origin = endpointID, origin = endpointID,
payload = payload payload = payload
) )
}.catch { error ->
logger.error(error){"Error while sending a message"}
}.launchIn(endpoint.scope) }.launchIn(endpoint.scope)
} }

View File

@ -20,6 +20,8 @@ import io.ktor.websocket.WebSockets
import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.server.RSocketSupport import io.rsocket.kotlin.transport.ktor.server.RSocketSupport
import io.rsocket.kotlin.transport.ktor.server.rSocket import io.rsocket.kotlin.transport.ktor.server.rSocket
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -42,7 +44,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMa
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message -> magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(genericMessageSerializer, message) val string = magixJson.encodeToString(genericMessageSerializer, message)
Payload(string) buildPayload { data(string) }
} }
} }
fireAndForget { request: Payload -> fireAndForget { request: Payload ->
@ -57,7 +59,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMa
magixFlow.map { message -> magixFlow.map { message ->
val string = magixJson.encodeToString(genericMessageSerializer, message) val string = magixJson.encodeToString(genericMessageSerializer, message)
Payload(string) buildPayload { data(string) }
} }
} }
} }

View File

@ -9,7 +9,8 @@ import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -21,7 +22,7 @@ import kotlinx.serialization.encodeToString
public class RSocketMagixEndpoint( public class RSocketMagixEndpoint(
override val scope: CoroutineScope, override val scope: CoroutineScope,
public val rSocket: RSocket, private val rSocket: RSocket,
) : MagixEndpoint { ) : MagixEndpoint {
override fun <T> subscribe( override fun <T> subscribe(
@ -29,7 +30,9 @@ public class RSocketMagixEndpoint(
filter: MagixMessageFilter, filter: MagixMessageFilter,
): Flow<MagixMessage<T>> { ): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer) val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(MagixEndpoint.magixJson.encodeToString(filter)) val payload = buildPayload {
data(MagixEndpoint.magixJson.encodeToString(filter))
}
val flow = rSocket.requestStream(payload) val flow = rSocket.requestStream(payload)
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) }
} }
@ -37,7 +40,9 @@ public class RSocketMagixEndpoint(
override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) { override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
scope.launch { scope.launch {
val serializer = MagixMessage.serializer(payloadSerializer) val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(MagixEndpoint.magixJson.encodeToString(serializer, message)) val payload = buildPayload {
data(MagixEndpoint.magixJson.encodeToString(serializer, message))
}
rSocket.fireAndForget(payload) rSocket.fireAndForget(payload)
} }
} }