diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt index 59f6aae..de0268b 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/CompositeControlComponents.kt @@ -4,7 +4,6 @@ package space.kscience.controls.spec import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -13,7 +12,6 @@ import space.kscience.dataforge.context.* import space.kscience.dataforge.context.logger import space.kscience.dataforge.meta.* import space.kscience.dataforge.names.* -import kotlin.concurrent.Volatile import kotlin.math.pow import kotlin.properties.PropertyDelegateProvider import kotlin.properties.ReadOnlyProperty @@ -21,312 +19,6 @@ import kotlin.reflect.KProperty import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -/** - * A sealed class representing the internal lifecycle state of an Actor. - */ -public sealed class ActorState { - public object Active : ActorState() - public object Closing : ActorState() - public object Closed : ActorState() -} - -/** - * A generic Actor class that processes messages sequentially in a coroutine. - * - * @param M The type of messages processed by this actor. - * @property scope The [CoroutineScope] in which the actor operates. - * @property capacity The capacity of the actor's mailbox. Defaults to Channel.UNLIMITED. - * @property onBufferOverflow The strategy to apply when the channel is at capacity (if capacity != UNLIMITED). - * @property errorHandler A function to handle errors during message processing. - * @property onCompletion An optional callback invoked once the actor completes processing all messages - * (i.e., after the mailbox is closed and the loop terminates), unless force-closing. - * @property handler A suspend function invoked for each message. - */ -public class Actor<M>( - private val scope: CoroutineScope, - private val capacity: Int = Channel.UNLIMITED, - private val onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, - private val errorHandler: (Throwable) -> Unit = { e -> - println("Error processing message in Actor: ${e.message}") - }, - private val onCompletion: (Throwable?) -> Unit = {}, - private val handler: suspend (M) -> Unit -) { - - /** - * Channel for receiving messages. By default, uses the provided capacity and - * onBufferOverflow strategy. For example: - * - BufferOverflow.SUSPEND (default) will suspend senders when capacity is reached. - * - BufferOverflow.DROP_OLDEST or DROP_LATEST can be used for dropping messages. - */ - private val mailbox = Channel<M>(capacity, onBufferOverflow) - - @Volatile - private var state: ActorState = ActorState.Active - - /** - * The actor's main job that sequentially processes messages from the mailbox. - */ - private val job = scope.launch { - var thrownException: Throwable? = null - try { - for (message in mailbox) { - try { - handler(message) - } catch (e: Throwable) { - errorHandler(e) - } - } - } catch (ex: Throwable) { - thrownException = ex - } finally { - state = ActorState.Closed - onCompletion(thrownException) - } - } - - /** - * Sends a message to the actor for processing (suspend until the message is enqueued). - * - * @param message The message of type [M] to be processed. - */ - public suspend fun send(message: M) { - checkStateBeforeSend() - mailbox.send(message) - } - - /** - * Attempts to send a message without suspension, returning true if successfully enqueued, - * or false if the mailbox is full or closed. - * - * @param message The message of type [M] to be processed. - * @return True if the message was sent, false otherwise. - */ - public fun trySendNonBlocking(message: M): Boolean { - checkStateBeforeSend() - return mailbox.trySend(message).isSuccess - } - - /** - * Sends a collection of messages to the actor sequentially (suspending if needed). - * - * @param messages The list/collection of messages to send. - */ - public suspend fun sendAll(messages: Iterable<M>) { - checkStateBeforeSend() - for (msg in messages) { - mailbox.send(msg) - } - } - - /** - * Closes the actor's mailbox, transitioning to Closing state. - * If [force] is true, also cancels the job immediately. - * - * @param force If true, cancels the processing job without draining remaining messages. - * @param onClosed An optional callback that fires immediately after the channel is closed - * (and the job is optionally canceled). This is distinct from [onCompletion], - * which is invoked after the actor's main loop ends naturally. - */ - public fun close(force: Boolean = false, onClosed: (Throwable?) -> Unit = {}) { - if (state == ActorState.Closed) return - state = ActorState.Closing - mailbox.close() - if (force) { - val cancellation = CancellationException("Actor is force-closed.") - job.cancel(cancellation) - state = ActorState.Closed - onClosed(cancellation) - } else { - onClosed(null) - } - } - - /** - * Suspends until the actor finishes processing all messages and the job completes. - */ - public suspend fun join(): Unit = job.join() - - /** - * A helper method to verify that the actor is not fully closed before sending. - * Throws an [IllegalStateException] if the actor is already closed. - */ - private fun checkStateBeforeSend() { - if (state == ActorState.Closed || !job.isActive) { - throw IllegalStateException("Cannot send messages to a closed or inactive Actor.") - } - } -} - -/** - * Sealed class representing various device management commands to be processed - * by the [DeviceManagerActor]. - */ -public sealed class DeviceCommand { - /** - * Command to attach (register) a new device. - * - * @param name The unique name of the device. - * @param device The device instance to attach. - * @param config The [DeviceLifecycleConfig] for the device. - * @param meta Optional metadata associated with the device. - * @param startMode Specifies if the device should be started automatically. - */ - public data class Attach( - val name: Name, - val device: Device, - val config: DeviceLifecycleConfig, - val meta: Meta? = null, - val startMode: StartMode = StartMode.NONE - ) : DeviceCommand() - - /** - * Command to detach (remove) a device. - * - * @param name The unique name of the device. - * @param waitStop If true, waits until the device fully stops before returning. - */ - public data class Detach( - val name: Name, - val waitStop: Boolean = false - ) : DeviceCommand() - - /** - * Command to restart a device. - * - * @param name The unique name of the device to restart. - */ - public data class Restart(val name: Name) : DeviceCommand() - - /** - * Command to change the lifecycle mode of a device. - * - * @param name The unique name of the device. - * @param newMode The new [LifecycleMode] to be applied. - */ - public data class ChangeLifecycle( - val name: Name, - val newMode: LifecycleMode - ) : DeviceCommand() - - /** - * Command to hot-swap a device. - * - * @param name The unique name of the device to replace. - * @param newDevice The new device instance to use. - * @param config The [DeviceLifecycleConfig] for the new device. - * @param meta Optional metadata for the new device. - * @param reuseMessageBus If true, reuses the old message bus. - */ - public data class HotSwap( - val name: Name, - val newDevice: Device, - val config: DeviceLifecycleConfig, - val meta: Meta? = null, - val reuseMessageBus: Boolean = false - ) : DeviceCommand() -} - -/** - * An actor-based wrapper for device management. - * - * This class encapsulates an [AbstractDeviceHubManager] and processes device management - * commands sequentially via an actor. This ensures that all operations (attach, detach, - * restart, etc.) are executed in a thread-safe manner without the need for explicit locks. - * - * @param hubManager The underlying device manager instance. - * @param scope The [CoroutineScope] in which the actor operates. - */ -public class DeviceManagerActor( - private val hubManager: AbstractDeviceHubManager, - scope: CoroutineScope -) { - private val actor = Actor<DeviceCommand>( - scope = scope, - capacity = 100, - onBufferOverflow = BufferOverflow.DROP_OLDEST, - errorHandler = { ex -> - hubManager.context.logger.error(ex) { "Error processing device command" } - } - ) { command -> - when (command) { - is DeviceCommand.Attach -> { - hubManager.attachDevice( - command.name, - command.device, - command.config, - command.meta, - command.startMode - ) - } - is DeviceCommand.Detach -> { - hubManager.detachDevice(command.name, command.waitStop) - } - is DeviceCommand.Restart -> { - hubManager.restartDevice(command.name) - } - is DeviceCommand.ChangeLifecycle -> { - hubManager.changeLifecycleMode(command.name, command.newMode) - } - is DeviceCommand.HotSwap -> { - hubManager.hotSwapDevice( - command.name, - command.newDevice, - command.config, - command.meta, - command.reuseMessageBus - ) - } - } - } - - /** - * Sends a device management command to be processed by the actor. - * - * @param command The [DeviceCommand] to process. - */ - public suspend fun send(command: DeviceCommand) { - actor.send(command) - } - - /** - * Attempts to send a device management command without suspension. - * - * @param command The [DeviceCommand] to process. - * @return True if the command was successfully enqueued, false otherwise. - */ - public fun trySendNonBlocking(command: DeviceCommand): Boolean { - return actor.trySendNonBlocking(command) - } - - /** - * Closes the actor's mailbox, stopping further command processing. - * If [force] = true, the actor job is cancelled immediately. - */ - public fun close(force: Boolean = false) { - actor.close(force) - } - - /** - * Suspends until the actor finishes processing all commands. - */ - public suspend fun join() { - actor.join() - } -} - -/** - * Extension function to convert an [AbstractDeviceHubManager] to an actor-based device manager. - * - * This function creates a [DeviceManagerActor] that wraps the current manager and provides - * an actor interface for sending commands. - * - * @param scope The [CoroutineScope] in which the actor will operate. - * @return A [DeviceManagerActor] instance. - */ -public fun AbstractDeviceHubManager.toActor(scope: CoroutineScope): DeviceManagerActor = - DeviceManagerActor(this, scope) - /** * Extension function to safely get the completed value of a [Deferred] or return `null`. * @@ -337,10 +29,6 @@ public fun AbstractDeviceHubManager.toActor(scope: CoroutineScope): DeviceManage private fun <T> Deferred<T>.getCompletedOrNull(): T? = if (isCompleted && !isCancelled) getCompleted() else null -private val globalExceptionHandler = CoroutineExceptionHandler { _, throwable -> - println("Unhandled exception in global scope: ${throwable.message}") -} - /** * EventBus interface for publishing and subscribing to application-level events. */ @@ -394,11 +82,13 @@ public interface TransportAdapter { * Default stub implementation of [TransportAdapter] for in-process communication. */ public class DefaultTransportAdapter( - private val eventBus: EventBus + private val eventBus: EventBus, + private val logger: Logger = DefaultLogManager(), ) : TransportAdapter { private val _messages = MutableSharedFlow<DeviceMessage>(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST) override suspend fun send(message: DeviceMessage) { _messages.emit(message) + logger.info { "TransportAdapter: message sent -> ${message.sourceDevice}" } eventBus.publish("Message sent: ${message.sourceDevice}") } override fun subscribe(): Flow<DeviceMessage> = _messages.asSharedFlow() @@ -440,7 +130,7 @@ public class DefaultTransactionManager( } /** - * Transaction events. + * Transaction events used by [DefaultTransactionManager]. */ public sealed class TransactionEvent { public object TransactionStarted : TransactionEvent() @@ -465,9 +155,11 @@ public interface MetricPublisher { /** * Default stub implementation of [MetricPublisher] which logs metrics. */ -public class DefaultMetricPublisher : MetricPublisher { +public class DefaultMetricPublisher( + private val logger: Logger = DefaultLogManager() +) : MetricPublisher { override fun publishMetric(name: String, value: Double, tags: Map<String, String>) { - println("Metric published: $name = $value, tags: $tags") + logger.info { "Metric published: $name = $value, tags: $tags" } } } @@ -635,9 +327,6 @@ public enum class RestartStrategy { public sealed class DeviceStateEvent { public abstract val deviceName: Name - /** - * Indicates that a device was added to the manager. - */ public data class DeviceAdded(override val deviceName: Name) : DeviceStateEvent() /** @@ -756,7 +445,7 @@ public class DeviceLifecycleConfigBuilder { dispatcher = dispatcher, onError = onError, healthChecker = healthChecker, - restartPolicy = restartPolicy, + restartPolicy = restartPolicy ) } @@ -1271,7 +960,7 @@ public abstract class AbstractDeviceHubManager( * Global exception handler for all coroutines in this manager. */ protected val exceptionHandler: CoroutineExceptionHandler = CoroutineExceptionHandler { _, ex -> - context.logger.error(ex) { "Unhandled exception in global scope" } + context.logger.error(ex) { "Unhandled exception in global scope (DeviceHubManager)" } } /** @@ -1279,6 +968,9 @@ public abstract class AbstractDeviceHubManager( */ protected val parentJob: Job = SupervisorJob() + /** + * A mutex to protect access to [childrenJobs]. + */ protected val childLock: Mutex = Mutex() /** @@ -1549,6 +1241,9 @@ public abstract class AbstractDeviceHubManager( } } + /** + * Calculates the delay based on [RestartPolicy]. + */ private fun calculateDelay(policy: RestartPolicy, attempts: Int): Duration { return when (policy.strategy) { RestartStrategy.LINEAR -> policy.delayBetweenAttempts @@ -1622,7 +1317,7 @@ public abstract class AbstractDeviceHubManager( metricPublisher.publishMetric("device.attach", 1.0, mapOf("device" to name.toString())) if (config.lifecycleMode == LifecycleMode.INDEPENDENT) return when (startMode) { - StartMode.NONE -> {} + StartMode.NONE -> Unit StartMode.ASYNC -> launchGlobal { doStartDevice(name, config, device) } StartMode.SYNC -> doStartDevice(name, config, device) } @@ -1775,7 +1470,7 @@ public abstract class AbstractDeviceHubManager( val oldBus = childLock.withLock { childrenJobs[name]?.messageBus } removeDeviceUnlocked(name, waitStop = true) childLock.withLock { - val newChild = launchChild(name, newDevice, config, meta, oldBus) + val newChild = launchChild(name, newDevice, config, meta, oldBus.takeIf { reuseMessageBus }) childrenJobs[name] = newChild systemBus.emit(SystemLogMessage("Device $name hot-swapped", sourceDevice = name)) metricPublisher.publishMetric("device.hotswap", 1.0, mapOf("device" to name.toString())) @@ -1965,7 +1660,7 @@ public abstract class AbstractDeviceHubManager( * @param context The parent context. * @param dispatcher The [CoroutineDispatcher] for concurrency. */ -private class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) { +internal class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) { override val messageBus: MutableSharedFlow<DeviceMessage> = MutableSharedFlow( replay = 1000, onBufferOverflow = BufferOverflow.DROP_OLDEST @@ -1982,7 +1677,7 @@ private class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispat replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST ) - override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus) + override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus, context.logger) } /**