Add RPC for devices

This commit is contained in:
Alexander Nozik 2023-05-07 21:04:08 +03:00
parent 4c33c16c94
commit 53e506893f
6 changed files with 128 additions and 9 deletions

View File

@ -125,12 +125,15 @@ public data class DescriptionMessage(
/** /**
* A request to execute an action. [targetDevice] is mandatory * A request to execute an action. [targetDevice] is mandatory
*
* @param requestId action request id that should be returned in a response
*/ */
@Serializable @Serializable
@SerialName("action.execute") @SerialName("action.execute")
public data class ActionExecuteMessage( public data class ActionExecuteMessage(
public val action: String, public val action: String,
public val argument: Meta?, public val argument: Meta?,
public val requestId: String,
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
@ -141,12 +144,15 @@ public data class ActionExecuteMessage(
/** /**
* Asynchronous action result. [sourceDevice] is mandatory * Asynchronous action result. [sourceDevice] is mandatory
*
* @param requestId request id passed in the request
*/ */
@Serializable @Serializable
@SerialName("action.result") @SerialName("action.result")
public data class ActionResultMessage( public data class ActionResultMessage(
public val action: String, public val action: String,
public val result: Meta?, public val result: Meta?,
public val requestId: String,
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,

View File

@ -41,6 +41,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
ActionResultMessage( ActionResultMessage(
action = request.action, action = request.action,
result = execute(request.action, request.argument), result = execute(request.action, request.argument),
requestId = request.requestId,
sourceDevice = deviceTarget, sourceDevice = deviceTarget,
targetDevice = request.sourceDevice targetDevice = request.sourceDevice
) )

View File

@ -3,6 +3,10 @@ plugins {
`maven-publish` `maven-publish`
} }
description = """
Magix service for binding controls devices (both as RPC client and server
""".trimIndent()
kscience { kscience {
jvm() jvm()
js() js()
@ -10,7 +14,12 @@ kscience{
json() json()
} }
dependencies { dependencies {
implementation(project(":magix:magix-rsocket")) implementation(projects.magix.magixApi)
implementation(project(":controls-core")) implementation(projects.controlsCore)
implementation("com.benasher44:uuid:0.7.0")
} }
} }
readme {
}

View File

@ -0,0 +1,107 @@
package space.kscience.controls.client
import com.benasher44.uuid.uuid4
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.newCoroutineContext
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.broadcast
import space.kscience.magix.api.subscribe
import kotlin.coroutines.CoroutineContext
private fun stringUID() = uuid4().leastSignificantBits.toString(16)
/**
* An implementation of device via RPC
*/
public class DeviceClient(
override val context: Context,
private val deviceName: Name,
incomingFlow: Flow<DeviceMessage>,
private val send: suspend (DeviceMessage) -> Unit,
) : Device {
override val coroutineContext: CoroutineContext = newCoroutineContext(context.coroutineContext)
private val mutex = Mutex()
private val propertyCache = HashMap<String, Meta>()
override var propertyDescriptors: Collection<PropertyDescriptor> = emptyList()
private set
override var actionDescriptors: Collection<ActionDescriptor> = emptyList()
private set
private val flowInternal = incomingFlow.filter {
it.sourceDevice == deviceName
}.shareIn(this, started = SharingStarted.Eagerly).also {
it.onEach { message ->
when (message) {
is PropertyChangedMessage -> mutex.withLock {
propertyCache[message.property] = message.value
}
is DescriptionMessage -> mutex.withLock {
propertyDescriptors = message.properties
actionDescriptors = message.actions
}
else -> {
//ignore
}
}
}.launchIn(this)
}
override val messageFlow: Flow<DeviceMessage> get() = flowInternal
override suspend fun readProperty(propertyName: String): Meta {
send(
PropertyGetMessage(propertyName, targetDevice = deviceName)
)
return flowInternal.filterIsInstance<PropertyChangedMessage>().first {
it.property == propertyName
}.value
}
override fun getProperty(propertyName: String): Meta? = propertyCache[propertyName]
override suspend fun invalidate(propertyName: String) {
mutex.withLock {
propertyCache.remove(propertyName)
}
}
override suspend fun writeProperty(propertyName: String, value: Meta) {
send(
PropertySetMessage(propertyName, value, targetDevice = deviceName)
)
}
override suspend fun execute(actionName: String, argument: Meta?): Meta? {
val id = stringUID()
send(
ActionExecuteMessage(actionName, argument, id, targetDevice = deviceName)
)
return flowInternal.filterIsInstance<ActionResultMessage>().first {
it.action == actionName && it.requestId == id
}.result
}
}
/**
* Connect to a remote device via this client.
*/
public fun MagixEndpoint.remoteDevice(context: Context, magixTarget: String, deviceName: Name): DeviceClient {
val subscription = subscribe(controlsMagixFormat, originFilter = listOf(magixTarget)).map { it.second }
return DeviceClient(context, deviceName, subscription) {
broadcast(controlsMagixFormat, it, magixTarget, id = stringUID())
}
}

View File

@ -11,8 +11,8 @@ val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra
dependencies { dependencies {
implementation(project(":controls-core")) implementation(projects.controlsCore)
implementation(project(":controls-ktor-tcp")) implementation(projects.controlsKtorTcp)
implementation(projects.magix.magixServer) implementation(projects.magix.magixServer)
implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-server-websockets:$ktorVersion") implementation("io.ktor:ktor-server-websockets:$ktorVersion")

View File

@ -43,10 +43,6 @@ import space.kscience.magix.server.magixModule
private fun Application.deviceServerModule(manager: DeviceManager) { private fun Application.deviceServerModule(manager: DeviceManager) {
install(WebSockets)
// install(CORS) {
// anyHost()
// }
install(StatusPages) { install(StatusPages) {
exception<IllegalArgumentException> { call, cause -> exception<IllegalArgumentException> { call, cause ->
call.respond(HttpStatusCode.BadRequest, cause.message ?: "") call.respond(HttpStatusCode.BadRequest, cause.message ?: "")