From 53e506893f517bcadbef3edf06fc54adef04aa32 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 7 May 2023 21:04:08 +0300 Subject: [PATCH] Add RPC for devices --- .../kscience/controls/api/DeviceMessage.kt | 6 + .../controls/manager/respondMessage.kt | 1 + controls-magix-client/build.gradle.kts | 15 ++- .../kscience/controls/client/DeviceClient.kt | 107 ++++++++++++++++++ controls-server/build.gradle.kts | 4 +- .../controls/server/deviceWebServer.kt | 4 - 6 files changed, 128 insertions(+), 9 deletions(-) create mode 100644 controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt index 80a7f9d..0b4131e 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt @@ -125,12 +125,15 @@ public data class DescriptionMessage( /** * A request to execute an action. [targetDevice] is mandatory + * + * @param requestId action request id that should be returned in a response */ @Serializable @SerialName("action.execute") public data class ActionExecuteMessage( public val action: String, public val argument: Meta?, + public val requestId: String, override val sourceDevice: Name? = null, override val targetDevice: Name, override val comment: String? = null, @@ -141,12 +144,15 @@ public data class ActionExecuteMessage( /** * Asynchronous action result. [sourceDevice] is mandatory + * + * @param requestId request id passed in the request */ @Serializable @SerialName("action.result") public data class ActionResultMessage( public val action: String, public val result: Meta?, + public val requestId: String, override val sourceDevice: Name, override val targetDevice: Name? = null, override val comment: String? = null, diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/respondMessage.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/respondMessage.kt index 4db7894..13d072c 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/respondMessage.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/respondMessage.kt @@ -41,6 +41,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess ActionResultMessage( action = request.action, result = execute(request.action, request.argument), + requestId = request.requestId, sourceDevice = deviceTarget, targetDevice = request.sourceDevice ) diff --git a/controls-magix-client/build.gradle.kts b/controls-magix-client/build.gradle.kts index a94e770..0f108f4 100644 --- a/controls-magix-client/build.gradle.kts +++ b/controls-magix-client/build.gradle.kts @@ -3,14 +3,23 @@ plugins { `maven-publish` } -kscience{ +description = """ + Magix service for binding controls devices (both as RPC client and server +""".trimIndent() + +kscience { jvm() js() useSerialization { json() } dependencies { - implementation(project(":magix:magix-rsocket")) - implementation(project(":controls-core")) + implementation(projects.magix.magixApi) + implementation(projects.controlsCore) + implementation("com.benasher44:uuid:0.7.0") } +} + +readme { + } \ No newline at end of file diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt new file mode 100644 index 0000000..16fd4b0 --- /dev/null +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt @@ -0,0 +1,107 @@ +package space.kscience.controls.client + +import com.benasher44.uuid.uuid4 +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.newCoroutineContext +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import space.kscience.controls.api.* +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.broadcast +import space.kscience.magix.api.subscribe +import kotlin.coroutines.CoroutineContext + +private fun stringUID() = uuid4().leastSignificantBits.toString(16) + +/** + * An implementation of device via RPC + */ +public class DeviceClient( + override val context: Context, + private val deviceName: Name, + incomingFlow: Flow, + private val send: suspend (DeviceMessage) -> Unit, +) : Device { + + override val coroutineContext: CoroutineContext = newCoroutineContext(context.coroutineContext) + + private val mutex = Mutex() + + private val propertyCache = HashMap() + + override var propertyDescriptors: Collection = emptyList() + private set + + override var actionDescriptors: Collection = emptyList() + private set + + private val flowInternal = incomingFlow.filter { + it.sourceDevice == deviceName + }.shareIn(this, started = SharingStarted.Eagerly).also { + it.onEach { message -> + when (message) { + is PropertyChangedMessage -> mutex.withLock { + propertyCache[message.property] = message.value + } + + is DescriptionMessage -> mutex.withLock { + propertyDescriptors = message.properties + actionDescriptors = message.actions + } + + else -> { + //ignore + } + } + }.launchIn(this) + } + + override val messageFlow: Flow get() = flowInternal + + + override suspend fun readProperty(propertyName: String): Meta { + send( + PropertyGetMessage(propertyName, targetDevice = deviceName) + ) + return flowInternal.filterIsInstance().first { + it.property == propertyName + }.value + } + + override fun getProperty(propertyName: String): Meta? = propertyCache[propertyName] + + override suspend fun invalidate(propertyName: String) { + mutex.withLock { + propertyCache.remove(propertyName) + } + } + + override suspend fun writeProperty(propertyName: String, value: Meta) { + send( + PropertySetMessage(propertyName, value, targetDevice = deviceName) + ) + } + + override suspend fun execute(actionName: String, argument: Meta?): Meta? { + val id = stringUID() + send( + ActionExecuteMessage(actionName, argument, id, targetDevice = deviceName) + ) + return flowInternal.filterIsInstance().first { + it.action == actionName && it.requestId == id + }.result + } +} + +/** + * Connect to a remote device via this client. + */ +public fun MagixEndpoint.remoteDevice(context: Context, magixTarget: String, deviceName: Name): DeviceClient { + val subscription = subscribe(controlsMagixFormat, originFilter = listOf(magixTarget)).map { it.second } + return DeviceClient(context, deviceName, subscription) { + broadcast(controlsMagixFormat, it, magixTarget, id = stringUID()) + } +} \ No newline at end of file diff --git a/controls-server/build.gradle.kts b/controls-server/build.gradle.kts index 0553b72..d661459 100644 --- a/controls-server/build.gradle.kts +++ b/controls-server/build.gradle.kts @@ -11,8 +11,8 @@ val dataforgeVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra dependencies { - implementation(project(":controls-core")) - implementation(project(":controls-ktor-tcp")) + implementation(projects.controlsCore) + implementation(projects.controlsKtorTcp) implementation(projects.magix.magixServer) implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-server-websockets:$ktorVersion") diff --git a/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt b/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt index 65ffb07..ba63583 100644 --- a/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt +++ b/controls-server/src/main/kotlin/space/kscience/controls/server/deviceWebServer.kt @@ -43,10 +43,6 @@ import space.kscience.magix.server.magixModule private fun Application.deviceServerModule(manager: DeviceManager) { - install(WebSockets) -// install(CORS) { -// anyHost() -// } install(StatusPages) { exception { call, cause -> call.respond(HttpStatusCode.BadRequest, cause.message ?: "")