docs(CompositeControlComponents): improved some methods, added logging options and new delegates

This commit is contained in:
Максим Колпаков 2025-02-11 16:24:24 +03:00
parent c6375a60ac
commit 4b3ad34e82
2 changed files with 168 additions and 22 deletions
controls-core/src/commonMain/kotlin/space/kscience/controls/spec

@ -29,6 +29,30 @@ import kotlin.time.Duration.Companion.seconds
private fun <T> Deferred<T>.getCompletedOrNull(): T? =
if (isCompleted && !isCancelled) getCompleted() else null
/**
* Categorizes errors handling.
* - [CRITICAL] means the error requires strong reaction.
* - [NON_CRITICAL] means the system can continue with partial functionality or just log a warning.
*/
public enum class DeviceErrorCategory {
CRITICAL,
NON_CRITICAL
}
/**
* A specialized exception holding a [category] for the device error.
* This allows distinguishing critical vs. non-critical errors in the same system.
*
* @param category The [DeviceErrorCategory] of this error.
* @param message The error message.
* @param cause The original cause, if any.
*/
public open class CategorizedDeviceException(
public val category: DeviceErrorCategory,
message: String,
cause: Throwable? = null
) : RuntimeException(message, cause)
/**
* EventBus interface for publishing and subscribing to application-level events.
*/
@ -91,6 +115,7 @@ public class DefaultTransportAdapter(
logger.info { "TransportAdapter: message sent -> ${message.sourceDevice}" }
eventBus.publish("Message sent: ${message.sourceDevice}")
}
override fun subscribe(): Flow<DeviceMessage> = _messages.asSharedFlow()
}
@ -133,9 +158,9 @@ public class DefaultTransactionManager(
* Transaction events used by [DefaultTransactionManager].
*/
public sealed class TransactionEvent {
public object TransactionStarted : TransactionEvent()
public object TransactionCommitted : TransactionEvent()
public object TransactionRolledBack : TransactionEvent()
public data object TransactionStarted : TransactionEvent()
public data object TransactionCommitted : TransactionEvent()
public data object TransactionRolledBack : TransactionEvent()
}
/**
@ -832,6 +857,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl
override val converter: MetaConverter<T> = converter
override suspend fun read(device: D): T? =
withContext(device.coroutineContext) { device.read(propertyName) }
override suspend fun write(device: D, value: T) =
withContext(device.coroutineContext) { device.write(propertyName, value) }
})
@ -967,7 +993,7 @@ public abstract class AbstractDeviceHubManager(
/**
* Internal map that keeps track of each child's [ChildJob].
*/
internal val childrenJobs: MutableMap<Name, ChildJob> = mutableMapOf()
public val childrenJobs: MutableMap<Name, ChildJob> = mutableMapOf()
/**
* Snapshot of current devices (not guaranteed to be consistent without a lock).
@ -1018,7 +1044,7 @@ public abstract class AbstractDeviceHubManager(
/**
* Tracks the number of restart attempts per device.
*/
internal val restartAttemptsMap: MutableMap<Name, Int> = mutableMapOf()
protected val restartAttemptsMap: MutableMap<Name, Int> = mutableMapOf()
/**
* A set that indicates which devices are currently in the middle of a RESTART procedure,
@ -1037,7 +1063,7 @@ public abstract class AbstractDeviceHubManager(
* @property meta Optional metadata for the device.
* @property reuseBus If `true`, reuses the old bus upon hot-swap.
*/
internal data class ChildJob(
public data class ChildJob(
val device: Device,
val collectorJob: Job,
val config: DeviceLifecycleConfig,
@ -1055,7 +1081,7 @@ public abstract class AbstractDeviceHubManager(
* @param block The suspend function to execute.
* @return The launched [Job].
*/
internal fun launchGlobal(block: suspend CoroutineScope.() -> Unit): Job =
public fun launchGlobal(block: suspend CoroutineScope.() -> Unit): Job =
CoroutineScope(parentJob + dispatcher + exceptionHandler).launch { block() }
/**
@ -1066,7 +1092,19 @@ public abstract class AbstractDeviceHubManager(
* @param config The lifecycle configuration of the child device.
*/
protected open suspend fun onChildErrorCaught(ex: Throwable, childName: Name, config: DeviceLifecycleConfig) {
context.logger.error(ex) { "Error in child device $childName with policy ${config.onError}" }
val category = if (ex is CategorizedDeviceException) ex.category else DeviceErrorCategory.CRITICAL
when (category) {
DeviceErrorCategory.CRITICAL -> {
context.logger.error(ex) {
"CRITICAL error in child device $childName with policy ${config.onError}"
}
}
DeviceErrorCategory.NON_CRITICAL -> {
context.logger.warn { "NON_CRITICAL error in child device $childName, continuing with policy ${config.onError}" }
}
}
}
/**
@ -1114,7 +1152,7 @@ public abstract class AbstractDeviceHubManager(
* @param deviceName The name of the device.
* @param config The lifecycle configuration for the device.
*/
internal open suspend fun onStopTimeout(deviceName: Name, config: DeviceLifecycleConfig) {
public open suspend fun onStopTimeout(deviceName: Name, config: DeviceLifecycleConfig) {
context.logger.warn { "Timeout while stopping $deviceName. You may override onStopTimeout if needed." }
}
@ -1126,7 +1164,7 @@ public abstract class AbstractDeviceHubManager(
*
* @param child The [ChildJob] representing the device.
*/
internal open suspend fun checkHealth(child: ChildJob) {
protected open suspend fun checkHealth(child: ChildJob) {
val hc = child.config.healthChecker ?: return
if (!hc.isHealthy(child.device)) {
val ex = RuntimeException("Health check failed for device ${child.device.id}")
@ -1146,8 +1184,7 @@ public abstract class AbstractDeviceHubManager(
* @param reuseBus If not null, reuses the provided message bus (e.g., for hot-swap).
* @return A [ChildJob] representing the running child device.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal fun launchChild(
protected fun launchChild(
name: Name,
device: Device,
config: DeviceLifecycleConfig,
@ -1315,16 +1352,19 @@ public abstract class AbstractDeviceHubManager(
}
/**
* Internal helper that starts a device while respecting [startDelay] and [startTimeout].
* Helper that starts a device while respecting [startDelay] and [startTimeout].
*
* @param name The device name.
* @param config The lifecycle configuration.
* @param device The device instance.
*/
internal open suspend fun doStartDevice(name: Name, config: DeviceLifecycleConfig, device: Device) {
protected open suspend fun doStartDevice(name: Name, config: DeviceLifecycleConfig, device: Device) {
if (config.startDelay > Duration.ZERO) delay(config.startDelay)
val startTimeout = config.startTimeout ?: Duration.INFINITE
val success = withTimeoutOrNull(startTimeout) {
check(device.lifecycleState != LifecycleState.STARTED) {
"Device $name is already started."
}
device.start()
}
if (success == null) {
@ -1507,7 +1547,30 @@ public abstract class AbstractDeviceHubManager(
* Called after a child device is physically stopped and removed in [removeJobFromRegistry].
* Override in a subclass if additional logic is needed.
*/
internal open fun onChildStop() {}
protected open fun onChildStop() {}
/**
* A method to rename a device dynamically. This moves the [ChildJob] from [oldName] to [newName].
* If [newName] already exists, an exception is thrown to prevent collisions.
*
* @param oldName The current name of the device.
* @param newName The new name for the device.
*/
public suspend fun renameDevice(oldName: Name, newName: Name) {
childLock.withLock {
require(!childrenJobs.containsKey(newName)) {
"A device with name $newName already exists; cannot rename."
}
val child = childrenJobs.remove(oldName) ?: error("Device not found: $oldName")
childrenJobs[newName] = child
}
systemBus.emit(SystemLogMessage("Device renamed from $oldName to $newName", sourceDevice = newName))
metricPublisher.publishMetric(
"device.rename",
1.0,
mapOf("oldName" to oldName.toString(), "newName" to newName.toString())
)
}
/**
* Starts multiple devices in a transactional manner.
@ -1631,7 +1694,7 @@ public abstract class AbstractDeviceHubManager(
* @param context The parent context.
* @param dispatcher The [CoroutineDispatcher] for concurrency.
*/
internal class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) {
public class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) {
override val messageBus: MutableSharedFlow<DeviceMessage> = MutableSharedFlow(
replay = 1000,
onBufferOverflow = BufferOverflow.DROP_OLDEST
@ -1706,7 +1769,7 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit
override val devices: Map<Name, Device>
get() = hubManager.devices
private val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList()
protected val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList()
init {
hubManager.launchGlobal {
@ -1769,12 +1832,14 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit
self.onOpen()
validate(self)
}
hubManager.devices.values.filter { it.lifecycleState == LifecycleState.INITIAL }.forEach { child ->
val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode
if (mode != LifecycleMode.LAZY) {
child.start()
hubManager.devices.values
.filter { it.lifecycleState == LifecycleState.INITIAL }
.forEach { child ->
val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode
if (mode != LifecycleMode.LAZY) {
child.start()
}
}
}
}
/**

@ -246,3 +246,84 @@ public fun CompositeControlComponentSpec<*>.validateSpec(device: Device) {
}
}
}
/**
* A read-only property with the ability to embed processing when reading a value.
*/
public fun <T, D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.checkedReadOnlyProperty(
converter: MetaConverter<T>,
name: String? = null,
descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {},
beforeRead: suspend D.(propertyName: String) -> Unit = { },
afterRead: suspend D.(propertyName: String, value: T?) -> Unit = { _, _ -> },
read: suspend D.(String) -> T?,
): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, DevicePropertySpec<D, T>>> {
return property(
converter = converter,
descriptorBuilder = descriptorBuilder,
name = name
) { propertyName ->
beforeRead(this, propertyName)
val result = read(propertyName)
afterRead(this, propertyName, result)
result
}
}
/**
* Mutable property with the ability to embed processing when reading a value.
*/
public fun <T, D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.checkedMutableProperty(
converter: MetaConverter<T>,
name: String? = null,
descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {},
beforeRead: suspend D.(propertyName: String) -> Unit = { },
afterRead: suspend D.(propertyName: String, value: T?) -> Unit = { _, _ -> },
beforeWrite: suspend D.(propertyName: String, newValue: T) -> Unit = { _, _ -> },
afterWrite: suspend D.(propertyName: String, newValue: T) -> Unit = { _, _ -> },
read: suspend D.(String) -> T?,
write: suspend D.(String, T) -> Unit,
): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, MutableDevicePropertySpec<D, T>>> {
return mutableProperty(
converter = converter,
descriptorBuilder = descriptorBuilder,
name = name,
read = { propertyName ->
beforeRead(this, propertyName)
val result = read(propertyName)
afterRead(this, propertyName, result)
result
},
write = { propertyName, value ->
beforeWrite(this, propertyName, value)
write(propertyName, value)
afterWrite(this, propertyName, value)
}
)
}
/**
* Property with a defaultValue for the case when `read(...)` will return null.
*/
public fun <T, D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.defaultValueProperty(
converter: MetaConverter<T>,
defaultValue: T,
name: String? = null,
descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {},
read: suspend D.(String) -> T?,
write: (suspend D.(String, T) -> Unit)? = null
): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, DevicePropertySpec<D, T>>> {
return if (write == null) {
property(converter, descriptorBuilder, name) { propertyName ->
read(propertyName) ?: defaultValue
}
} else {
mutableProperty(converter, descriptorBuilder, name,
read = { propertyName -> read(propertyName) ?: defaultValue },
write = { propertyName, value -> write(propertyName, value) }
)
}
}