Compare commits
2 Commits
8bd9bcc6a6
...
57e9df140b
Author | SHA1 | Date | |
---|---|---|---|
57e9df140b | |||
231f1bc858 |
@ -7,6 +7,8 @@ import kotlinx.coroutines.sync.Mutex
|
|||||||
import kotlinx.coroutines.sync.withLock
|
import kotlinx.coroutines.sync.withLock
|
||||||
import space.kscience.controls.api.*
|
import space.kscience.controls.api.*
|
||||||
import space.kscience.controls.manager.DeviceManager
|
import space.kscience.controls.manager.DeviceManager
|
||||||
|
import space.kscience.controls.spec.DevicePropertySpec
|
||||||
|
import space.kscience.controls.spec.name
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
@ -106,12 +108,75 @@ public class DeviceClient(
|
|||||||
* Connect to a remote device via this endpoint.
|
* Connect to a remote device via this endpoint.
|
||||||
*
|
*
|
||||||
* @param context a [Context] to run device in
|
* @param context a [Context] to run device in
|
||||||
* @param endpointName the name of endpoint in Magix to connect to
|
* @param sourceEndpointName the name of this endpoint
|
||||||
|
* @param targetEndpointName the name of endpoint in Magix to connect to
|
||||||
* @param deviceName the name of device within endpoint
|
* @param deviceName the name of device within endpoint
|
||||||
*/
|
*/
|
||||||
public fun MagixEndpoint.remoteDevice(context: Context, endpointName: String, deviceName: Name): DeviceClient {
|
public fun MagixEndpoint.remoteDevice(
|
||||||
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(endpointName)).map { it.second }
|
context: Context,
|
||||||
|
sourceEndpointName: String,
|
||||||
|
targetEndpointName: String,
|
||||||
|
deviceName: Name,
|
||||||
|
): DeviceClient {
|
||||||
|
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(targetEndpointName)).map { it.second }
|
||||||
return DeviceClient(context, deviceName, subscription) {
|
return DeviceClient(context, deviceName, subscription) {
|
||||||
send(DeviceManager.magixFormat, it, endpointName, id = stringUID())
|
send(
|
||||||
|
format = DeviceManager.magixFormat,
|
||||||
|
payload = it,
|
||||||
|
source = sourceEndpointName,
|
||||||
|
target = targetEndpointName,
|
||||||
|
id = stringUID()
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe on specific property of a device without creating a device
|
||||||
|
*/
|
||||||
|
public fun <T> MagixEndpoint.controlsPropertyFlow(
|
||||||
|
endpointName: String,
|
||||||
|
deviceName: Name,
|
||||||
|
propertySpec: DevicePropertySpec<*, T>,
|
||||||
|
): Flow<T> {
|
||||||
|
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(endpointName)).map { it.second }
|
||||||
|
|
||||||
|
return subscription.filterIsInstance<PropertyChangedMessage>()
|
||||||
|
.filter { message ->
|
||||||
|
message.sourceDevice == deviceName && message.property == propertySpec.name
|
||||||
|
}.map {
|
||||||
|
propertySpec.converter.metaToObject(it.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public suspend fun <T> MagixEndpoint.sendControlsPropertyChange(
|
||||||
|
sourceEndpointName: String,
|
||||||
|
targetEndpointName: String,
|
||||||
|
deviceName: Name,
|
||||||
|
propertySpec: DevicePropertySpec<*, T>,
|
||||||
|
value: T,
|
||||||
|
) {
|
||||||
|
val message = PropertySetMessage(
|
||||||
|
property = propertySpec.name,
|
||||||
|
value = propertySpec.converter.objectToMeta(value),
|
||||||
|
targetDevice = deviceName
|
||||||
|
)
|
||||||
|
send(DeviceManager.magixFormat, message, source = sourceEndpointName, target = targetEndpointName)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe on property change messages together with property values
|
||||||
|
*/
|
||||||
|
public fun <T> MagixEndpoint.controlsPropertyMessageFlow(
|
||||||
|
endpointName: String,
|
||||||
|
deviceName: Name,
|
||||||
|
propertySpec: DevicePropertySpec<*, T>,
|
||||||
|
): Flow<Pair<PropertyChangedMessage, T>> {
|
||||||
|
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(endpointName)).map { it.second }
|
||||||
|
|
||||||
|
return subscription.filterIsInstance<PropertyChangedMessage>()
|
||||||
|
.filter { message ->
|
||||||
|
message.sourceDevice == deviceName && message.property == propertySpec.name
|
||||||
|
}.map {
|
||||||
|
it to propertySpec.converter.metaToObject(it.value)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
package space.kscience.controls.client
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
|
import kotlinx.coroutines.flow.*
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import space.kscience.controls.api.PropertyChangedMessage
|
||||||
|
import space.kscience.controls.api.requestProperty
|
||||||
|
import space.kscience.controls.spec.DeviceActionSpec
|
||||||
|
import space.kscience.controls.spec.DevicePropertySpec
|
||||||
|
import space.kscience.controls.spec.MutableDevicePropertySpec
|
||||||
|
import space.kscience.controls.spec.name
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An accessor that allows DeviceClient to connect to any property without type checks
|
||||||
|
*/
|
||||||
|
public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T =
|
||||||
|
propertySpec.converter.metaToObject(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
|
||||||
|
|
||||||
|
|
||||||
|
public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T =
|
||||||
|
propertySpec.converter.metaToObject(requestProperty(propertySpec.name))
|
||||||
|
|
||||||
|
public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) {
|
||||||
|
writeProperty(propertySpec.name, propertySpec.converter.objectToMeta(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun <T> DeviceClient.writeAsync(propertySpec: MutableDevicePropertySpec<*, T>, value: T): Job = launch {
|
||||||
|
write(propertySpec, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun <T> DeviceClient.propertyFlow(spec: DevicePropertySpec<*, T>): Flow<T> = messageFlow
|
||||||
|
.filterIsInstance<PropertyChangedMessage>()
|
||||||
|
.filter { it.property == spec.name }
|
||||||
|
.mapNotNull { spec.converter.metaToObject(it.value) }
|
||||||
|
|
||||||
|
public fun <T> DeviceClient.onPropertyChange(
|
||||||
|
spec: DevicePropertySpec<*, T>,
|
||||||
|
scope: CoroutineScope = this,
|
||||||
|
callback: suspend PropertyChangedMessage.(T) -> Unit,
|
||||||
|
): Job = messageFlow
|
||||||
|
.filterIsInstance<PropertyChangedMessage>()
|
||||||
|
.filter { it.property == spec.name }
|
||||||
|
.onEach { change ->
|
||||||
|
val newValue = spec.converter.metaToObject(change.value)
|
||||||
|
if (newValue != null) {
|
||||||
|
change.callback(newValue)
|
||||||
|
}
|
||||||
|
}.launchIn(scope)
|
||||||
|
|
||||||
|
public fun <T> DeviceClient.useProperty(
|
||||||
|
spec: DevicePropertySpec<*, T>,
|
||||||
|
scope: CoroutineScope = this,
|
||||||
|
callback: suspend (T) -> Unit,
|
||||||
|
): Job = scope.launch {
|
||||||
|
callback(read(spec))
|
||||||
|
messageFlow
|
||||||
|
.filterIsInstance<PropertyChangedMessage>()
|
||||||
|
.filter { it.property == spec.name }
|
||||||
|
.collect { change ->
|
||||||
|
val newValue = spec.converter.metaToObject(change.value)
|
||||||
|
if (newValue != null) {
|
||||||
|
callback(newValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public suspend fun <I, O> DeviceClient.execute(actionSpec: DeviceActionSpec<*, I, O>, input: I): O {
|
||||||
|
val inputMeta = actionSpec.inputConverter.objectToMeta(input)
|
||||||
|
val res = execute(actionSpec.name, inputMeta)
|
||||||
|
return actionSpec.outputConverter.metaToObject(res ?: Meta.EMPTY)
|
||||||
|
}
|
||||||
|
|
||||||
|
public suspend fun <O> DeviceClient.execute(actionSpec: DeviceActionSpec<*, Unit, O>): O {
|
||||||
|
val res = execute(actionSpec.name, Meta.EMPTY)
|
||||||
|
return actionSpec.outputConverter.metaToObject(res ?: Meta.EMPTY)
|
||||||
|
}
|
@ -4,7 +4,6 @@ import kotlinx.coroutines.flow.Flow
|
|||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.coroutines.test.runTest
|
import kotlinx.coroutines.test.runTest
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import space.kscience.controls.api.Device
|
|
||||||
import space.kscience.controls.manager.DeviceManager
|
import space.kscience.controls.manager.DeviceManager
|
||||||
import space.kscience.controls.manager.install
|
import space.kscience.controls.manager.install
|
||||||
import space.kscience.controls.manager.respondMessage
|
import space.kscience.controls.manager.respondMessage
|
||||||
@ -25,9 +24,6 @@ import kotlin.test.assertContains
|
|||||||
import kotlin.time.Duration.Companion.milliseconds
|
import kotlin.time.Duration.Companion.milliseconds
|
||||||
|
|
||||||
|
|
||||||
public suspend fun <T> Device.readUnsafe(propertySpec: DevicePropertySpec<*, T>): T =
|
|
||||||
propertySpec.converter.metaToObject(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
|
|
||||||
|
|
||||||
internal class RemoteDeviceConnect {
|
internal class RemoteDeviceConnect {
|
||||||
|
|
||||||
class TestDevice(context: Context, meta: Meta) : DeviceBySpec<TestDevice>(TestDevice, context, meta) {
|
class TestDevice(context: Context, meta: Meta) : DeviceBySpec<TestDevice>(TestDevice, context, meta) {
|
||||||
@ -82,6 +78,6 @@ internal class RemoteDeviceConnect {
|
|||||||
|
|
||||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY)
|
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY)
|
||||||
|
|
||||||
assertContains(0.0..1.0, remoteDevice.readUnsafe(TestDevice.value))
|
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user