diff --git a/magix/magix-utils/build.gradle.kts b/magix/magix-utils/build.gradle.kts new file mode 100644 index 0000000..1f0ca20 --- /dev/null +++ b/magix/magix-utils/build.gradle.kts @@ -0,0 +1,26 @@ +import space.kscience.gradle.Maturity + +plugins { + id("space.kscience.gradle.mpp") + `maven-publish` +} + +description = """ + Common utilities and services for Magix endpoints. +""".trimIndent() + +val dataforgeVersion: String by rootProject.extra + +kscience { + jvm() + js() + native() + commonMain { + api(projects.magix.magixApi) + api("space.kscience:dataforge-meta:$dataforgeVersion") + } +} + +readme { + maturity = Maturity.EXPERIMENTAL +} \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/MagixRegistry.kt b/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/MagixRegistry.kt similarity index 97% rename from magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/MagixRegistry.kt rename to magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/MagixRegistry.kt index 3272131..c1a9466 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/MagixRegistry.kt +++ b/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/MagixRegistry.kt @@ -127,11 +127,11 @@ public fun CoroutineScope.launchMagixRegistry( * * If [registryEndpoint] field is provided, send request only to given endpoint. * - * @param endpointName the name of endpoint requesting a property + * @param sourceEndpoint the name of endpoint requesting a property */ public suspend fun MagixEndpoint.getProperty( propertyName: String, - endpointName: String, + sourceEndpoint: String, user: JsonElement? = null, registryEndpoint: String? = null, ): Flow> = subscribe( @@ -146,7 +146,7 @@ public suspend fun MagixEndpoint.getProperty( send( MagixRegistryMessage.format, MagixRegistryRequestMessage(propertyName), - source = endpointName, + source = sourceEndpoint, target = registryEndpoint, user = user ) diff --git a/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/WatcherEndpointWrapper.kt b/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/WatcherEndpointWrapper.kt new file mode 100644 index 0000000..8560c79 --- /dev/null +++ b/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/WatcherEndpointWrapper.kt @@ -0,0 +1,82 @@ +package space.kscience.magix.services + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.onEach +import kotlinx.serialization.json.JsonNull +import kotlinx.serialization.json.JsonPrimitive +import kotlinx.serialization.json.jsonPrimitive +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.get +import space.kscience.dataforge.meta.string +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.MagixMessageFilter +import space.kscience.magix.api.send +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +public class WatcherEndpointWrapper( + private val scope: CoroutineScope, + private val endpointName: String, + private val endpoint: MagixEndpoint, + private val meta: Meta, +) : MagixEndpoint { + + private val watchDogJob: Job = scope.launch { + val filter = MagixMessageFilter( + format = listOf(MAGIX_WATCHDOG_FORMAT), + target = listOf(null, endpointName) + ) + endpoint.subscribe(filter).filter { + it.payload.jsonPrimitive.content == MAGIX_PING + }.onEach { request -> + endpoint.send( + MagixMessage( + MAGIX_WATCHDOG_FORMAT, + JsonPrimitive(MAGIX_PONG), + sourceEndpoint = endpointName, + targetEndpoint = request.sourceEndpoint, + parentId = request.id + ) + ) + }.collect() + } + + private val heartBeatDelay: Duration = meta["heartbeat.period"].string?.let { Duration.parse(it) } ?: 10.seconds + //TODO add update from registry + + private val heartBeatJob = scope.launch { + while (isActive){ + delay(heartBeatDelay) + endpoint.send( + MagixMessage( + MAGIX_HEARTBEAT_FORMAT, + JsonNull, //TODO consider adding timestamp + endpointName + ) + ) + } + } + + override fun subscribe(filter: MagixMessageFilter): Flow = endpoint.subscribe(filter) + + override suspend fun broadcast(message: MagixMessage) { + endpoint.broadcast(message) + } + + override fun close() { + endpoint.close() + watchDogJob.cancel() + heartBeatJob.cancel() + } + + public companion object { + public const val MAGIX_WATCHDOG_FORMAT: String = "magix.watchdog" + public const val MAGIX_PING: String = "ping" + public const val MAGIX_PONG: String = "pong" + public const val MAGIX_HEARTBEAT_FORMAT: String = "magix.heartbeat" + } +} \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/converters.kt b/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/converters.kt similarity index 100% rename from magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/converters.kt rename to magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/converters.kt diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/magixPortal.kt b/magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/magixPortal.kt similarity index 100% rename from magix/magix-api/src/commonMain/kotlin/space/kscience/magix/services/magixPortal.kt rename to magix/magix-utils/src/commonMain/kotlin/space/kscience/magix/services/magixPortal.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index 7de4241..01bb379 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -47,6 +47,7 @@ include( ":controls-server", ":controls-opcua", ":controls-modbus", + ":controls-plc4x", // ":controls-mongo", ":controls-storage", ":controls-storage:controls-xodus", @@ -55,6 +56,7 @@ include( ":controls-jupyter", ":magix", ":magix:magix-api", + ":magix:magix-utils", ":magix:magix-server", ":magix:magix-rsocket", ":magix:magix-java-endpoint",