From 37e357a4045c055589cb50fdbb53b67c4afd787c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D0=BB=D0=BF=D0=B0=D0=BA=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC?= <mkolpakov2002@yandex.ru> Date: Sat, 1 Feb 2025 03:42:23 +0300 Subject: [PATCH] docs(CompositeControlComponentSpec): improved comments and code --- .../spec/CompositeControlComponents.kt | 354 +++++++++++++----- .../controls/spec/CompositeControlTest.kt | 29 +- 2 files changed, 282 insertions(+), 101 deletions(-) diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt index d90826b..35e7fd4 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt @@ -19,6 +19,120 @@ import kotlin.reflect.KProperty import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds +/** + * Extension function to safely get the completed value of a Deferred or return null. + */ +@OptIn(ExperimentalCoroutinesApi::class) +private fun <T> Deferred<T>.getCompletedOrNull(): T? = + if (isCompleted && !isCancelled) getCompleted() else null + +private val globalExceptionHandler = CoroutineExceptionHandler { _, throwable -> + println("Unhandled exception in global scope: ${throwable.message}") +} + +/** + * EventBus interface for publishing and subscribing to application-level events. + */ +public interface EventBus { + public val events: SharedFlow<Any> + public suspend fun publish(event: Any) +} + +/** + * Default implementation of EventBus using a MutableSharedFlow. + */ +public class DefaultEventBus( + replay: Int = 100, + onBufferOverflow: BufferOverflow = BufferOverflow.DROP_OLDEST +) : EventBus { + private val _events = MutableSharedFlow<Any>(replay = replay, onBufferOverflow = onBufferOverflow) + override val events: SharedFlow<Any> get() = _events + + override suspend fun publish(event: Any) { + _events.emit(event) + } +} + +/** + * TransportAdapter interface for distributed communications. + * This abstraction allows plugging in different transport mechanisms. + */ +public interface TransportAdapter { + public suspend fun send(message: DeviceMessage) + public fun subscribe(): Flow<DeviceMessage> +} + +/** + * Default stub implementation of TransportAdapter for in-process communication. + */ +public class DefaultTransportAdapter( + private val eventBus: EventBus +) : TransportAdapter { + private val _messages = MutableSharedFlow<DeviceMessage>(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST) + override suspend fun send(message: DeviceMessage) { + _messages.emit(message) + eventBus.publish("Message sent: ${message.sourceDevice}") + } + override fun subscribe(): Flow<DeviceMessage> = _messages.asSharedFlow() +} + +/** + * TransactionManager interface for executing a block of operations. + */ +public interface TransactionManager { + /** + * Executes [block] within a transaction. If an exception occurs, a rollback is performed. + */ + public suspend fun <T> withTransaction(block: suspend () -> T): T +} + +/** + * Default implementation of TransactionManager. + * This implementation wraps the block in a try/catch and publishes transaction events. + */ +public class DefaultTransactionManager( + private val eventBus: EventBus, + private val logger: Logger = DefaultLogManager() +) : TransactionManager { + override suspend fun <T> withTransaction(block: suspend () -> T): T { + eventBus.publish(TransactionEvent.TransactionStarted) + return try { + val result = block() + eventBus.publish(TransactionEvent.TransactionCommitted) + result + } catch (ex: Exception) { + logger.error(ex) { "Transaction failed, rolling back." } + eventBus.publish(TransactionEvent.TransactionRolledBack) + throw ex + } + } +} + +/** + * Transaction events. + */ +public sealed class TransactionEvent { + public object TransactionStarted : TransactionEvent() + public object TransactionCommitted : TransactionEvent() + public object TransactionRolledBack : TransactionEvent() +} + +/** + * Interface for publishing metrics. + */ +public interface MetricPublisher { + public fun publishMetric(name: String, value: Double, tags: Map<String, String> = emptyMap()) +} + +/** + * Default stub implementation of MetricPublisher which logs metrics. + */ +public class DefaultMetricPublisher : MetricPublisher { + override fun publishMetric(name: String, value: Double, tags: Map<String, String>) { + println("Metric published: $name = $value, tags: $tags") + } +} + /** * Defines different modes of how a child device is coupled to its parent device. * @@ -476,7 +590,7 @@ public interface CompositeDeviceSpec<D : ConfigurableCompositeControlComponent<D converter: MetaConverter<T>, descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, name: String? = null, - read: suspend D.(propertyName: String) -> T?, + read: suspend D.(propertyName: String) -> T? ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DevicePropertySpec<D, T>>> /** @@ -487,7 +601,7 @@ public interface CompositeDeviceSpec<D : ConfigurableCompositeControlComponent<D descriptorBuilder: PropertyDescriptorBuilder.() -> Unit = {}, name: String? = null, read: suspend D.(propertyName: String) -> T?, - write: suspend D.(propertyName: String, value: T) -> Unit, + write: suspend D.(propertyName: String, value: T) -> Unit ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, MutableDevicePropertySpec<D, T>>> /** @@ -498,7 +612,7 @@ public interface CompositeDeviceSpec<D : ConfigurableCompositeControlComponent<D outputConverter: MetaConverter<O>, descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {}, name: String? = null, - execute: suspend D.(I) -> O, + execute: suspend D.(I) -> O ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, I, O>>> } @@ -537,7 +651,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl } override fun validate(device: D) { - // Verify that all declared properties and actions are indeed in the device's descriptors + // Verify that all declared properties and actions are registered in the device. properties.values.forEach { prop -> check(prop.descriptor in device.propertyDescriptors) { "Property ${prop.descriptor.name} not registered in ${device.id}" @@ -598,7 +712,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl converter: MetaConverter<T>, descriptorBuilder: PropertyDescriptorBuilder.() -> Unit, name: String?, - read: suspend D.(propertyName: String) -> T?, + read: suspend D.(propertyName: String) -> T? ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DevicePropertySpec<D, T>>> = PropertyDelegateProvider { _, property -> val propertyName = name ?: property.name @@ -652,7 +766,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl specKeyInRegistry: Name? = null, childDeviceName: Name? = null, metaBuilder: (MutableMeta.() -> Unit)? = null, - configBuilder: DeviceLifecycleConfigBuilder.() -> Unit = {}, + configBuilder: DeviceLifecycleConfigBuilder.() -> Unit = {} ): PropertyDelegateProvider<CompositeControlComponentSpec<D>, ReadOnlyProperty<CompositeControlComponentSpec<D>, CompositeControlComponentSpec<CD>>> = PropertyDelegateProvider { thisRef, property -> val registryKey = specKeyInRegistry ?: property.name.asName() @@ -711,7 +825,7 @@ public open class CompositeControlComponentSpec<D : ConfigurableCompositeControl public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.unitAction( descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {}, name: String? = null, - execute: suspend D.() -> Unit, + execute: suspend D.() -> Unit ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, Unit, Unit>>> = action(MetaConverter.unit, MetaConverter.unit, descriptorBuilder, name) { execute() } @@ -725,28 +839,38 @@ public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlCompon public fun <D : ConfigurableCompositeControlComponent<D>> CompositeControlComponentSpec<D>.metaAction( descriptorBuilder: ActionDescriptorBuilder.() -> Unit = {}, name: String? = null, - execute: suspend D.(Meta) -> Meta, + execute: suspend D.(Meta) -> Meta ): PropertyDelegateProvider<CompositeDeviceSpec<D>, ReadOnlyProperty<CompositeDeviceSpec<D>, DeviceActionSpec<D, Meta, Meta>>> = action(MetaConverter.meta, MetaConverter.meta, descriptorBuilder, name) { execute(it) } /** - * An abstract manager for devices, handling lifecycle, error policies, transactions, etc. + * An abstract manager for devices, handling lifecycle, error policies, transactions, distributed transport, + * structured concurrency, and event/metric publishing. * - * Typically, you extend or instantiate this to manage a set of child [Device]s. + * This class uses a global exception handler and SupervisorJob for centralized error handling. + * + * All coroutines are launched with the context: parentJob + dispatcher + globalExceptionHandler. * * @param context The [Context] for logging and plugin management. * @param dispatcher A [CoroutineDispatcher] for concurrency; default is [Dispatchers.Default]. */ public abstract class AbstractDeviceHubManager( public val context: Context, - private val dispatcher: CoroutineDispatcher = Dispatchers.Default, + private val dispatcher: CoroutineDispatcher = Dispatchers.Default ) { /** - * The supervisor job for all child devices. + * Global exception handler for all coroutines in this manager. + */ + protected val exceptionHandler: CoroutineExceptionHandler = CoroutineExceptionHandler { _, ex -> + context.logger.error(ex) { "Unhandled exception in global scope" } + } + + /** + * SupervisorJob ensures that child coroutines are isolated. */ protected val parentJob: Job = SupervisorJob() - protected val childLock = Mutex() + protected val childLock: Mutex = Mutex() /** * Internal map that keeps track of each child's [ChildJob]. @@ -782,6 +906,21 @@ public abstract class AbstractDeviceHubManager( */ public abstract val deviceChanges: MutableSharedFlow<DeviceStateEvent> + /** + * Additional EventBus for application-level events. + */ + public abstract val eventBus: EventBus + + /** + * Metric publisher for logging and monitoring. + */ + public open val metricPublisher: MetricPublisher = DefaultMetricPublisher() + + /** + * Transaction manager for wrapping critical operations. + */ + public abstract val transactionManager: TransactionManager + /** * Tracks the number of restart attempts per device. */ @@ -794,7 +933,7 @@ public abstract class AbstractDeviceHubManager( private val restartingDevices: MutableSet<Name> = mutableSetOf() /** - * Represents a running child device along with its job, config, and flows. + * Represents a running child device along with its job, configuration, and flows. * * @property device The managed device instance. * @property collectorJob The coroutine job collecting messages from [device.messageFlow]. @@ -817,10 +956,13 @@ public abstract class AbstractDeviceHubManager( } /** - * Called when an error is thrown from a child's message-collecting coroutine or any other child logic. - * - * By default, logs the error. - * Override to provide custom reaction (before applying the [onError] policy). + * Global function for launching coroutines with the combined context. + */ + internal fun launchGlobal(block: suspend CoroutineScope.() -> Unit): Job = + CoroutineScope(parentJob + dispatcher + exceptionHandler).launch { block() } + + /** + * Called when an error is thrown from a child's coroutine. */ protected open suspend fun onChildErrorCaught(ex: Throwable, childName: Name, config: DeviceLifecycleConfig) { context.logger.error(ex) { "Error in child device $childName with policy ${config.onError}" } @@ -894,7 +1036,7 @@ public abstract class AbstractDeviceHubManager( replay = config.messageBuffer, onBufferOverflow = BufferOverflow.DROP_OLDEST ) - val childScope = config.coroutineScope ?: CoroutineScope(parentJob + dispatcher) + val childScope = config.coroutineScope ?: CoroutineScope(parentJob + dispatcher + exceptionHandler) val collectorJob = childScope.launch(CoroutineName("Collect device $name")) { try { @@ -904,9 +1046,7 @@ public abstract class AbstractDeviceHubManager( messageBus.emit(wrapped) } } catch (ex: Exception) { - if (ex is CancellationException) { - throw ex - } + if (ex is CancellationException) throw ex onChildErrorCaught(ex, name, config) deviceChanges.emit(DeviceStateEvent.DeviceFailed(name, ex)) messageBus.emit(DeviceMessage.error(ex, name)) @@ -966,9 +1106,8 @@ public abstract class AbstractDeviceHubManager( private fun calculateDelay(policy: RestartPolicy, attempts: Int): Duration { return when (policy.strategy) { RestartStrategy.LINEAR -> policy.delayBetweenAttempts - RestartStrategy.EXPONENTIAL_BACKOFF -> - policy.delayBetweenAttempts * 2.0.pow((attempts - 1).toDouble()) - RestartStrategy.CUSTOM -> Duration.ZERO // custom strategy can be overridden + RestartStrategy.EXPONENTIAL_BACKOFF -> policy.delayBetweenAttempts * 2.0.pow((attempts - 1).toDouble()) + RestartStrategy.CUSTOM -> Duration.ZERO // Custom strategy can be overridden. } } @@ -982,18 +1121,21 @@ public abstract class AbstractDeviceHubManager( device: Device, bus: MutableSharedFlow<DeviceMessage> ) { - childLock.withLock { + val shouldRemove = childLock.withLock { val current = childrenJobs[name] if (current?.device == device) { childrenJobs.remove(name) restartAttemptsMap.remove(name) - } + true + } else false } - bus.resetReplayCache() - deviceChanges.emit(DeviceStateEvent.DeviceDetached(name)) - systemBus.emit(SystemLogMessage("Device $name physically removed.", sourceDevice = name)) - if (device is ConfigurableCompositeControlComponent<*>) { - device.onChildStop() + if (shouldRemove) { + bus.resetReplayCache() + deviceChanges.emit(DeviceStateEvent.DeviceDetached(name)) + systemBus.emit(SystemLogMessage("Device $name physically removed.", sourceDevice = name)) + if (device is ConfigurableCompositeControlComponent<*>) { + device.onChildStop() + } } } @@ -1029,15 +1171,11 @@ public abstract class AbstractDeviceHubManager( } deviceChanges.emit(DeviceStateEvent.DeviceAdded(name)) systemBus.emit(SystemLogMessage("Device $name attached, startMode=$startMode", sourceDevice = name)) + metricPublisher.publishMetric("device.attach", 1.0, mapOf("device" to name.toString())) if (config.lifecycleMode == LifecycleMode.INDEPENDENT) return - when (startMode) { StartMode.NONE -> {} - StartMode.ASYNC -> { - CoroutineScope(parentJob + dispatcher).launch { - doStartDevice(name, config, device) - } - } + StartMode.ASYNC -> launchGlobal { doStartDevice(name, config, device) } StartMode.SYNC -> doStartDevice(name, config, device) } } @@ -1055,6 +1193,7 @@ public abstract class AbstractDeviceHubManager( onStartTimeout(name, config) } else { deviceChanges.emit(DeviceStateEvent.DeviceStarted(name)) + metricPublisher.publishMetric("device.start", 1.0, mapOf("device" to name.toString())) if (config.restartPolicy.resetOnSuccess) restartAttemptsMap[name] = 0 } } @@ -1064,7 +1203,21 @@ public abstract class AbstractDeviceHubManager( * If [waitStop] = true, waits until the device fully stops. */ public suspend fun detachDevice(name: Name, waitStop: Boolean = false) { - childLock.withLock { removeDeviceUnlocked(name, waitStop = waitStop) } + val child = childLock.withLock { + childrenJobs.remove(name)?.also { + restartAttemptsMap.remove(name) + } + } + if (child != null) { + deviceChanges.emit(DeviceStateEvent.DeviceRemoved(name)) + systemBus.emit(SystemLogMessage("Device $name removed (waitStop=$waitStop)", sourceDevice = name)) + metricPublisher.publishMetric("device.detach", 1.0, mapOf("device" to name.toString())) + if (waitStop) { + performStop(child) + } else { + launchGlobal { performStop(child) } + } + } } /** @@ -1078,6 +1231,7 @@ public abstract class AbstractDeviceHubManager( onStartTimeout(name, config) } else { deviceChanges.emit(DeviceStateEvent.DeviceStarted(name)) + metricPublisher.publishMetric("device.start", 1.0, mapOf("device" to name.toString())) if (config.restartPolicy.resetOnSuccess) restartAttemptsMap[name] = 0 } } @@ -1101,6 +1255,7 @@ public abstract class AbstractDeviceHubManager( ) childrenJobs[name] = newChild systemBus.emit(SystemLogMessage("Device $name restarted", sourceDevice = name)) + metricPublisher.publishMetric("device.restart", 1.0, mapOf("device" to name.toString())) } val deviceRef = childrenJobs[name]?.device ?: return if (childrenJobs[name]?.config?.lifecycleMode != LifecycleMode.INDEPENDENT) { @@ -1113,20 +1268,25 @@ public abstract class AbstractDeviceHubManager( * The device is removed (stopped), then re-added with the new mode. */ public suspend fun changeLifecycleMode(name: Name, newMode: LifecycleMode) { - childLock.withLock { - val old = childrenJobs[name] ?: error("Device $name not found") - val newConfig = old.config.copy(lifecycleMode = newMode) - removeDeviceUnlocked(name, waitStop = true) - val newChild = launchChild( - name, - old.device, - newConfig, - old.meta, - reuseBus = if (old.reuseBus) old.messageBus else null - ) - childrenJobs[name] = newChild - systemBus.emit(SystemLogMessage("Device $name lifecycle changed to $newMode", sourceDevice = name)) + val old = childLock.withLock { + val existing = childrenJobs[name] ?: error("Device $name not found") + val newConfig = existing.config.copy(lifecycleMode = newMode) + childrenJobs.remove(name) + restartAttemptsMap.remove(name) + Triple(existing.device, newConfig, existing.meta) } + val newChild = launchChild( + name, + old.first, + old.second, + old.third, + reuseBus = null + ) + childLock.withLock { + childrenJobs[name] = newChild + } + systemBus.emit(SystemLogMessage("Device $name lifecycle changed to $newMode", sourceDevice = name)) + metricPublisher.publishMetric("device.lifecycle.change", 1.0, mapOf("device" to name.toString(), "newMode" to newMode.name)) val deviceRef = childrenJobs[name]?.device ?: return if (newMode != LifecycleMode.INDEPENDENT) { finalizeDeviceStart(name, childrenJobs[name]!!.config, deviceRef) @@ -1149,43 +1309,46 @@ public abstract class AbstractDeviceHubManager( meta: Meta? = null, reuseMessageBus: Boolean = false ) { - childLock.withLock { - val old = childrenJobs[name] - val oldBus = if (reuseMessageBus) old?.messageBus else null + transactionManager.withTransaction { + val oldBus = childLock.withLock { childrenJobs[name]?.messageBus } removeDeviceUnlocked(name, waitStop = true) - val newChild = launchChild(name, newDevice, config, meta, oldBus) - childrenJobs[name] = newChild - systemBus.emit(SystemLogMessage("Device $name hot-swapped", sourceDevice = name)) - } - val deviceRef = childrenJobs[name]?.device ?: return - if (config.lifecycleMode != LifecycleMode.INDEPENDENT) { - finalizeDeviceStart(name, config, deviceRef) + childLock.withLock { + val newChild = launchChild(name, newDevice, config, meta, oldBus) + childrenJobs[name] = newChild + systemBus.emit(SystemLogMessage("Device $name hot-swapped", sourceDevice = name)) + metricPublisher.publishMetric("device.hotswap", 1.0, mapOf("device" to name.toString())) + } + val deviceRef = childLock.withLock { childrenJobs[name]?.device } + if (deviceRef != null && config.lifecycleMode != LifecycleMode.INDEPENDENT) { + finalizeDeviceStart(name, config, deviceRef) + } } } /** * Internal function to remove (and optionally wait-stop) a device by [name]. - * This method must be called from within [childLock]. * * @param waitStop If true, waits for stopping within [DeviceLifecycleConfig.stopTimeout]. */ private suspend fun removeDeviceUnlocked(name: Name, waitStop: Boolean) { - val child = childrenJobs[name] ?: return - childrenJobs.remove(name) - restartAttemptsMap.remove(name) + val child = childLock.withLock { childrenJobs[name] } ?: return + childLock.withLock { + childrenJobs.remove(name) + restartAttemptsMap.remove(name) + } deviceChanges.emit(DeviceStateEvent.DeviceRemoved(name)) systemBus.emit(SystemLogMessage("Device $name removed (waitStop=$waitStop)", sourceDevice = name)) if (waitStop) { performStop(child) } else { - CoroutineScope(parentJob + dispatcher).launch { performStop(child) } + launchGlobal { performStop(child) } } } /** * Performs the actual stopping sequence: * 1) Attempt `device.stop()` with [stopTimeout] - * 2) Cancel and join the collector job + * 2) Cancel and join the collector job. */ private suspend fun performStop(child: ChildJob) { val timeout = child.config.stopTimeout ?: Duration.INFINITE @@ -1196,7 +1359,10 @@ public abstract class AbstractDeviceHubManager( if (result == null) { onStopTimeout(deviceName, child.config) } - child.collectorJob.cancelAndJoin() + withContext(NonCancellable) { + child.collectorJob.cancelAndJoin() + } + metricPublisher.publishMetric("device.stop", 1.0, mapOf("device" to deviceName.toString())) } /** @@ -1236,12 +1402,11 @@ public abstract class AbstractDeviceHubManager( } else null } } - return@coroutineScope try { + try { deferredList.awaitAll() true } catch (ex: Exception) { context.logger.error(ex) { "Failed to start device batch. Rolling back." } - // rollback: stop those that were started deferredList.mapNotNull { it.getCompletedOrNull() }.forEach { dn -> childrenJobs[dn]?.let { job -> try { @@ -1280,12 +1445,11 @@ public abstract class AbstractDeviceHubManager( } else null } } - return@coroutineScope try { + try { deferredList.awaitAll() true } catch (ex: Exception) { context.logger.error(ex) { "Failed to stop device batch. Rolling back." } - // rollback: start those that were stopped deferredList.mapNotNull { it.getCompletedOrNull() }.forEach { dn -> childrenJobs[dn]?.let { job -> try { @@ -1300,15 +1464,12 @@ public abstract class AbstractDeviceHubManager( } } - @OptIn(ExperimentalCoroutinesApi::class) - private fun <T> Deferred<T>.getCompletedOrNull(): T? = if (isCompleted && !isCancelled) getCompleted() else null - /** * Optionally set up a distributed transport or message broker for the managed devices. * By default, this is a stub that logs an informational message. */ public open fun installDistributedTransport() { - context.logger.info { "installDistributedTransport: implement or override for custom broker." } + context.logger.info { "installDistributedTransport: Implement or override for custom broker." } } /** @@ -1322,6 +1483,10 @@ public abstract class AbstractDeviceHubManager( } } } + + public suspend fun shutdown() { + parentJob.cancelAndJoin() + } } /** @@ -1339,7 +1504,15 @@ private class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispat replay = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST ) - override val deviceChanges: MutableSharedFlow<DeviceStateEvent> = MutableSharedFlow(replay = 1) + override val deviceChanges: MutableSharedFlow<DeviceStateEvent> = MutableSharedFlow( + replay = 1, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + override val eventBus: DefaultEventBus = DefaultEventBus( + replay = 100, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus) } /** @@ -1374,7 +1547,7 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit meta: Meta = Meta.EMPTY, config: DeviceLifecycleConfig = DeviceLifecycleConfig(), registry: ComponentRegistry? = null, - public val hubManager: AbstractDeviceHubManager = DeviceHubManagerImpl(context, config.dispatcher ?: Dispatchers.Default), + public val hubManager: AbstractDeviceHubManager = DeviceHubManagerImpl(context, config.dispatcher ?: Dispatchers.Default) ) : DeviceBase<D>(context, meta), CompositeControlComponent { public val effectiveRegistry: ComponentRegistry? = registry ?: context.componentRegistry @@ -1400,26 +1573,24 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit private val childConfigs: List<ChildComponentConfig<*>> = spec.childSpecs.values.toList() init { - // Setup action execution: wrap execute block with error handling - spec.actions.values.forEach { actionSpec -> - launch { - val actionName = actionSpec.name + hubManager.launchGlobal { + spec.actions.values.forEach { actionSpec -> messageFlow .filterIsInstance<ActionExecuteMessage>() - .filter { it.action == actionName } + .filter { it.action == actionSpec.name } .onEach { msg -> try { - val result = execute(actionName, msg.argument) + val result = execute(actionSpec.name, msg.argument) messageBus.emit( ActionResultMessage( - action = actionName, + action = actionSpec.name, result = result, requestId = msg.requestId, sourceDevice = id.asName() ) ) } catch (ex: Exception) { - logger.error(ex) { "Error executing action $actionName on device $id" } + logger.error(ex) { "Error executing action ${actionSpec.name} on device $id" } messageBus.emit(DeviceMessage.error(ex, id.asName())) } } @@ -1462,7 +1633,6 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit self.onOpen() validate(self) } - // Start child devices that are in INITIAL state and not marked as LAZY hubManager.devices.values.filter { it.lifecycleState == LifecycleState.INITIAL }.forEach { child -> val mode = hubManager.childrenJobs[child.id.parseAsName()]?.lifecycleMode if (mode != LifecycleMode.LAZY) { @@ -1472,18 +1642,14 @@ public open class ConfigurableCompositeControlComponent<D : ConfigurableComposit } /** - * Called when this device is stopping. - * Default logic: attempt to stop each child device (if started), then call [spec.onClose]. + * Called when the device stops. */ override suspend fun onStop() { - // Stop each child device concurrently with proper error handling hubManager.devices.values.forEach { child -> if (child.lifecycleState == LifecycleState.STARTED) { launch(child.coroutineContext) { val stopTimeout = hubManager.childrenJobs[child.id.parseAsName()]?.config?.stopTimeout ?: Duration.INFINITE - val stopped = withTimeoutOrNull(stopTimeout) { - child.stop() - } + val stopped = withTimeoutOrNull(stopTimeout) { child.stop() } if (stopped == null) { hubManager.childrenJobs[child.id.parseAsName()]?.let { job -> hubManager.onStopTimeout(child.id.parseAsName(), job.config) diff --git a/controls-core/src/commonTest/kotlin/space/kscience/controls/spec/CompositeControlTest.kt b/controls-core/src/commonTest/kotlin/space/kscience/controls/spec/CompositeControlTest.kt index fbe80a0..dea79df 100644 --- a/controls-core/src/commonTest/kotlin/space/kscience/controls/spec/CompositeControlTest.kt +++ b/controls-core/src/commonTest/kotlin/space/kscience/controls/spec/CompositeControlTest.kt @@ -3,7 +3,9 @@ package space.kscience.controls.spec import kotlinx.coroutines.* +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.take import kotlinx.coroutines.test.runTest import space.kscience.controls.api.* import space.kscience.dataforge.context.Context @@ -13,18 +15,23 @@ import space.kscience.dataforge.meta.int import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.parseAsName import kotlin.test.* +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.StandardTestDispatcher /** * A simple [AbstractDeviceHubManager] implementation for tests, providing - * message flows and basic logging of events. + * message flows, event bus, and basic logging of events. */ private class TestDeviceHubManager( context: Context, dispatcher: CoroutineDispatcher = Dispatchers.Default ) : AbstractDeviceHubManager(context, dispatcher) { - override val messageBus = MutableSharedFlow<DeviceMessage>(replay = 100) - override val systemBus = MutableSharedFlow<SystemLogMessage>(replay = 50) - override val deviceChanges = MutableSharedFlow<DeviceStateEvent>(replay = 50) + override val messageBus = MutableSharedFlow<DeviceMessage>(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST) + override val systemBus = MutableSharedFlow<SystemLogMessage>(replay = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST) + override val deviceChanges = MutableSharedFlow<DeviceStateEvent>(replay = 50, onBufferOverflow = BufferOverflow.DROP_OLDEST) + override val eventBus: EventBus = DefaultEventBus(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST) + + override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus) } /** @@ -658,7 +665,7 @@ class CompositeControlTest { val pump = SyringePumpDevice(context, Meta { "maxVolume" put 5.0 }) pump.setVolume(10.0) - assertEquals(0.0, pump.getVolume(), "Volume should not change on invalid value") + assertEquals(0.0, pump.getVolume(), "Volume should remain 0 on invalid value") } @Test @@ -764,6 +771,7 @@ class CompositeControlTest { manager.detachDevice(name, true) assertFalse(name in manager.devices.keys, "The device should be removed from the manager.") + manager.shutdown() } @Test @@ -793,8 +801,9 @@ class CompositeControlTest { @Test fun `test hot swap device`() = runTest { + val testDispatcher = StandardTestDispatcher(testScheduler) val context = createTestContext() - val manager = TestDeviceHubManager(context) + val manager = TestDeviceHubManager(context, testDispatcher) val oldDevice = StepperMotorDevice(context, Meta { "maxPosition" put 100 }) val name = "motorSwap".asName() @@ -814,8 +823,14 @@ class CompositeControlTest { val current = manager.devices[name] assertNotNull(current, "New device should be present after hot swap") assertTrue(current === newDevice, "Manager should reference the new device instance") - assertEquals(999, newDevice.maxPosition) + + val events = manager.eventBus.events.take(2).toList() + assertTrue(events.any { it is TransactionEvent.TransactionStarted }, "TransactionStarted event expected") + assertTrue(events.any { it is TransactionEvent.TransactionCommitted }, "TransactionCommitted event expected") + + manager.detachDevice(name, waitStop = true) + manager.shutdown() } }