diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt index 3358b30..fa99aec 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt @@ -29,6 +29,30 @@ import kotlin.time.Duration.Companion.seconds private fun <T> Deferred<T>.getCompletedOrNull(): T? = if (isCompleted && !isCancelled) getCompleted() else null +/** + * Categorizes errors handling. + * - [CRITICAL] means the error requires strong reaction. + * - [NON_CRITICAL] means the system can continue with partial functionality or just log a warning. + */ +public enum class DeviceErrorCategory { + CRITICAL, + NON_CRITICAL +} + +/** + * A specialized exception holding a [category] for the device error. + * This allows distinguishing critical vs. non-critical errors in the same system. + * + * @param category The [DeviceErrorCategory] of this error. + * @param message The error message. + * @param cause The original cause, if any. + */ +public open class CategorizedDeviceException( + public val category: DeviceErrorCategory, + message: String, + cause: Throwable? = null +) : RuntimeException(message, cause) + /** * EventBus interface for publishing and subscribing to application-level events. */ @@ -91,6 +115,7 @@ public class DefaultTransportAdapter( logger.info { "TransportAdapter: message sent -> ${message.sourceDevice}" } eventBus.publish("Message sent: ${message.sourceDevice}") } + override fun subscribe(): Flow<DeviceMessage> = _messages.asSharedFlow() } @@ -133,9 +158,9 @@ public class DefaultTransactionManager( * Transaction events used by [DefaultTransactionManager]. */ public sealed class TransactionEvent { - public object TransactionStarted : TransactionEvent() - public object TransactionCommitted : TransactionEvent() - public object TransactionRolledBack : TransactionEvent() + public data object TransactionStarted : TransactionEvent() + public data object TransactionCommitted : TransactionEvent() + public data object TransactionRolledBack : TransactionEvent() } /** @@ -832,6 +857,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl override val converter: MetaConverter<T> = converter override suspend fun read(device: D): T? = withContext(device.coroutineContext) { device.read(propertyName) } + override suspend fun write(device: D, value: T) = withContext(device.coroutineContext) { device.write(propertyName, value) } }) @@ -967,7 +993,7 @@ public abstract class AbstractDeviceHubManager( /** * Internal map that keeps track of each child's [ChildJob]. */ - internal val childrenJobs: MutableMap<Name, ChildJob> = mutableMapOf() + public val childrenJobs: MutableMap<Name, ChildJob> = mutableMapOf() /** * Snapshot of current devices (not guaranteed to be consistent without a lock). @@ -1018,7 +1044,7 @@ public abstract class AbstractDeviceHubManager( /** * Tracks the number of restart attempts per device. */ - internal val restartAttemptsMap: MutableMap<Name, Int> = mutableMapOf() + protected val restartAttemptsMap: MutableMap<Name, Int> = mutableMapOf() /** * A set that indicates which devices are currently in the middle of a RESTART procedure, @@ -1037,7 +1063,7 @@ public abstract class AbstractDeviceHubManager( * @property meta Optional metadata for the device. * @property reuseBus If `true`, reuses the old bus upon hot-swap. */ - internal data class ChildJob( + public data class ChildJob( val device: Device, val collectorJob: Job, val config: DeviceLifecycleConfig, @@ -1055,7 +1081,7 @@ public abstract class AbstractDeviceHubManager( * @param block The suspend function to execute. * @return The launched [Job]. */ - internal fun launchGlobal(block: suspend CoroutineScope.() -> Unit): Job = + public fun launchGlobal(block: suspend CoroutineScope.() -> Unit): Job = CoroutineScope(parentJob + dispatcher + exceptionHandler).launch { block() } /** @@ -1066,7 +1092,19 @@ public abstract class AbstractDeviceHubManager( * @param config The lifecycle configuration of the child device. */ protected open suspend fun onChildErrorCaught(ex: Throwable, childName: Name, config: DeviceLifecycleConfig) { - context.logger.error(ex) { "Error in child device $childName with policy ${config.onError}" } + val category = if (ex is CategorizedDeviceException) ex.category else DeviceErrorCategory.CRITICAL + + when (category) { + DeviceErrorCategory.CRITICAL -> { + context.logger.error(ex) { + "CRITICAL error in child device $childName with policy ${config.onError}" + } + } + + DeviceErrorCategory.NON_CRITICAL -> { + context.logger.warn { "NON_CRITICAL error in child device $childName, continuing with policy ${config.onError}" } + } + } } /** @@ -1114,7 +1152,7 @@ public abstract class AbstractDeviceHubManager( * @param deviceName The name of the device. * @param config The lifecycle configuration for the device. */ - internal open suspend fun onStopTimeout(deviceName: Name, config: DeviceLifecycleConfig) { + public open suspend fun onStopTimeout(deviceName: Name, config: DeviceLifecycleConfig) { context.logger.warn { "Timeout while stopping $deviceName. You may override onStopTimeout if needed." } } @@ -1126,7 +1164,7 @@ public abstract class AbstractDeviceHubManager( * * @param child The [ChildJob] representing the device. */ - internal open suspend fun checkHealth(child: ChildJob) { + protected open suspend fun checkHealth(child: ChildJob) { val hc = child.config.healthChecker ?: return if (!hc.isHealthy(child.device)) { val ex = RuntimeException("Health check failed for device ${child.device.id}") @@ -1146,8 +1184,7 @@ public abstract class AbstractDeviceHubManager( * @param reuseBus If not null, reuses the provided message bus (e.g., for hot-swap). * @return A [ChildJob] representing the running child device. */ - @OptIn(ExperimentalCoroutinesApi::class) - internal fun launchChild( + protected fun launchChild( name: Name, device: Device, config: DeviceLifecycleConfig, @@ -1315,16 +1352,19 @@ public abstract class AbstractDeviceHubManager( } /** - * Internal helper that starts a device while respecting [startDelay] and [startTimeout]. + * Helper that starts a device while respecting [startDelay] and [startTimeout]. * * @param name The device name. * @param config The lifecycle configuration. * @param device The device instance. */ - internal open suspend fun doStartDevice(name: Name, config: DeviceLifecycleConfig, device: Device) { + protected open suspend fun doStartDevice(name: Name, config: DeviceLifecycleConfig, device: Device) { if (config.startDelay > Duration.ZERO) delay(config.startDelay) val startTimeout = config.startTimeout ?: Duration.INFINITE val success = withTimeoutOrNull(startTimeout) { + check(device.lifecycleState != LifecycleState.STARTED) { + "Device $name is already started." + } device.start() } if (success == null) { @@ -1507,7 +1547,30 @@ public abstract class AbstractDeviceHubManager( * Called after a child device is physically stopped and removed in [removeJobFromRegistry]. * Override in a subclass if additional logic is needed. */ - internal open fun onChildStop() {} + protected open fun onChildStop() {} + + /** + * A method to rename a device dynamically. This moves the [ChildJob] from [oldName] to [newName]. + * If [newName] already exists, an exception is thrown to prevent collisions. + * + * @param oldName The current name of the device. + * @param newName The new name for the device. + */ + public suspend fun renameDevice(oldName: Name, newName: Name) { + childLock.withLock { + require(!childrenJobs.containsKey(newName)) { + "A device with name $newName already exists; cannot rename." + } + val child = childrenJobs.remove(oldName) ?: error("Device not found: $oldName") + childrenJobs[newName] = child + } + systemBus.emit(SystemLogMessage("Device renamed from $oldName to $newName", sourceDevice = newName)) + metricPublisher.publishMetric( + "device.rename", + 1.0, + mapOf("oldName" to oldName.toString(), "newName" to newName.toString()) + ) + } /** * Starts multiple devices in a transactional manner. @@ -1631,7 +1694,7 @@ public abstract class AbstractDeviceHubManager( * @param context The parent context. * @param dispatcher The [CoroutineDispatcher] for concurrency. */ -internal class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) { +public class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) { override val messageBus: MutableSharedFlow<DeviceMessage> = MutableSharedFlow( replay = 1000, onBufferOverflow = BufferOverflow.DROP_OLDEST @@ -1706,7 +1769,7 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit override val devices: Map<Name, Device> get() = hubManager.devices - private val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList() + protected val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList() init { hubManager.launchGlobal { @@ -1769,12 +1832,14 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit self.onOpen() validate(self) } - hubManager.devices.values.filter { it.lifecycleState == LifecycleState.INITIAL }.forEach { child -> - val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode - if (mode != LifecycleMode.LAZY) { - child.start() + hubManager.devices.values + .filter { it.lifecycleState == LifecycleState.INITIAL } + .forEach { child -> + val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode + if (mode != LifecycleMode.LAZY) { + child.start() + } } - } } /** diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponentsDelegates.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponentsDelegates.kt index ada76ed..fcd0bf5 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponentsDelegates.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponentsDelegates.kt @@ -246,3 +246,84 @@ public fun CompositeControlComponentSpec<*>.validateSpec(device: Device) { } } } + +/** + * A read-only property with the ability to embed processing when reading a value. + */ +public fun <T, D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.checkedReadOnlyProperty( + converter: MetaConverter<T>, + name: String? = null, + descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, + beforeRead: suspend D.(propertyName: String) -> Unit = { }, + afterRead: suspend D.(propertyName: String, value: T?) -> Unit = { _, _ -> }, + read: suspend D.(String) -> T?, +): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, DevicePropertySpec<D, T>>> { + + return property( + converter = converter, + descriptorBuilder = descriptorBuilder, + name = name + ) { propertyName -> + beforeRead(this, propertyName) + val result = read(propertyName) + afterRead(this, propertyName, result) + result + } +} + +/** + * Mutable property with the ability to embed processing when reading a value. + */ +public fun <T, D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.checkedMutableProperty( + converter: MetaConverter<T>, + name: String? = null, + descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, + beforeRead: suspend D.(propertyName: String) -> Unit = { }, + afterRead: suspend D.(propertyName: String, value: T?) -> Unit = { _, _ -> }, + beforeWrite: suspend D.(propertyName: String, newValue: T) -> Unit = { _, _ -> }, + afterWrite: suspend D.(propertyName: String, newValue: T) -> Unit = { _, _ -> }, + read: suspend D.(String) -> T?, + write: suspend D.(String, T) -> Unit, +): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, MutableDevicePropertySpec<D, T>>> { + + return mutableProperty( + converter = converter, + descriptorBuilder = descriptorBuilder, + name = name, + read = { propertyName -> + beforeRead(this, propertyName) + val result = read(propertyName) + afterRead(this, propertyName, result) + result + }, + write = { propertyName, value -> + beforeWrite(this, propertyName, value) + write(propertyName, value) + afterWrite(this, propertyName, value) + } + ) +} + +/** + * Property with a defaultValue for the case when `read(...)` will return null. + */ +public fun <T, D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.defaultValueProperty( + converter: MetaConverter<T>, + defaultValue: T, + name: String? = null, + descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, + read: suspend D.(String) -> T?, + write: (suspend D.(String, T) -> Unit)? = null +): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, DevicePropertySpec<D, T>>> { + + return if (write == null) { + property(converter, descriptorBuilder, name) { propertyName -> + read(propertyName) ?: defaultValue + } + } else { + mutableProperty(converter, descriptorBuilder, name, + read = { propertyName -> read(propertyName) ?: defaultValue }, + write = { propertyName, value -> write(propertyName, value) } + ) + } +} \ No newline at end of file