Remove hierarchical device structure in Hubs

This commit is contained in:
Alexander Nozik 2024-05-19 10:44:34 +03:00
parent e5088ac8e4
commit 207064cd45
13 changed files with 140 additions and 139 deletions

View File

@ -5,10 +5,14 @@
### Added
- Value averaging plot extension
- PLC4X bindings
- Shortcuts to access all Controls devices in a magix network.
- `DeviceClient` properly evaluates lifecycle and logs
### Changed
- Constructor properties return `DeviceStat` in order to be able to subscribe to them
- Constructor properties return `DeviceState` in order to be able to subscribe to them
- Refactored ports. Now we have `AsynchronousPort` as well as `SynchronousPort`
- `DeviceClient` now initializes property and action descriptors eagerly.
- `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices.
### Deprecated

View File

@ -7,7 +7,7 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.4.0-dev-1"
version = "0.4.0-dev-2"
repositories{
maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev")
}

View File

@ -10,9 +10,14 @@ import space.kscience.controls.api.DeviceLifecycleState.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.Laminate
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.get
import space.kscience.dataforge.names.parseAsName
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
@ -60,15 +65,15 @@ public open class DeviceGroup(
)
private val _devices = hashMapOf<NameToken, Device>()
private val _devices = hashMapOf<Name, Device>()
override val devices: Map<NameToken, Device> = _devices
override val devices: Map<Name, Device> = _devices
/**
* Register and initialize (synchronize child's lifecycle state with group state) a new device in this group
*/
@OptIn(DFExperimental::class)
public open fun <D : Device> install(token: NameToken, device: D): D {
public open fun <D : Device> install(token: Name, device: D): D {
require(_devices[token] == null) { "A child device with name $token already exists" }
//start the child device if needed
if (lifecycleState == STARTED || lifecycleState == STARTING) launch { device.start() }
@ -175,35 +180,16 @@ public fun Context.registerDeviceGroup(
block: DeviceGroup.() -> Unit,
): DeviceGroup = request(DeviceManager).registerDeviceGroup(name, meta, block)
private fun DeviceGroup.getOrCreateGroup(name: Name): DeviceGroup {
return when (name.length) {
0 -> this
1 -> {
val token = name.first()
when (val d = devices[token]) {
null -> install(
token,
DeviceGroup(context, meta[token] ?: Meta.EMPTY)
)
else -> (d as? DeviceGroup) ?: error("Device $name is not a DeviceGroup")
}
}
else -> getOrCreateGroup(name.first().asName()).getOrCreateGroup(name.cutFirst())
}
}
/**
* Register a device at given [name] path
*/
public fun <D : Device> DeviceGroup.install(name: Name, device: D): D {
return when (name.length) {
0 -> error("Can't use empty name for a child device")
1 -> install(name.first(), device)
else -> getOrCreateGroup(name.cutLast()).install(name.tokens.last(), device)
}
}
///**
// * Register a device at given [path] path
// */
//public fun <D : Device> DeviceGroup.install(path: Path, device: D): D {
// return when (path.length) {
// 0 -> error("Can't use empty path for a child device")
// 1 -> install(path.first().name, device)
// else -> getOrCreateGroup(path.cutLast()).install(path.tokens.last(), device)
// }
//}
public fun <D : Device> DeviceGroup.install(name: String, device: D): D = install(name.parseAsName(), device)
@ -238,7 +224,7 @@ public fun <D : Device> DeviceGroup.install(
* Create or edit a group with a given [name].
*/
public fun DeviceGroup.registerDeviceGroup(name: Name, block: DeviceGroup.() -> Unit): DeviceGroup =
getOrCreateGroup(name).apply(block)
install(name, DeviceGroup(context, meta).apply(block))
public fun DeviceGroup.registerDeviceGroup(name: String, block: DeviceGroup.() -> Unit): DeviceGroup =
registerDeviceGroup(name.parseAsName(), block)

View File

@ -8,10 +8,10 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import space.kscience.controls.constructor.DeviceGroup
import space.kscience.controls.constructor.install
import space.kscience.controls.manager.clock
import space.kscience.controls.spec.DeviceBySpec
import space.kscience.controls.spec.write
import space.kscience.dataforge.names.parseAsName
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.DurationUnit
@ -94,4 +94,4 @@ public fun DeviceGroup.pid(
name: String,
drive: Drive,
pidParameters: PidParameters,
): PidRegulator = install(name, PidRegulator(drive, pidParameters))
): PidRegulator = install(name.parseAsName(), PidRegulator(drive, pidParameters))

View File

@ -1,40 +1,24 @@
package space.kscience.controls.api
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.provider.Path
import space.kscience.dataforge.provider.Provider
import space.kscience.dataforge.provider.asPath
import space.kscience.dataforge.provider.plus
/**
* A hub that could locate multiple devices and redirect actions to them
*/
public interface DeviceHub : Provider {
public val devices: Map<NameToken, Device>
public val devices: Map<Name, Device>
override val defaultTarget: String get() = Device.DEVICE_TARGET
override val defaultChainTarget: String get() = Device.DEVICE_TARGET
/**
* List all devices, including sub-devices
*/
public fun buildDeviceTree(): Map<Name, Device> = buildMap {
fun putAll(prefix: Name, hub: DeviceHub) {
hub.devices.forEach {
put(prefix + it.key, it.value)
}
}
devices.forEach {
val name = it.key.asName()
put(name, it.value)
(it.value as? DeviceHub)?.let { hub ->
putAll(name, hub)
}
}
}
override fun content(target: String): Map<Name, Any> = if (target == Device.DEVICE_TARGET) {
buildDeviceTree()
devices
} else {
emptyMap()
}
@ -42,38 +26,31 @@ public interface DeviceHub : Provider {
public companion object
}
public operator fun DeviceHub.get(nameToken: NameToken): Device =
devices[nameToken] ?: error("Device with name $nameToken not found in $this")
public fun DeviceHub.getOrNull(name: Name): Device? = when {
name.isEmpty() -> this as? Device
name.length == 1 -> devices[name.firstOrNull()!!]
else -> (get(name.firstOrNull()!!) as? DeviceHub)?.getOrNull(name.cutFirst())
/**
* List all devices, including sub-devices
*/
public fun DeviceHub.provideAllDevices(): Map<Path, Device> = buildMap {
fun putAll(prefix: Path, hub: DeviceHub) {
hub.devices.forEach {
put(prefix + it.key.asPath(), it.value)
}
}
public operator fun DeviceHub.get(name: Name): Device =
getOrNull(name) ?: error("Device with name $name not found in $this")
public fun DeviceHub.getOrNull(nameString: String): Device? = getOrNull(Name.parse(nameString))
public operator fun DeviceHub.get(nameString: String): Device =
getOrNull(nameString) ?: error("Device with name $nameString not found in $this")
devices.forEach {
val name: Name = it.key
put(name.asPath(), it.value)
(it.value as? DeviceHub)?.let { hub ->
putAll(name.asPath(), hub)
}
}
}
public suspend fun DeviceHub.readProperty(deviceName: Name, propertyName: String): Meta =
this[deviceName].readProperty(propertyName)
(devices[deviceName] ?: error("Device with name $deviceName not found in $this")).readProperty(propertyName)
public suspend fun DeviceHub.writeProperty(deviceName: Name, propertyName: String, value: Meta) {
this[deviceName].writeProperty(propertyName, value)
(devices[deviceName] ?: error("Device with name $deviceName not found in $this")).writeProperty(propertyName, value)
}
public suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: Meta?): Meta? =
this[deviceName].execute(command, argument)
//suspend fun DeviceHub.respond(request: Envelope): EnvelopeBuilder {
// val target = request.meta[DeviceMessage.TARGET_KEY].string ?: defaultTarget
// val device = this[target.toName()]
//
// return device.respond(device, target, request)
//}
(devices[deviceName] ?: error("Device with name $deviceName not found in $this")).execute(command, argument)

View File

@ -3,13 +3,13 @@ package space.kscience.controls.manager
import kotlinx.coroutines.launch
import space.kscience.controls.api.Device
import space.kscience.controls.api.DeviceHub
import space.kscience.controls.api.getOrNull
import space.kscience.controls.api.id
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.get
import space.kscience.dataforge.names.parseAsName
import kotlin.collections.set
import kotlin.properties.ReadOnlyProperty
@ -22,11 +22,11 @@ public class DeviceManager : AbstractPlugin(), DeviceHub {
/**
* Actual list of connected devices
*/
private val top = HashMap<NameToken, Device>()
override val devices: Map<NameToken, Device> get() = top
private val _devices = HashMap<Name, Device>()
override val devices: Map<Name, Device> get() = _devices
public fun registerDevice(name: NameToken, device: Device) {
top[name] = device
public fun registerDevice(name: Name, device: Device) {
_devices[name] = device
}
override fun content(target: String): Map<Name, Any> = super<DeviceHub>.content(target)
@ -39,7 +39,7 @@ public class DeviceManager : AbstractPlugin(), DeviceHub {
}
public fun <D : Device> DeviceManager.install(name: String, device: D): D {
registerDevice(NameToken(name), device)
registerDevice(name.parseAsName(), device)
device.launch {
device.start()
}
@ -69,7 +69,7 @@ public inline fun <D : Device> DeviceManager.installing(
val meta = Meta(builder)
return ReadOnlyProperty { _, property ->
val name = property.name
val current = getOrNull(name)
val current = devices[name]
if (current == null) {
install(name, factory, meta)
} else if (current.meta != meta) {

View File

@ -74,11 +74,11 @@ public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<Dev
return try {
val targetName = request.targetDevice
if (targetName == null) {
buildDeviceTree().mapNotNull {
devices.mapNotNull {
it.value.respondMessage(it.key, request)
}
} else {
val device = getOrNull(targetName) ?: error("The device with name $targetName not found in $this")
val device = devices[targetName] ?: error("The device with name $targetName not found in $this")
listOfNotNull(device.respondMessage(targetName, request))
}
} catch (ex: Exception) {

View File

@ -16,6 +16,8 @@ import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.length
import space.kscience.dataforge.names.startsWith
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.send
import space.kscience.magix.api.subscribe
@ -29,13 +31,19 @@ private fun stringUID() = uuid4().leastSignificantBits.toString(16)
public class DeviceClient internal constructor(
override val context: Context,
private val deviceName: Name,
override val propertyDescriptors: Collection<PropertyDescriptor>,
override val actionDescriptors: Collection<ActionDescriptor>,
propertyDescriptors: Collection<PropertyDescriptor>,
actionDescriptors: Collection<ActionDescriptor>,
incomingFlow: Flow<DeviceMessage>,
private val send: suspend (DeviceMessage) -> Unit,
) : CachingDevice {
override var actionDescriptors: Collection<ActionDescriptor> = actionDescriptors
internal set
override var propertyDescriptors: Collection<PropertyDescriptor> = propertyDescriptors
internal set
override val coroutineContext: CoroutineContext = context.coroutineContext + Job(context.coroutineContext[Job])
private val mutex = Mutex()
@ -44,8 +52,7 @@ public class DeviceClient internal constructor(
private val flowInternal = incomingFlow.filter {
it.sourceDevice == deviceName
}.shareIn(this, started = SharingStarted.Eagerly).also {
it.onEach { message ->
}.onEach { message ->
when (message) {
is PropertyChangedMessage -> mutex.withLock {
propertyCache[message.property] = message.value
@ -55,8 +62,7 @@ public class DeviceClient internal constructor(
//ignore
}
}
}.launchIn(this)
}
}.shareIn(this, started = SharingStarted.Eagerly)
override val messageFlow: Flow<DeviceMessage> get() = flowInternal
@ -65,7 +71,7 @@ public class DeviceClient internal constructor(
send(
PropertyGetMessage(propertyName, targetDevice = deviceName)
)
return flowInternal.filterIsInstance<PropertyChangedMessage>().first {
return messageFlow.filterIsInstance<PropertyChangedMessage>().first {
it.property == propertyName
}.value
}
@ -89,30 +95,33 @@ public class DeviceClient internal constructor(
send(
ActionExecuteMessage(actionName, argument, id, targetDevice = deviceName)
)
return flowInternal.filterIsInstance<ActionResultMessage>().first {
return messageFlow.filterIsInstance<ActionResultMessage>().first {
it.action == actionName && it.requestId == id
}.result
}
private val lifecycleStateFlow = messageFlow.filterIsInstance<DeviceLifeCycleMessage>()
.map { it.state }.stateIn(this, started = SharingStarted.Eagerly, DeviceLifecycleState.STARTED)
@DFExperimental
override val lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STARTED
override val lifecycleState: DeviceLifecycleState get() = lifecycleStateFlow.value
}
/**
* Connect to a remote device via this endpoint.
*
* @param context a [Context] to run device in
* @param sourceEndpointName the name of this endpoint
* @param targetEndpointName the name of endpoint in Magix to connect to
* @param thisEndpoint the name of this endpoint
* @param deviceEndpoint the name of endpoint in Magix to connect to
* @param deviceName the name of device within endpoint
*/
public suspend fun MagixEndpoint.remoteDevice(
context: Context,
sourceEndpointName: String,
targetEndpointName: String,
thisEndpoint: String,
deviceEndpoint: String,
deviceName: Name,
): DeviceClient = coroutineScope {
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(targetEndpointName)).map { it.second }
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(deviceEndpoint)).map { it.second }
val deferredDescriptorMessage = CompletableDeferred<DescriptionMessage>()
@ -123,8 +132,8 @@ public suspend fun MagixEndpoint.remoteDevice(
send(
format = DeviceManager.magixFormat,
payload = GetDescriptionMessage(targetDevice = deviceName),
source = sourceEndpointName,
target = targetEndpointName,
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
)
@ -141,14 +150,37 @@ public suspend fun MagixEndpoint.remoteDevice(
send(
format = DeviceManager.magixFormat,
payload = it,
source = sourceEndpointName,
target = targetEndpointName,
source = thisEndpoint,
target = deviceEndpoint,
id = stringUID()
)
}
}
//public fun MagixEndpoint.remoteDeviceHub()
private class MapBasedDeviceHub(val deviceMap: Map<Name, Device>, val prefix: Name) : DeviceHub {
override val devices: Map<Name, Device>
get() = deviceMap.filterKeys { name: Name -> name == prefix || (name.startsWith(prefix) && name.length == prefix.length + 1) }
}
public fun MagixEndpoint.remoteDeviceHub(
context: Context,
thisEndpoint: String,
deviceEndpoint: String,
): DeviceHub {
val devices = mutableMapOf<Name, DeviceClient>()
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(deviceEndpoint)).map { it.second }
subscription.filterIsInstance<DescriptionMessage>().onEach {
}.launchIn(context)
return object : DeviceHub {
override val devices: Map<Name, Device>
get() = TODO("Not yet implemented")
}
}
/**
* Subscribe on specific property of a device without creating a device

View File

@ -5,12 +5,12 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import space.kscience.controls.api.get
import space.kscience.controls.api.getOrReadProperty
import space.kscience.controls.manager.DeviceManager
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.get
import space.kscience.magix.api.*
public const val TANGO_MAGIX_FORMAT: String = "tango"
@ -88,7 +88,7 @@ public fun DeviceManager.launchTangoMagix(
return context.launch {
endpoint.subscribe(tangoMagixFormat).onEach { (request, payload) ->
try {
val device = get(payload.device)
val device = devices[payload.device] ?: error("Device ${payload.device} not found")
when (payload.action) {
TangoAction.read -> {
val value = device.getOrReadProperty(payload.name)
@ -99,6 +99,7 @@ public fun DeviceManager.launchTangoMagix(
)
}
}
TangoAction.write -> {
payload.value?.let { value ->
device.writeProperty(payload.name, value)
@ -112,6 +113,7 @@ public fun DeviceManager.launchTangoMagix(
)
}
}
TangoAction.exec -> {
val result = device.execute(payload.name, payload.argin)
respond(request, payload) { requestPayload ->
@ -121,6 +123,7 @@ public fun DeviceManager.launchTangoMagix(
)
}
}
TangoAction.pipe -> TODO("Pipe not implemented")
}
} catch (ex: Exception) {

View File

@ -59,7 +59,7 @@ internal class RemoteDeviceConnect {
val virtualMagixEndpoint = object : MagixEndpoint {
private val additionalMessages = MutableSharedFlow<DeviceMessage>(10)
private val additionalMessages = MutableSharedFlow<DeviceMessage>(1)
override fun subscribe(
filter: MagixMessageFilter,

View File

@ -7,7 +7,7 @@ import space.kscience.controls.spec.DeviceBySpec
import space.kscience.controls.spec.DeviceSpec
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.Name
/**
* A variant of [DeviceBySpec] that includes Modbus RTU/TCP/UDP client
@ -35,12 +35,12 @@ public open class ModbusDeviceBySpec<D: Device>(
public class ModbusHub(
public val context: Context,
public val masterBuilder: () -> AbstractModbusMaster,
public val specs: Map<NameToken, Pair<Int, DeviceSpec<*>>>,
public val specs: Map<Name, Pair<Int, DeviceSpec<*>>>,
) : DeviceHub, AutoCloseable {
public val master: AbstractModbusMaster by lazy(masterBuilder)
override val devices: Map<NameToken, ModbusDevice> by lazy {
override val devices: Map<Name, ModbusDevice> by lazy {
specs.mapValues { (_, pair) ->
ModbusDeviceBySpec(
context,

View File

@ -29,19 +29,18 @@ import kotlinx.serialization.json.put
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.api.PropertyGetMessage
import space.kscience.controls.api.PropertySetMessage
import space.kscience.controls.api.getOrNull
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.respondHubMessage
import space.kscience.dataforge.meta.toMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.get
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.server.magixModule
private fun Application.deviceServerModule(manager: DeviceManager) {
install(StatusPages) {
exception<IllegalArgumentException> { call, cause ->
@ -100,9 +99,8 @@ public fun Application.deviceManagerModule(
h1 {
+"Device server dashboard"
}
deviceNames.forEach { deviceName ->
val device =
manager.getOrNull(deviceName)
deviceNames.forEach { deviceName: String ->
val device = manager.devices[deviceName]
?: error("The device with name $deviceName not found in $manager")
div {
id = deviceName

View File

@ -19,7 +19,8 @@ import space.kscience.controls.ports.withStringDelimiter
import space.kscience.controls.spec.*
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.time.Duration
@ -47,7 +48,7 @@ class PiMotionMasterDevice(
var axes: Map<String, Axis> = emptyMap()
private set
override val devices: Map<NameToken, Axis> = axes.mapKeys { (key, _) -> NameToken(key) }
override val devices: Map<Name, Axis> = axes.mapKeys { (key, _) -> key.parseAsName() }
private suspend fun failIfError(message: (Int) -> String = { "Failed with error code $it" }) {
val errorCode = getErrorCode()