diff --git a/build.gradle.kts b/build.gradle.kts index ac98e5b..7794d5d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { val dataforgeVersion: String by extra("0.2.0-dev-4") val ktorVersion: String by extra("1.4.1") -val rsocketVersion by extra("0.11.0-SNAPSHOT") +val rsocketVersion by extra("0.11.1") allprojects { repositories { @@ -17,7 +17,7 @@ allprojects { maven("https://maven.pkg.github.com/altavir/kotlin-logging/") maven("https://dl.bintray.com/rsocket-admin/RSocket") maven("https://maven.pkg.github.com/altavir/ktor-client-sse") - maven("https://oss.jfrog.org/oss-snapshot-local") +// maven("https://oss.jfrog.org/oss-snapshot-local") } group = "hep.dataforge" diff --git a/dataforge-device-core/build.gradle.kts b/dataforge-device-core/build.gradle.kts index 58f616c..1da9011 100644 --- a/dataforge-device-core/build.gradle.kts +++ b/dataforge-device-core/build.gradle.kts @@ -7,7 +7,7 @@ val dataforgeVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra kscience { - useCoroutines() + useCoroutines("1.4.1") useSerialization{ json() } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt index c61d43a..e60d02a 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt @@ -14,10 +14,10 @@ import hep.dataforge.names.Name import hep.dataforge.names.NameToken import hep.dataforge.names.toName import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.isActive +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -27,20 +27,18 @@ public class HubController( public val scope: CoroutineScope, ) : Consumer, Responder { - private val messageOutbox = Channel(Channel.CONFLATED) - private val envelopeOutbox = Channel(Channel.CONFLATED) + private val messageOutbox = MutableSharedFlow() - public fun messageOutput(): Flow = messageOutbox.consumeAsFlow() + private val envelopeOutbox = MutableSharedFlow() - public fun envelopeOutput(): Flow = envelopeOutbox.consumeAsFlow() + public val messageOutput: SharedFlow get() = messageOutbox - private val packJob = scope.launch { - while (isActive) { - val message = messageOutbox.receive() - envelopeOutbox.send(message.toEnvelope()) - } - } + public val envelopeOutput: SharedFlow get() = envelopeOutbox + + private val packJob = messageOutbox.onEach { message -> + envelopeOutbox.emit(message.toEnvelope()) + }.launchIn(scope) private val listeners: Map = hub.devices.mapValues { (name, device) -> object : DeviceListener { @@ -53,8 +51,7 @@ public class HubController( key = propertyName, value = value ) - - messageOutbox.send(change) + messageOutbox.emit(change) } } }.also { @@ -74,7 +71,8 @@ public class HubController( val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") if (request.data == null) { - DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)).toEnvelope() + DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)) + .toEnvelope() } else { DeviceController.respond(device, targetName.toString(), request) } diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index f9561a1..cffe8ad 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -172,7 +172,7 @@ public fun Application.deviceModule( try { application.log.debug("Opened server socket for ${call.request.queryParameters}") - manager.controller.envelopeOutput().collect { + manager.controller.envelopeOutput.collect { outgoing.send(it.toFrame()) } diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt index 8815c5c..31b556a 100644 --- a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt @@ -6,12 +6,13 @@ import hep.dataforge.control.controllers.respondMessage import hep.dataforge.magix.api.MagixEndpoint import hep.dataforge.magix.api.MagixMessage import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -public const val DATAFORGE_FORMAT: String = "dataforge" +public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge" private fun generateId(request: MagixMessage): String = if (request.id != null) { "${request.id}.response" @@ -24,29 +25,33 @@ private fun generateId(request: MagixMessage): String = if (reque */ public fun DeviceManager.launchMagixClient( endpoint: MagixEndpoint, - endpointID: String = "dataforge", + endpointID: String = DATAFORGE_MAGIX_FORMAT, ): Job = context.launch { endpoint.subscribe(DeviceMessage.serializer()).onEach { request -> //TODO analyze action val responsePayload = respondMessage(request.payload) val response = MagixMessage( - format = DATAFORGE_FORMAT, + format = DATAFORGE_MAGIX_FORMAT, id = generateId(request), parentId = request.id, origin = endpointID, payload = responsePayload ) endpoint.broadcast(DeviceMessage.serializer(), response) + }.catch { error -> + logger.error(error){"Error while responding to message"} }.launchIn(endpoint.scope) - controller.messageOutput().onEach { payload -> + controller.messageOutput.onEach { payload -> MagixMessage( - format = DATAFORGE_FORMAT, + format = DATAFORGE_MAGIX_FORMAT, id = "df[${payload.hashCode()}]", origin = endpointID, payload = payload ) + }.catch { error -> + logger.error(error){"Error while sending a message"} }.launchIn(endpoint.scope) } diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt index b747771..9560df8 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt @@ -20,6 +20,8 @@ import io.ktor.websocket.WebSockets import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.RSocketRequestHandler import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.transport.ktor.server.RSocketSupport import io.rsocket.kotlin.transport.ktor.server.rSocket import kotlinx.coroutines.CoroutineScope @@ -42,7 +44,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow val string = magixJson.encodeToString(genericMessageSerializer, message) - Payload(string) + buildPayload { data(string) } } } fireAndForget { request: Payload -> @@ -57,7 +59,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow val string = magixJson.encodeToString(genericMessageSerializer, message) - Payload(string) + buildPayload { data(string) } } } } diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt index 05b7cbf..2f1ad13 100644 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt @@ -9,7 +9,8 @@ import io.ktor.util.KtorExperimentalAPI import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnectorBuilder -import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.rSocket import kotlinx.coroutines.CoroutineScope @@ -21,7 +22,7 @@ import kotlinx.serialization.encodeToString public class RSocketMagixEndpoint( override val scope: CoroutineScope, - public val rSocket: RSocket, + private val rSocket: RSocket, ) : MagixEndpoint { override fun subscribe( @@ -29,7 +30,9 @@ public class RSocketMagixEndpoint( filter: MagixMessageFilter, ): Flow> { val serializer = MagixMessage.serializer(payloadSerializer) - val payload = Payload(MagixEndpoint.magixJson.encodeToString(filter)) + val payload = buildPayload { + data(MagixEndpoint.magixJson.encodeToString(filter)) + } val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } } @@ -37,7 +40,9 @@ public class RSocketMagixEndpoint( override suspend fun broadcast(payloadSerializer: KSerializer, message: MagixMessage) { scope.launch { val serializer = MagixMessage.serializer(payloadSerializer) - val payload = Payload(MagixEndpoint.magixJson.encodeToString(serializer, message)) + val payload = buildPayload { + data(MagixEndpoint.magixJson.encodeToString(serializer, message)) + } rSocket.fireAndForget(payload) } }