Add heartbeat and watchdog

This commit is contained in:
Alexander Nozik 2024-03-12 22:26:43 +03:00
parent 4835376c0d
commit 29af4dfb2c
6 changed files with 113 additions and 3 deletions

View File

@ -0,0 +1,26 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
description = """
Common utilities and services for Magix endpoints.
""".trimIndent()
val dataforgeVersion: String by rootProject.extra
kscience {
jvm()
js()
native()
commonMain {
api(projects.magix.magixApi)
api("space.kscience:dataforge-meta:$dataforgeVersion")
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@ -127,11 +127,11 @@ public fun CoroutineScope.launchMagixRegistry(
* *
* If [registryEndpoint] field is provided, send request only to given endpoint. * If [registryEndpoint] field is provided, send request only to given endpoint.
* *
* @param endpointName the name of endpoint requesting a property * @param sourceEndpoint the name of endpoint requesting a property
*/ */
public suspend fun MagixEndpoint.getProperty( public suspend fun MagixEndpoint.getProperty(
propertyName: String, propertyName: String,
endpointName: String, sourceEndpoint: String,
user: JsonElement? = null, user: JsonElement? = null,
registryEndpoint: String? = null, registryEndpoint: String? = null,
): Flow<Pair<String, JsonElement>> = subscribe( ): Flow<Pair<String, JsonElement>> = subscribe(
@ -146,7 +146,7 @@ public suspend fun MagixEndpoint.getProperty(
send( send(
MagixRegistryMessage.format, MagixRegistryMessage.format,
MagixRegistryRequestMessage(propertyName), MagixRegistryRequestMessage(propertyName),
source = endpointName, source = sourceEndpoint,
target = registryEndpoint, target = registryEndpoint,
user = user user = user
) )

View File

@ -0,0 +1,82 @@
package space.kscience.magix.services
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.json.JsonNull
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.jsonPrimitive
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.send
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
public class WatcherEndpointWrapper(
private val scope: CoroutineScope,
private val endpointName: String,
private val endpoint: MagixEndpoint,
private val meta: Meta,
) : MagixEndpoint {
private val watchDogJob: Job = scope.launch {
val filter = MagixMessageFilter(
format = listOf(MAGIX_WATCHDOG_FORMAT),
target = listOf(null, endpointName)
)
endpoint.subscribe(filter).filter {
it.payload.jsonPrimitive.content == MAGIX_PING
}.onEach { request ->
endpoint.send(
MagixMessage(
MAGIX_WATCHDOG_FORMAT,
JsonPrimitive(MAGIX_PONG),
sourceEndpoint = endpointName,
targetEndpoint = request.sourceEndpoint,
parentId = request.id
)
)
}.collect()
}
private val heartBeatDelay: Duration = meta["heartbeat.period"].string?.let { Duration.parse(it) } ?: 10.seconds
//TODO add update from registry
private val heartBeatJob = scope.launch {
while (isActive){
delay(heartBeatDelay)
endpoint.send(
MagixMessage(
MAGIX_HEARTBEAT_FORMAT,
JsonNull, //TODO consider adding timestamp
endpointName
)
)
}
}
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = endpoint.subscribe(filter)
override suspend fun broadcast(message: MagixMessage) {
endpoint.broadcast(message)
}
override fun close() {
endpoint.close()
watchDogJob.cancel()
heartBeatJob.cancel()
}
public companion object {
public const val MAGIX_WATCHDOG_FORMAT: String = "magix.watchdog"
public const val MAGIX_PING: String = "ping"
public const val MAGIX_PONG: String = "pong"
public const val MAGIX_HEARTBEAT_FORMAT: String = "magix.heartbeat"
}
}

View File

@ -47,6 +47,7 @@ include(
":controls-server", ":controls-server",
":controls-opcua", ":controls-opcua",
":controls-modbus", ":controls-modbus",
":controls-plc4x",
// ":controls-mongo", // ":controls-mongo",
":controls-storage", ":controls-storage",
":controls-storage:controls-xodus", ":controls-storage:controls-xodus",
@ -55,6 +56,7 @@ include(
":controls-jupyter", ":controls-jupyter",
":magix", ":magix",
":magix:magix-api", ":magix:magix-api",
":magix:magix-utils",
":magix:magix-server", ":magix:magix-server",
":magix:magix-rsocket", ":magix:magix-rsocket",
":magix:magix-java-endpoint", ":magix:magix-java-endpoint",