ControlsMagix refactor

This commit is contained in:
Alexander Nozik 2023-08-18 20:17:23 +03:00
parent a9f29f92ca
commit 641daec7e9
18 changed files with 104 additions and 58 deletions

View File

@ -4,7 +4,7 @@ plugins {
}
description = """
Magix service for binding controls devices (both as RPC client and server
Magix service for binding controls devices (both as RPC client and server)
""".trimIndent()
kscience {
@ -16,7 +16,7 @@ kscience {
dependencies {
api(projects.magix.magixApi)
api(projects.controlsCore)
api("com.benasher44:uuid:0.7.0")
api("com.benasher44:uuid:0.8.0")
}
}

View File

@ -19,7 +19,7 @@ import kotlin.coroutines.CoroutineContext
private fun stringUID() = uuid4().leastSignificantBits.toString(16)
/**
* An implementation of device via RPC
* A remote accessible device that relies on connection via Magix
*/
public class DeviceClient(
override val context: Context,
@ -103,11 +103,15 @@ public class DeviceClient(
}
/**
* Connect to a remote device via this client.
* Connect to a remote device via this endpoint.
*
* @param context a [Context] to run device in
* @param endpointName the name of endpoint in Magix to connect to
* @param deviceName the name of device within endpoint
*/
public fun MagixEndpoint.remoteDevice(context: Context, magixTarget: String, deviceName: Name): DeviceClient {
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(magixTarget)).map { it.second }
public fun MagixEndpoint.remoteDevice(context: Context, endpointName: String, deviceName: Name): DeviceClient {
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(endpointName)).map { it.second }
return DeviceClient(context, deviceName, subscription) {
send(DeviceManager.magixFormat, it, magixTarget, id = stringUID())
send(DeviceManager.magixFormat, it, endpointName, id = stringUID())
}
}

View File

@ -33,7 +33,7 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null)
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
public fun DeviceManager.connectToMagix(
public fun DeviceManager.launchMagixService(
endpoint: MagixEndpoint,
endpointID: String = controlsMagixFormat.defaultFormat,
): Job = context.launch {
@ -42,9 +42,9 @@ public fun DeviceManager.connectToMagix(
if (responsePayload != null) {
endpoint.send(
format = controlsMagixFormat,
target = request.sourceEndpoint,
origin = endpointID,
payload = responsePayload,
source = endpointID,
target = request.sourceEndpoint,
id = generateId(request),
parentId = request.id
)
@ -56,8 +56,8 @@ public fun DeviceManager.connectToMagix(
hubMessageFlow(this).onEach { payload ->
endpoint.send(
format = controlsMagixFormat,
origin = endpointID,
payload = payload,
source = endpointID,
id = "df[${payload.hashCode()}]"
)
}.catch { error ->

View File

@ -77,10 +77,10 @@ public fun DeviceManager.launchTangoMagix(
suspend fun respond(request: MagixMessage, payload: TangoPayload, payloadBuilder: (TangoPayload) -> TangoPayload) {
endpoint.send(
tangoMagixFormat,
payload = payloadBuilder(payload),
source = endpointID,
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = payloadBuilder(payload)
parentId = request.id
)
}
@ -127,10 +127,10 @@ public fun DeviceManager.launchTangoMagix(
logger.error(ex) { "Error while responding to message" }
endpoint.send(
tangoMagixFormat,
payload = payload.copy(quality = TangoQuality.WARNING),
source = endpointID,
id = generateId(request),
parentId = request.id,
origin = endpointID,
payload = payload.copy(quality = TangoQuality.WARNING)
parentId = request.id
)
}
}.launchIn(this)

View File

@ -13,7 +13,7 @@ kscience{
}
dependencies(jvmMain){
api(projects.magix.magixApi)
api(projects.controlsMagixClient)
api(projects.controlsMagix)
api(projects.magix.magixServer)
}
}

View File

@ -17,7 +17,7 @@ dependencies {
implementation(projects.controlsCore)
//implementation(projects.controlsServer)
implementation(projects.magix.magixServer)
implementation(projects.controlsMagixClient)
implementation(projects.controlsMagix)
implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)
implementation(projects.controlsOpcua)

View File

@ -8,7 +8,7 @@ import javafx.stage.Stage
import kotlinx.coroutines.launch
import org.eclipse.milo.opcua.sdk.server.OpcUaServer
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText
import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.demo.DemoDevice.Companion.cosScale
import space.kscience.controls.demo.DemoDevice.Companion.sinScale
import space.kscience.controls.demo.DemoDevice.Companion.timeScale
@ -59,7 +59,7 @@ class DemoController : Controller(), ContextAware {
)
//Launch a device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint)
deviceManager.launchMagixService(deviceEndpoint)
//connect visualization to a magix endpoint
val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
visualizer = startDemoDeviceServer(visualEndpoint)

View File

@ -19,7 +19,7 @@ dependencies {
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)
implementation(projects.controlsMagixClient)
implementation(projects.controlsMagix)
implementation(projects.controlsStorage.controlsXodus)
implementation(projects.magix.magixStorage.magixStorageXodus)
// implementation(projects.controlsMongo)

View File

@ -8,7 +8,7 @@ import javafx.scene.layout.Priority
import javafx.stage.Stage
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.demo.car.IVirtualCar.Companion.acceleration
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
@ -63,7 +63,7 @@ class VirtualCarController : Controller(), ContextAware {
//mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory)
//Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint)
deviceManager.launchMagixService(deviceEndpoint)
}
}

View File

@ -14,7 +14,7 @@ val rsocketVersion: String by rootProject.extra
dependencies {
implementation(projects.magix.magixServer)
implementation(projects.controlsMagixClient)
implementation(projects.controlsMagix)
implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)

View File

@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.client.magixFormat
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
@ -78,7 +78,7 @@ suspend fun main() {
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint, endpointId)
deviceManager.launchMagixService(deviceEndpoint, endpointId)
}
}

View File

@ -24,6 +24,6 @@ val dataforgeVersion: String by extra
dependencies {
implementation(project(":controls-ports-ktor"))
implementation(project(":controls-magix-client"))
implementation(projects.controlsMagix)
implementation("no.tornado:tornadofx:1.7.20")
}

View File

@ -41,16 +41,16 @@ public fun <T> MagixEndpoint.subscribe(
public suspend fun <T> MagixEndpoint.send(
format: MagixFormat<T>,
payload: T,
source: String,
target: String? = null,
id: String? = null,
parentId: String? = null,
user: JsonElement? = null,
origin: String = format.defaultFormat,
) {
val message = MagixMessage(
format = format.defaultFormat,
payload = magixJson.encodeToJsonElement(format.serializer, payload),
sourceEndpoint = origin,
sourceEndpoint = source,
targetEndpoint = target,
id = id,
parentId = parentId,

View File

@ -20,13 +20,17 @@ import space.kscience.magix.api.subscribe
*/
public interface MagixRegistry {
/**
* Request a property with name [propertyName] and user authentication data [user].
* Request a property with name [propertyName].
*
* Return a property value in its generic form or null if it is not present.
*
* Throw exception access is denied, or request failed.
*/
public suspend fun request(propertyName: String, user: JsonElement? = null): JsonElement?
public suspend fun get(propertyName: String): JsonElement?
}
public interface MutableMagixRegistry {
public suspend fun set(propertyName: String, value: JsonElement?, user: JsonElement?)
}
@Serializable
@ -48,7 +52,7 @@ public class MagixRegistryRequestMessage(
@SerialName("registry.value")
public class MagixRegistryValueMessage(
override val propertyName: String,
public val value: JsonElement,
public val value: JsonElement?,
) : MagixRegistryMessage()
@Serializable
@ -59,29 +63,58 @@ public class MagixRegistryErrorMessage(
public val errorMessage: String? = null,
) : MagixRegistryMessage()
@Serializable
@SerialName("registry.modify")
public class MagixRegistryModifyMessage(
override val propertyName: String,
public val value: JsonElement,
) : MagixRegistryMessage()
/**
* Launch a magix registry loop service based on local registry
*/
public fun CoroutineScope.launchMagixRegistry(
endpointName: String,
endpoint: MagixEndpoint,
registry: MagixRegistry,
originFilter: Collection<String>? = null,
targetFilter: Collection<String>? = null,
): Job = endpoint.subscribe(MagixRegistryMessage.format, originFilter, targetFilter)
.onEach { (magixMessage, payload) ->
if (payload is MagixRegistryRequestMessage) {
try {
val value = registry.request(payload.propertyName, magixMessage.user)
endpoint.send(
MagixRegistryMessage.format,
MagixRegistryValueMessage(payload.propertyName, value ?: JsonNull)
)
} catch (ex: Exception) {
endpoint.send(
MagixRegistryMessage.format,
MagixRegistryErrorMessage(payload.propertyName, ex::class.simpleName, ex.message)
)
try {
when {
payload is MagixRegistryRequestMessage -> {
endpoint.send(
MagixRegistryMessage.format,
MagixRegistryValueMessage(payload.propertyName, registry.get(payload.propertyName) ?: JsonNull),
source = endpointName,
target = magixMessage.sourceEndpoint,
parentId = magixMessage.id
)
}
payload is MagixRegistryModifyMessage && registry is MutableMagixRegistry -> {
registry.set(payload.propertyName, payload.value, magixMessage.user)
// Broadcast updates. Do not set target
endpoint.send(
MagixRegistryMessage.format,
MagixRegistryValueMessage(
payload.propertyName,
registry.get(payload.propertyName)
),
source = endpointName,
parentId = magixMessage.id
)
}
}
} catch (ex: Exception) {
endpoint.send(
MagixRegistryMessage.format,
MagixRegistryErrorMessage(payload.propertyName, ex::class.simpleName, ex.message),
source = endpointName,
target = magixMessage.sourceEndpoint,
parentId = magixMessage.id
)
}
}.launchIn(this)
@ -92,20 +125,29 @@ public fun CoroutineScope.launchMagixRegistry(
* The subscriber can terminate the flow at any moment to stop subscription, or use it indefinitely to continue observing changes.
* To request a single value, use [Flow.first] function.
*
* If [targetEndpoint] field is provided, send request only to given endpoint.
* If [registryEndpoint] field is provided, send request only to given endpoint.
*
* @param endpointName the name of endpoint requesting a property
*/
public suspend fun MagixEndpoint.getProperty(
propertyName: String,
endpointName: String,
user: JsonElement? = null,
targetEndpoint: String? = null,
): Flow<Pair<String, JsonElement>> {
send(MagixRegistryMessage.format, MagixRegistryRequestMessage(propertyName), target = targetEndpoint, user = user)
return subscribe(
registryEndpoint: String? = null,
): Flow<Pair<String, JsonElement>> = subscribe(
MagixRegistryMessage.format,
originFilter = registryEndpoint?.let { setOf(it) }
).mapNotNull { (message, response) ->
if (response is MagixRegistryValueMessage && response.propertyName == propertyName) {
message.sourceEndpoint to (response.value ?: return@mapNotNull null)
} else null
}.also {
//send the initial request after subscription
send(
MagixRegistryMessage.format,
originFilter = targetEndpoint?.let { setOf(it) }
).mapNotNull { (message, response) ->
if (response is MagixRegistryValueMessage && response.propertyName == propertyName) {
message.sourceEndpoint to response.value
} else null
}
MagixRegistryRequestMessage(propertyName),
source = endpointName,
target = registryEndpoint,
user = user
)
}

View File

@ -50,11 +50,11 @@ public fun MagixEndpoint.launchHistory(
send(
format = MagixHistory.magixFormat,
payload = sendPayload,
source = origin,
target = request.sourceEndpoint,
id = generateId(request),
parentId = request.id,
user = user,
origin = origin,
)
}
}

View File

@ -60,7 +60,7 @@ include(
":magix:magix-mqtt",
":magix:magix-storage",
":magix:magix-storage:magix-storage-xodus",
":controls-magix-client",
":controls-magix",
":demo:all-things",
":demo:many-devices",
":demo:magix-demo",