docs(CompositeControlComponentSpec): improved comments and code

This commit is contained in:
Максим Колпаков 2025-02-01 03:42:23 +03:00
parent 7d11650cab
commit 37e357a404
2 changed files with 282 additions and 101 deletions
controls-core/src
commonMain/kotlin/space/kscience/controls/spec
commonTest/kotlin/space/kscience/controls/spec

@ -19,6 +19,120 @@ import kotlin.reflect.KProperty
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
/**
* Extension function to safely get the completed value of a Deferred or return null.
*/
@OptIn(ExperimentalCoroutinesApi::class)
private fun <T> Deferred<T>.getCompletedOrNull(): T? =
if (isCompleted && !isCancelled) getCompleted() else null
private val globalExceptionHandler = CoroutineExceptionHandler { _, throwable ->
println("Unhandled exception in global scope: ${throwable.message}")
}
/**
* EventBus interface for publishing and subscribing to application-level events.
*/
public interface EventBus {
public val events: SharedFlow<Any>
public suspend fun publish(event: Any)
}
/**
* Default implementation of EventBus using a MutableSharedFlow.
*/
public class DefaultEventBus(
replay: Int = 100,
onBufferOverflow: BufferOverflow = BufferOverflow.DROP_OLDEST
) : EventBus {
private val _events = MutableSharedFlow<Any>(replay = replay, onBufferOverflow = onBufferOverflow)
override val events: SharedFlow<Any> get() = _events
override suspend fun publish(event: Any) {
_events.emit(event)
}
}
/**
* TransportAdapter interface for distributed communications.
* This abstraction allows plugging in different transport mechanisms.
*/
public interface TransportAdapter {
public suspend fun send(message: DeviceMessage)
public fun subscribe(): Flow<DeviceMessage>
}
/**
* Default stub implementation of TransportAdapter for in-process communication.
*/
public class DefaultTransportAdapter(
private val eventBus: EventBus
) : TransportAdapter {
private val _messages = MutableSharedFlow<DeviceMessage>(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override suspend fun send(message: DeviceMessage) {
_messages.emit(message)
eventBus.publish("Message sent: ${message.sourceDevice}")
}
override fun subscribe(): Flow<DeviceMessage> = _messages.asSharedFlow()
}
/**
* TransactionManager interface for executing a block of operations.
*/
public interface TransactionManager {
/**
* Executes [block] within a transaction. If an exception occurs, a rollback is performed.
*/
public suspend fun <T> withTransaction(block: suspend () -> T): T
}
/**
* Default implementation of TransactionManager.
* This implementation wraps the block in a try/catch and publishes transaction events.
*/
public class DefaultTransactionManager(
private val eventBus: EventBus,
private val logger: Logger = DefaultLogManager()
) : TransactionManager {
override suspend fun <T> withTransaction(block: suspend () -> T): T {
eventBus.publish(TransactionEvent.TransactionStarted)
return try {
val result = block()
eventBus.publish(TransactionEvent.TransactionCommitted)
result
} catch (ex: Exception) {
logger.error(ex) { "Transaction failed, rolling back." }
eventBus.publish(TransactionEvent.TransactionRolledBack)
throw ex
}
}
}
/**
* Transaction events.
*/
public sealed class TransactionEvent {
public object TransactionStarted : TransactionEvent()
public object TransactionCommitted : TransactionEvent()
public object TransactionRolledBack : TransactionEvent()
}
/**
* Interface for publishing metrics.
*/
public interface MetricPublisher {
public fun publishMetric(name: String, value: Double, tags: Map<String, String> = emptyMap())
}
/**
* Default stub implementation of MetricPublisher which logs metrics.
*/
public class DefaultMetricPublisher : MetricPublisher {
override fun publishMetric(name: String, value: Double, tags: Map<String, String>) {
println("Metric published: $name = $value, tags: $tags")
}
}
/** /**
* Defines different modes of how a child device is coupled to its parent device. * Defines different modes of how a child device is coupled to its parent device.
* *
@ -476,7 +590,7 @@ public interface CompositeDeviceSpec<D : ConfigurableCompositeControlComponent<D
converter: MetaConverter<T>, converter: MetaConverter<T>,
descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {},
name: String? = null, name: String? = null,
read: suspend D.(propertyName: String) -> T?, read: suspend D.(propertyName: String) -> T?
): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DevicePropertySpec<D, T>>> ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DevicePropertySpec<D, T>>>
/** /**
@ -487,7 +601,7 @@ public interface CompositeDeviceSpec<D : ConfigurableCompositeControlComponent<D
descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {},
name: String? = null, name: String? = null,
read: suspend D.(propertyName: String) -> T?, read: suspend D.(propertyName: String) -> T?,
write: suspend D.(propertyName: String, value: T) -> Unit, write: suspend D.(propertyName: String, value: T) -> Unit
): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, MutableDevicePropertySpec<D, T>>> ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, MutableDevicePropertySpec<D, T>>>
/** /**
@ -498,7 +612,7 @@ public interface CompositeDeviceSpec<D : ConfigurableCompositeControlComponent<D
outputConverter: MetaConverter<O>, outputConverter: MetaConverter<O>,
descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {}, descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {},
name: String? = null, name: String? = null,
execute: suspend D.(I) -> O, execute: suspend D.(I) -> O
): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, I, O>>> ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, I, O>>>
} }
@ -537,7 +651,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl
} }
override fun validate(device: D) { override fun validate(device: D) {
// Verify that all declared properties and actions are indeed in the device's descriptors // Verify that all declared properties and actions are registered in the device.
properties.values.forEach { prop -> properties.values.forEach { prop ->
check(prop.descriptor in device.propertyDescriptors) { check(prop.descriptor in device.propertyDescriptors) {
"Property ${prop.descriptor.name} not registered in ${device.id}" "Property ${prop.descriptor.name} not registered in ${device.id}"
@ -598,7 +712,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl
converter: MetaConverter<T>, converter: MetaConverter<T>,
descriptorBuilder: PropertyDescriptorBuilder.() -> Unit, descriptorBuilder: PropertyDescriptorBuilder.() -> Unit,
name: String?, name: String?,
read: suspend D.(propertyName: String) -> T?, read: suspend D.(propertyName: String) -> T?
): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DevicePropertySpec<D, T>>> = ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DevicePropertySpec<D, T>>> =
PropertyDelegateProvider { _, property -> PropertyDelegateProvider { _, property ->
val propertyName = name ?: property.name val propertyName = name ?: property.name
@ -652,7 +766,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl
specKeyInRegistry: Name? = null, specKeyInRegistry: Name? = null,
childDeviceName: Name? = null, childDeviceName: Name? = null,
metaBuilder: (MutableMeta.() -> Unit)? = null, metaBuilder: (MutableMeta.() -> Unit)? = null,
configBuilder: DeviceLifecycleConfigBuilder.() -> Unit = {}, configBuilder: DeviceLifecycleConfigBuilder.() -> Unit = {}
): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, CompositeControlComponentSpec<CD>>> = ): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, CompositeControlComponentSpec<CD>>> =
PropertyDelegateProvider { thisRef, property -> PropertyDelegateProvider { thisRef, property ->
val registryKey = specKeyInRegistry ?: property.name.asName() val registryKey = specKeyInRegistry ?: property.name.asName()
@ -711,7 +825,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl
public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.unitAction( public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.unitAction(
descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {}, descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {},
name: String? = null, name: String? = null,
execute: suspend D.() -> Unit, execute: suspend D.() -> Unit
): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, Unit, Unit>>> = ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, Unit, Unit>>> =
action(MetaConverter.unit, MetaConverter.unit, descriptorBuilder, name) { execute() } action(MetaConverter.unit, MetaConverter.unit, descriptorBuilder, name) { execute() }
@ -725,28 +839,38 @@ public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlCompon
public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.metaAction( public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.metaAction(
descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {}, descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {},
name: String? = null, name: String? = null,
execute: suspend D.(Meta) -> Meta, execute: suspend D.(Meta) -> Meta
): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, Meta, Meta>>> = ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, Meta, Meta>>> =
action(MetaConverter.meta, MetaConverter.meta, descriptorBuilder, name) { execute(it) } action(MetaConverter.meta, MetaConverter.meta, descriptorBuilder, name) { execute(it) }
/** /**
* An abstract manager for devices, handling lifecycle, error policies, transactions, etc. * An abstract manager for devices, handling lifecycle, error policies, transactions, distributed transport,
* structured concurrency, and event/metric publishing.
* *
* Typically, you extend or instantiate this to manage a set of child [Device]s. * This class uses a global exception handler and SupervisorJob for centralized error handling.
*
* All coroutines are launched with the context: parentJob + dispatcher + globalExceptionHandler.
* *
* @param context The [Context] for logging and plugin management. * @param context The [Context] for logging and plugin management.
* @param dispatcher A [CoroutineDispatcher] for concurrency; default is [Dispatchers.Default]. * @param dispatcher A [CoroutineDispatcher] for concurrency; default is [Dispatchers.Default].
*/ */
public abstract class AbstractDeviceHubManager( public abstract class AbstractDeviceHubManager(
public val context: Context, public val context: Context,
private val dispatcher: CoroutineDispatcher = Dispatchers.Default, private val dispatcher: CoroutineDispatcher = Dispatchers.Default
) { ) {
/** /**
* The supervisor job for all child devices. * Global exception handler for all coroutines in this manager.
*/
protected val exceptionHandler: CoroutineExceptionHandler = CoroutineExceptionHandler { _, ex ->
context.logger.error(ex) { "Unhandled exception in global scope" }
}
/**
* SupervisorJob ensures that child coroutines are isolated.
*/ */
protected val parentJob: Job = SupervisorJob() protected val parentJob: Job = SupervisorJob()
protected val childLock = Mutex() protected val childLock: Mutex = Mutex()
/** /**
* Internal map that keeps track of each child's [ChildJob]. * Internal map that keeps track of each child's [ChildJob].
@ -782,6 +906,21 @@ public abstract class AbstractDeviceHubManager(
*/ */
public abstract val deviceChanges: MutableSharedFlow<DeviceStateEvent> public abstract val deviceChanges: MutableSharedFlow<DeviceStateEvent>
/**
* Additional EventBus for application-level events.
*/
public abstract val eventBus: EventBus
/**
* Metric publisher for logging and monitoring.
*/
public open val metricPublisher: MetricPublisher = DefaultMetricPublisher()
/**
* Transaction manager for wrapping critical operations.
*/
public abstract val transactionManager: TransactionManager
/** /**
* Tracks the number of restart attempts per device. * Tracks the number of restart attempts per device.
*/ */
@ -794,7 +933,7 @@ public abstract class AbstractDeviceHubManager(
private val restartingDevices: MutableSet<Name> = mutableSetOf() private val restartingDevices: MutableSet<Name> = mutableSetOf()
/** /**
* Represents a running child device along with its job, config, and flows. * Represents a running child device along with its job, configuration, and flows.
* *
* @property device The managed device instance. * @property device The managed device instance.
* @property collectorJob The coroutine job collecting messages from [device.messageFlow]. * @property collectorJob The coroutine job collecting messages from [device.messageFlow].
@ -817,10 +956,13 @@ public abstract class AbstractDeviceHubManager(
} }
/** /**
* Called when an error is thrown from a child's message-collecting coroutine or any other child logic. * Global function for launching coroutines with the combined context.
* */
* By default, logs the error. internal fun launchGlobal(block: suspend CoroutineScope.() -> Unit): Job =
* Override to provide custom reaction (before applying the [onError] policy). CoroutineScope(parentJob + dispatcher + exceptionHandler).launch { block() }
/**
* Called when an error is thrown from a child's coroutine.
*/ */
protected open suspend fun onChildErrorCaught(ex: Throwable, childName: Name, config: DeviceLifecycleConfig) { protected open suspend fun onChildErrorCaught(ex: Throwable, childName: Name, config: DeviceLifecycleConfig) {
context.logger.error(ex) { "Error in child device $childName with policy ${config.onError}" } context.logger.error(ex) { "Error in child device $childName with policy ${config.onError}" }
@ -894,7 +1036,7 @@ public abstract class AbstractDeviceHubManager(
replay = config.messageBuffer, replay = config.messageBuffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST onBufferOverflow = BufferOverflow.DROP_OLDEST
) )
val childScope = config.coroutineScope ?: CoroutineScope(parentJob + dispatcher) val childScope = config.coroutineScope ?: CoroutineScope(parentJob + dispatcher + exceptionHandler)
val collectorJob = childScope.launch(CoroutineName("Collect device $name")) { val collectorJob = childScope.launch(CoroutineName("Collect device $name")) {
try { try {
@ -904,9 +1046,7 @@ public abstract class AbstractDeviceHubManager(
messageBus.emit(wrapped) messageBus.emit(wrapped)
} }
} catch (ex: Exception) { } catch (ex: Exception) {
if (ex is CancellationException) { if (ex is CancellationException) throw ex
throw ex
}
onChildErrorCaught(ex, name, config) onChildErrorCaught(ex, name, config)
deviceChanges.emit(DeviceStateEvent.DeviceFailed(name, ex)) deviceChanges.emit(DeviceStateEvent.DeviceFailed(name, ex))
messageBus.emit(DeviceMessage.error(ex, name)) messageBus.emit(DeviceMessage.error(ex, name))
@ -966,9 +1106,8 @@ public abstract class AbstractDeviceHubManager(
private fun calculateDelay(policy: RestartPolicy, attempts: Int): Duration { private fun calculateDelay(policy: RestartPolicy, attempts: Int): Duration {
return when (policy.strategy) { return when (policy.strategy) {
RestartStrategy.LINEAR -> policy.delayBetweenAttempts RestartStrategy.LINEAR -> policy.delayBetweenAttempts
RestartStrategy.EXPONENTIAL_BACKOFF -> RestartStrategy.EXPONENTIAL_BACKOFF -> policy.delayBetweenAttempts * 2.0.pow((attempts - 1).toDouble())
policy.delayBetweenAttempts * 2.0.pow((attempts - 1).toDouble()) RestartStrategy.CUSTOM -> Duration.ZERO // Custom strategy can be overridden.
RestartStrategy.CUSTOM -> Duration.ZERO // custom strategy can be overridden
} }
} }
@ -982,18 +1121,21 @@ public abstract class AbstractDeviceHubManager(
device: Device, device: Device,
bus: MutableSharedFlow<DeviceMessage> bus: MutableSharedFlow<DeviceMessage>
) { ) {
childLock.withLock { val shouldRemove = childLock.withLock {
val current = childrenJobs[name] val current = childrenJobs[name]
if (current?.device == device) { if (current?.device == device) {
childrenJobs.remove(name) childrenJobs.remove(name)
restartAttemptsMap.remove(name) restartAttemptsMap.remove(name)
} true
} else false
} }
bus.resetReplayCache() if (shouldRemove) {
deviceChanges.emit(DeviceStateEvent.DeviceDetached(name)) bus.resetReplayCache()
systemBus.emit(SystemLogMessage("Device $name physically removed.", sourceDevice = name)) deviceChanges.emit(DeviceStateEvent.DeviceDetached(name))
if (device is ConfigurableCompositeControlComponent<*>) { systemBus.emit(SystemLogMessage("Device $name physically removed.", sourceDevice = name))
device.onChildStop() if (device is ConfigurableCompositeControlComponent<*>) {
device.onChildStop()
}
} }
} }
@ -1029,15 +1171,11 @@ public abstract class AbstractDeviceHubManager(
} }
deviceChanges.emit(DeviceStateEvent.DeviceAdded(name)) deviceChanges.emit(DeviceStateEvent.DeviceAdded(name))
systemBus.emit(SystemLogMessage("Device $name attached, startMode=$startMode", sourceDevice = name)) systemBus.emit(SystemLogMessage("Device $name attached, startMode=$startMode", sourceDevice = name))
metricPublisher.publishMetric("device.attach", 1.0, mapOf("device" to name.toString()))
if (config.lifecycleMode == LifecycleMode.INDEPENDENT) return if (config.lifecycleMode == LifecycleMode.INDEPENDENT) return
when (startMode) { when (startMode) {
StartMode.NONE -> {} StartMode.NONE -> {}
StartMode.ASYNC -> { StartMode.ASYNC -> launchGlobal { doStartDevice(name, config, device) }
CoroutineScope(parentJob + dispatcher).launch {
doStartDevice(name, config, device)
}
}
StartMode.SYNC -> doStartDevice(name, config, device) StartMode.SYNC -> doStartDevice(name, config, device)
} }
} }
@ -1055,6 +1193,7 @@ public abstract class AbstractDeviceHubManager(
onStartTimeout(name, config) onStartTimeout(name, config)
} else { } else {
deviceChanges.emit(DeviceStateEvent.DeviceStarted(name)) deviceChanges.emit(DeviceStateEvent.DeviceStarted(name))
metricPublisher.publishMetric("device.start", 1.0, mapOf("device" to name.toString()))
if (config.restartPolicy.resetOnSuccess) restartAttemptsMap[name] = 0 if (config.restartPolicy.resetOnSuccess) restartAttemptsMap[name] = 0
} }
} }
@ -1064,7 +1203,21 @@ public abstract class AbstractDeviceHubManager(
* If [waitStop] = true, waits until the device fully stops. * If [waitStop] = true, waits until the device fully stops.
*/ */
public suspend fun detachDevice(name: Name, waitStop: Boolean = false) { public suspend fun detachDevice(name: Name, waitStop: Boolean = false) {
childLock.withLock { removeDeviceUnlocked(name, waitStop = waitStop) } val child = childLock.withLock {
childrenJobs.remove(name)?.also {
restartAttemptsMap.remove(name)
}
}
if (child != null) {
deviceChanges.emit(DeviceStateEvent.DeviceRemoved(name))
systemBus.emit(SystemLogMessage("Device $name removed (waitStop=$waitStop)", sourceDevice = name))
metricPublisher.publishMetric("device.detach", 1.0, mapOf("device" to name.toString()))
if (waitStop) {
performStop(child)
} else {
launchGlobal { performStop(child) }
}
}
} }
/** /**
@ -1078,6 +1231,7 @@ public abstract class AbstractDeviceHubManager(
onStartTimeout(name, config) onStartTimeout(name, config)
} else { } else {
deviceChanges.emit(DeviceStateEvent.DeviceStarted(name)) deviceChanges.emit(DeviceStateEvent.DeviceStarted(name))
metricPublisher.publishMetric("device.start", 1.0, mapOf("device" to name.toString()))
if (config.restartPolicy.resetOnSuccess) restartAttemptsMap[name] = 0 if (config.restartPolicy.resetOnSuccess) restartAttemptsMap[name] = 0
} }
} }
@ -1101,6 +1255,7 @@ public abstract class AbstractDeviceHubManager(
) )
childrenJobs[name] = newChild childrenJobs[name] = newChild
systemBus.emit(SystemLogMessage("Device $name restarted", sourceDevice = name)) systemBus.emit(SystemLogMessage("Device $name restarted", sourceDevice = name))
metricPublisher.publishMetric("device.restart", 1.0, mapOf("device" to name.toString()))
} }
val deviceRef = childrenJobs[name]?.device ?: return val deviceRef = childrenJobs[name]?.device ?: return
if (childrenJobs[name]?.config?.lifecycleMode != LifecycleMode.INDEPENDENT) { if (childrenJobs[name]?.config?.lifecycleMode != LifecycleMode.INDEPENDENT) {
@ -1113,20 +1268,25 @@ public abstract class AbstractDeviceHubManager(
* The device is removed (stopped), then re-added with the new mode. * The device is removed (stopped), then re-added with the new mode.
*/ */
public suspend fun changeLifecycleMode(name: Name, newMode: LifecycleMode) { public suspend fun changeLifecycleMode(name: Name, newMode: LifecycleMode) {
childLock.withLock { val old = childLock.withLock {
val old = childrenJobs[name] ?: error("Device $name not found") val existing = childrenJobs[name] ?: error("Device $name not found")
val newConfig = old.config.copy(lifecycleMode = newMode) val newConfig = existing.config.copy(lifecycleMode = newMode)
removeDeviceUnlocked(name, waitStop = true) childrenJobs.remove(name)
val newChild = launchChild( restartAttemptsMap.remove(name)
name, Triple(existing.device, newConfig, existing.meta)
old.device,
newConfig,
old.meta,
reuseBus = if (old.reuseBus) old.messageBus else null
)
childrenJobs[name] = newChild
systemBus.emit(SystemLogMessage("Device $name lifecycle changed to $newMode", sourceDevice = name))
} }
val newChild = launchChild(
name,
old.first,
old.second,
old.third,
reuseBus = null
)
childLock.withLock {
childrenJobs[name] = newChild
}
systemBus.emit(SystemLogMessage("Device $name lifecycle changed to $newMode", sourceDevice = name))
metricPublisher.publishMetric("device.lifecycle.change", 1.0, mapOf("device" to name.toString(), "newMode" to newMode.name))
val deviceRef = childrenJobs[name]?.device ?: return val deviceRef = childrenJobs[name]?.device ?: return
if (newMode != LifecycleMode.INDEPENDENT) { if (newMode != LifecycleMode.INDEPENDENT) {
finalizeDeviceStart(name, childrenJobs[name]!!.config, deviceRef) finalizeDeviceStart(name, childrenJobs[name]!!.config, deviceRef)
@ -1149,43 +1309,46 @@ public abstract class AbstractDeviceHubManager(
meta: Meta? = null, meta: Meta? = null,
reuseMessageBus: Boolean = false reuseMessageBus: Boolean = false
) { ) {
childLock.withLock { transactionManager.withTransaction {
val old = childrenJobs[name] val oldBus = childLock.withLock { childrenJobs[name]?.messageBus }
val oldBus = if (reuseMessageBus) old?.messageBus else null
removeDeviceUnlocked(name, waitStop = true) removeDeviceUnlocked(name, waitStop = true)
val newChild = launchChild(name, newDevice, config, meta, oldBus) childLock.withLock {
childrenJobs[name] = newChild val newChild = launchChild(name, newDevice, config, meta, oldBus)
systemBus.emit(SystemLogMessage("Device $name hot-swapped", sourceDevice = name)) childrenJobs[name] = newChild
} systemBus.emit(SystemLogMessage("Device $name hot-swapped", sourceDevice = name))
val deviceRef = childrenJobs[name]?.device ?: return metricPublisher.publishMetric("device.hotswap", 1.0, mapOf("device" to name.toString()))
if (config.lifecycleMode != LifecycleMode.INDEPENDENT) { }
finalizeDeviceStart(name, config, deviceRef) val deviceRef = childLock.withLock { childrenJobs[name]?.device }
if (deviceRef != null && config.lifecycleMode != LifecycleMode.INDEPENDENT) {
finalizeDeviceStart(name, config, deviceRef)
}
} }
} }
/** /**
* Internal function to remove (and optionally wait-stop) a device by [name]. * Internal function to remove (and optionally wait-stop) a device by [name].
* This method must be called from within [childLock].
* *
* @param waitStop If true, waits for stopping within [DeviceLifecycleConfig.stopTimeout]. * @param waitStop If true, waits for stopping within [DeviceLifecycleConfig.stopTimeout].
*/ */
private suspend fun removeDeviceUnlocked(name: Name, waitStop: Boolean) { private suspend fun removeDeviceUnlocked(name: Name, waitStop: Boolean) {
val child = childrenJobs[name] ?: return val child = childLock.withLock { childrenJobs[name] } ?: return
childrenJobs.remove(name) childLock.withLock {
restartAttemptsMap.remove(name) childrenJobs.remove(name)
restartAttemptsMap.remove(name)
}
deviceChanges.emit(DeviceStateEvent.DeviceRemoved(name)) deviceChanges.emit(DeviceStateEvent.DeviceRemoved(name))
systemBus.emit(SystemLogMessage("Device $name removed (waitStop=$waitStop)", sourceDevice = name)) systemBus.emit(SystemLogMessage("Device $name removed (waitStop=$waitStop)", sourceDevice = name))
if (waitStop) { if (waitStop) {
performStop(child) performStop(child)
} else { } else {
CoroutineScope(parentJob + dispatcher).launch { performStop(child) } launchGlobal { performStop(child) }
} }
} }
/** /**
* Performs the actual stopping sequence: * Performs the actual stopping sequence:
* 1) Attempt `device.stop()` with [stopTimeout] * 1) Attempt `device.stop()` with [stopTimeout]
* 2) Cancel and join the collector job * 2) Cancel and join the collector job.
*/ */
private suspend fun performStop(child: ChildJob) { private suspend fun performStop(child: ChildJob) {
val timeout = child.config.stopTimeout ?: Duration.INFINITE val timeout = child.config.stopTimeout ?: Duration.INFINITE
@ -1196,7 +1359,10 @@ public abstract class AbstractDeviceHubManager(
if (result == null) { if (result == null) {
onStopTimeout(deviceName, child.config) onStopTimeout(deviceName, child.config)
} }
child.collectorJob.cancelAndJoin() withContext(NonCancellable) {
child.collectorJob.cancelAndJoin()
}
metricPublisher.publishMetric("device.stop", 1.0, mapOf("device" to deviceName.toString()))
} }
/** /**
@ -1236,12 +1402,11 @@ public abstract class AbstractDeviceHubManager(
} else null } else null
} }
} }
return@coroutineScope try { try {
deferredList.awaitAll() deferredList.awaitAll()
true true
} catch (ex: Exception) { } catch (ex: Exception) {
context.logger.error(ex) { "Failed to start device batch. Rolling back." } context.logger.error(ex) { "Failed to start device batch. Rolling back." }
// rollback: stop those that were started
deferredList.mapNotNull { it.getCompletedOrNull() }.forEach { dn -> deferredList.mapNotNull { it.getCompletedOrNull() }.forEach { dn ->
childrenJobs[dn]?.let { job -> childrenJobs[dn]?.let { job ->
try { try {
@ -1280,12 +1445,11 @@ public abstract class AbstractDeviceHubManager(
} else null } else null
} }
} }
return@coroutineScope try { try {
deferredList.awaitAll() deferredList.awaitAll()
true true
} catch (ex: Exception) { } catch (ex: Exception) {
context.logger.error(ex) { "Failed to stop device batch. Rolling back." } context.logger.error(ex) { "Failed to stop device batch. Rolling back." }
// rollback: start those that were stopped
deferredList.mapNotNull { it.getCompletedOrNull() }.forEach { dn -> deferredList.mapNotNull { it.getCompletedOrNull() }.forEach { dn ->
childrenJobs[dn]?.let { job -> childrenJobs[dn]?.let { job ->
try { try {
@ -1300,15 +1464,12 @@ public abstract class AbstractDeviceHubManager(
} }
} }
@OptIn(ExperimentalCoroutinesApi::class)
private fun <T> Deferred<T>.getCompletedOrNull(): T? = if (isCompleted && !isCancelled) getCompleted() else null
/** /**
* Optionally set up a distributed transport or message broker for the managed devices. * Optionally set up a distributed transport or message broker for the managed devices.
* By default, this is a stub that logs an informational message. * By default, this is a stub that logs an informational message.
*/ */
public open fun installDistributedTransport() { public open fun installDistributedTransport() {
context.logger.info { "installDistributedTransport: implement or override for custom broker." } context.logger.info { "installDistributedTransport: Implement or override for custom broker." }
} }
/** /**
@ -1322,6 +1483,10 @@ public abstract class AbstractDeviceHubManager(
} }
} }
} }
public suspend fun shutdown() {
parentJob.cancelAndJoin()
}
} }
/** /**
@ -1339,7 +1504,15 @@ private class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispat
replay = 50, replay = 50,
onBufferOverflow = BufferOverflow.DROP_OLDEST onBufferOverflow = BufferOverflow.DROP_OLDEST
) )
override val deviceChanges: MutableSharedFlow<DeviceStateEvent> = MutableSharedFlow(replay = 1) override val deviceChanges: MutableSharedFlow<DeviceStateEvent> = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
override val eventBus: DefaultEventBus = DefaultEventBus(
replay = 100,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus)
} }
/** /**
@ -1374,7 +1547,7 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
config: DeviceLifecycleConfig = DeviceLifecycleConfig(), config: DeviceLifecycleConfig = DeviceLifecycleConfig(),
registry: ComponentRegistry? = null, registry: ComponentRegistry? = null,
public val hubManager: AbstractDeviceHubManager = DeviceHubManagerImpl(context, config.dispatcher ?: Dispatchers.Default), public val hubManager: AbstractDeviceHubManager = DeviceHubManagerImpl(context, config.dispatcher ?: Dispatchers.Default)
) : DeviceBase<D>(context, meta), CompositeControlComponent { ) : DeviceBase<D>(context, meta), CompositeControlComponent {
public val effectiveRegistry: ComponentRegistry? = registry ?: context.componentRegistry public val effectiveRegistry: ComponentRegistry? = registry ?: context.componentRegistry
@ -1400,26 +1573,24 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit
private val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList() private val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList()
init { init {
// Setup action execution: wrap execute block with error handling hubManager.launchGlobal {
spec.actions.values.forEach { actionSpec -> spec.actions.values.forEach { actionSpec ->
launch {
val actionName = actionSpec.name
messageFlow messageFlow
.filterIsInstance<ActionExecuteMessage>() .filterIsInstance<ActionExecuteMessage>()
.filter { it.action == actionName } .filter { it.action == actionSpec.name }
.onEach { msg -> .onEach { msg ->
try { try {
val result = execute(actionName, msg.argument) val result = execute(actionSpec.name, msg.argument)
messageBus.emit( messageBus.emit(
ActionResultMessage( ActionResultMessage(
action = actionName, action = actionSpec.name,
result = result, result = result,
requestId = msg.requestId, requestId = msg.requestId,
sourceDevice = id.asName() sourceDevice = id.asName()
) )
) )
} catch (ex: Exception) { } catch (ex: Exception) {
logger.error(ex) { "Error executing action $actionName on device $id" } logger.error(ex) { "Error executing action ${actionSpec.name} on device $id" }
messageBus.emit(DeviceMessage.error(ex, id.asName())) messageBus.emit(DeviceMessage.error(ex, id.asName()))
} }
} }
@ -1462,7 +1633,6 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit
self.onOpen() self.onOpen()
validate(self) validate(self)
} }
// Start child devices that are in INITIAL state and not marked as LAZY
hubManager.devices.values.filter { it.lifecycleState == LifecycleState.INITIAL }.forEach { child -> hubManager.devices.values.filter { it.lifecycleState == LifecycleState.INITIAL }.forEach { child ->
val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode
if (mode != LifecycleMode.LAZY) { if (mode != LifecycleMode.LAZY) {
@ -1472,18 +1642,14 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit
} }
/** /**
* Called when this device is stopping. * Called when the device stops.
* Default logic: attempt to stop each child device (if started), then call [spec.onClose].
*/ */
override suspend fun onStop() { override suspend fun onStop() {
// Stop each child device concurrently with proper error handling
hubManager.devices.values.forEach { child -> hubManager.devices.values.forEach { child ->
if (child.lifecycleState == LifecycleState.STARTED) { if (child.lifecycleState == LifecycleState.STARTED) {
launch(child.coroutineContext) { launch(child.coroutineContext) {
val stopTimeout = hubManager.childrenJobs[child.id.parseAsName()]?.config?.stopTimeout ?: Duration.INFINITE val stopTimeout = hubManager.childrenJobs[child.id.parseAsName()]?.config?.stopTimeout ?: Duration.INFINITE
val stopped = withTimeoutOrNull(stopTimeout) { val stopped = withTimeoutOrNull(stopTimeout) { child.stop() }
child.stop()
}
if (stopped == null) { if (stopped == null) {
hubManager.childrenJobs[child.id.parseAsName()]?.let { job -> hubManager.childrenJobs[child.id.parseAsName()]?.let { job ->
hubManager.onStopTimeout(child.id.parseAsName(), job.config) hubManager.onStopTimeout(child.id.parseAsName(), job.config)

@ -3,7 +3,9 @@
package space.kscience.controls.spec package space.kscience.controls.spec
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import space.kscience.controls.api.* import space.kscience.controls.api.*
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
@ -13,18 +15,23 @@ import space.kscience.dataforge.meta.int
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.parseAsName import space.kscience.dataforge.names.parseAsName
import kotlin.test.* import kotlin.test.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.StandardTestDispatcher
/** /**
* A simple [AbstractDeviceHubManager] implementation for tests, providing * A simple [AbstractDeviceHubManager] implementation for tests, providing
* message flows and basic logging of events. * message flows, event bus, and basic logging of events.
*/ */
private class TestDeviceHubManager( private class TestDeviceHubManager(
context: Context, context: Context,
dispatcher: CoroutineDispatcher = Dispatchers.Default dispatcher: CoroutineDispatcher = Dispatchers.Default
) : AbstractDeviceHubManager(context, dispatcher) { ) : AbstractDeviceHubManager(context, dispatcher) {
override val messageBus = MutableSharedFlow<DeviceMessage>(replay = 100) override val messageBus = MutableSharedFlow<DeviceMessage>(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val systemBus = MutableSharedFlow<SystemLogMessage>(replay = 50) override val systemBus = MutableSharedFlow<SystemLogMessage>(replay = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val deviceChanges = MutableSharedFlow<DeviceStateEvent>(replay = 50) override val deviceChanges = MutableSharedFlow<DeviceStateEvent>(replay = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val eventBus: EventBus = DefaultEventBus(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus)
} }
/** /**
@ -658,7 +665,7 @@ class CompositeControlTest {
val pump = SyringePumpDevice(context, Meta { "maxVolume" put 5.0 }) val pump = SyringePumpDevice(context, Meta { "maxVolume" put 5.0 })
pump.setVolume(10.0) pump.setVolume(10.0)
assertEquals(0.0, pump.getVolume(), "Volume should not change on invalid value") assertEquals(0.0, pump.getVolume(), "Volume should remain 0 on invalid value")
} }
@Test @Test
@ -764,6 +771,7 @@ class CompositeControlTest {
manager.detachDevice(name, true) manager.detachDevice(name, true)
assertFalse(name in manager.devices.keys, "The device should be removed from the manager.") assertFalse(name in manager.devices.keys, "The device should be removed from the manager.")
manager.shutdown()
} }
@Test @Test
@ -793,8 +801,9 @@ class CompositeControlTest {
@Test @Test
fun `test hot swap device`() = runTest { fun `test hot swap device`() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val context = createTestContext() val context = createTestContext()
val manager = TestDeviceHubManager(context) val manager = TestDeviceHubManager(context, testDispatcher)
val oldDevice = StepperMotorDevice(context, Meta { "maxPosition" put 100 }) val oldDevice = StepperMotorDevice(context, Meta { "maxPosition" put 100 })
val name = "motorSwap".asName() val name = "motorSwap".asName()
@ -814,8 +823,14 @@ class CompositeControlTest {
val current = manager.devices[name] val current = manager.devices[name]
assertNotNull(current, "New device should be present after hot swap") assertNotNull(current, "New device should be present after hot swap")
assertTrue(current === newDevice, "Manager should reference the new device instance") assertTrue(current === newDevice, "Manager should reference the new device instance")
assertEquals(999, newDevice.maxPosition) assertEquals(999, newDevice.maxPosition)
val events = manager.eventBus.events.take(2).toList()
assertTrue(events.any { it is TransactionEvent.TransactionStarted }, "TransactionStarted event expected")
assertTrue(events.any { it is TransactionEvent.TransactionCommitted }, "TransactionCommitted event expected")
manager.detachDevice(name, waitStop = true)
manager.shutdown()
} }
} }