docs(CompositeControlComponentSpec): improved comments
This commit is contained in:
parent
db1e5c8c14
commit
1a74d9446d
@ -4,7 +4,6 @@ package space.kscience.controls.spec
|
||||
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.BufferOverflow
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
@ -13,7 +12,6 @@ import space.kscience.dataforge.context.*
|
||||
import space.kscience.dataforge.context.logger
|
||||
import space.kscience.dataforge.meta.*
|
||||
import space.kscience.dataforge.names.*
|
||||
import kotlin.concurrent.Volatile
|
||||
import kotlin.math.pow
|
||||
import kotlin.properties.PropertyDelegateProvider
|
||||
import kotlin.properties.ReadOnlyProperty
|
||||
@ -21,312 +19,6 @@ import kotlin.reflect.KProperty
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
/**
|
||||
* A sealed class representing the internal lifecycle state of an Actor.
|
||||
*/
|
||||
public sealed class ActorState {
|
||||
public object Active : ActorState()
|
||||
public object Closing : ActorState()
|
||||
public object Closed : ActorState()
|
||||
}
|
||||
|
||||
/**
|
||||
* A generic Actor class that processes messages sequentially in a coroutine.
|
||||
*
|
||||
* @param M The type of messages processed by this actor.
|
||||
* @property scope The [CoroutineScope] in which the actor operates.
|
||||
* @property capacity The capacity of the actor's mailbox. Defaults to Channel.UNLIMITED.
|
||||
* @property onBufferOverflow The strategy to apply when the channel is at capacity (if capacity != UNLIMITED).
|
||||
* @property errorHandler A function to handle errors during message processing.
|
||||
* @property onCompletion An optional callback invoked once the actor completes processing all messages
|
||||
* (i.e., after the mailbox is closed and the loop terminates), unless force-closing.
|
||||
* @property handler A suspend function invoked for each message.
|
||||
*/
|
||||
public class Actor<M>(
|
||||
private val scope: CoroutineScope,
|
||||
private val capacity: Int = Channel.UNLIMITED,
|
||||
private val onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
|
||||
private val errorHandler: (Throwable) -> Unit = { e ->
|
||||
println("Error processing message in Actor: ${e.message}")
|
||||
},
|
||||
private val onCompletion: (Throwable?) -> Unit = {},
|
||||
private val handler: suspend (M) -> Unit
|
||||
) {
|
||||
|
||||
/**
|
||||
* Channel for receiving messages. By default, uses the provided capacity and
|
||||
* onBufferOverflow strategy. For example:
|
||||
* - BufferOverflow.SUSPEND (default) will suspend senders when capacity is reached.
|
||||
* - BufferOverflow.DROP_OLDEST or DROP_LATEST can be used for dropping messages.
|
||||
*/
|
||||
private val mailbox = Channel<M>(capacity, onBufferOverflow)
|
||||
|
||||
@Volatile
|
||||
private var state: ActorState = ActorState.Active
|
||||
|
||||
/**
|
||||
* The actor's main job that sequentially processes messages from the mailbox.
|
||||
*/
|
||||
private val job = scope.launch {
|
||||
var thrownException: Throwable? = null
|
||||
try {
|
||||
for (message in mailbox) {
|
||||
try {
|
||||
handler(message)
|
||||
} catch (e: Throwable) {
|
||||
errorHandler(e)
|
||||
}
|
||||
}
|
||||
} catch (ex: Throwable) {
|
||||
thrownException = ex
|
||||
} finally {
|
||||
state = ActorState.Closed
|
||||
onCompletion(thrownException)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message to the actor for processing (suspend until the message is enqueued).
|
||||
*
|
||||
* @param message The message of type [M] to be processed.
|
||||
*/
|
||||
public suspend fun send(message: M) {
|
||||
checkStateBeforeSend()
|
||||
mailbox.send(message)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to send a message without suspension, returning true if successfully enqueued,
|
||||
* or false if the mailbox is full or closed.
|
||||
*
|
||||
* @param message The message of type [M] to be processed.
|
||||
* @return True if the message was sent, false otherwise.
|
||||
*/
|
||||
public fun trySendNonBlocking(message: M): Boolean {
|
||||
checkStateBeforeSend()
|
||||
return mailbox.trySend(message).isSuccess
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a collection of messages to the actor sequentially (suspending if needed).
|
||||
*
|
||||
* @param messages The list/collection of messages to send.
|
||||
*/
|
||||
public suspend fun sendAll(messages: Iterable<M>) {
|
||||
checkStateBeforeSend()
|
||||
for (msg in messages) {
|
||||
mailbox.send(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the actor's mailbox, transitioning to Closing state.
|
||||
* If [force] is true, also cancels the job immediately.
|
||||
*
|
||||
* @param force If true, cancels the processing job without draining remaining messages.
|
||||
* @param onClosed An optional callback that fires immediately after the channel is closed
|
||||
* (and the job is optionally canceled). This is distinct from [onCompletion],
|
||||
* which is invoked after the actor's main loop ends naturally.
|
||||
*/
|
||||
public fun close(force: Boolean = false, onClosed: (Throwable?) -> Unit = {}) {
|
||||
if (state == ActorState.Closed) return
|
||||
state = ActorState.Closing
|
||||
mailbox.close()
|
||||
if (force) {
|
||||
val cancellation = CancellationException("Actor is force-closed.")
|
||||
job.cancel(cancellation)
|
||||
state = ActorState.Closed
|
||||
onClosed(cancellation)
|
||||
} else {
|
||||
onClosed(null)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Suspends until the actor finishes processing all messages and the job completes.
|
||||
*/
|
||||
public suspend fun join(): Unit = job.join()
|
||||
|
||||
/**
|
||||
* A helper method to verify that the actor is not fully closed before sending.
|
||||
* Throws an [IllegalStateException] if the actor is already closed.
|
||||
*/
|
||||
private fun checkStateBeforeSend() {
|
||||
if (state == ActorState.Closed || !job.isActive) {
|
||||
throw IllegalStateException("Cannot send messages to a closed or inactive Actor.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sealed class representing various device management commands to be processed
|
||||
* by the [DeviceManagerActor].
|
||||
*/
|
||||
public sealed class DeviceCommand {
|
||||
/**
|
||||
* Command to attach (register) a new device.
|
||||
*
|
||||
* @param name The unique name of the device.
|
||||
* @param device The device instance to attach.
|
||||
* @param config The [DeviceLifecycleConfig] for the device.
|
||||
* @param meta Optional metadata associated with the device.
|
||||
* @param startMode Specifies if the device should be started automatically.
|
||||
*/
|
||||
public data class Attach(
|
||||
val name: Name,
|
||||
val device: Device,
|
||||
val config: DeviceLifecycleConfig,
|
||||
val meta: Meta? = null,
|
||||
val startMode: StartMode = StartMode.NONE
|
||||
) : DeviceCommand()
|
||||
|
||||
/**
|
||||
* Command to detach (remove) a device.
|
||||
*
|
||||
* @param name The unique name of the device.
|
||||
* @param waitStop If true, waits until the device fully stops before returning.
|
||||
*/
|
||||
public data class Detach(
|
||||
val name: Name,
|
||||
val waitStop: Boolean = false
|
||||
) : DeviceCommand()
|
||||
|
||||
/**
|
||||
* Command to restart a device.
|
||||
*
|
||||
* @param name The unique name of the device to restart.
|
||||
*/
|
||||
public data class Restart(val name: Name) : DeviceCommand()
|
||||
|
||||
/**
|
||||
* Command to change the lifecycle mode of a device.
|
||||
*
|
||||
* @param name The unique name of the device.
|
||||
* @param newMode The new [LifecycleMode] to be applied.
|
||||
*/
|
||||
public data class ChangeLifecycle(
|
||||
val name: Name,
|
||||
val newMode: LifecycleMode
|
||||
) : DeviceCommand()
|
||||
|
||||
/**
|
||||
* Command to hot-swap a device.
|
||||
*
|
||||
* @param name The unique name of the device to replace.
|
||||
* @param newDevice The new device instance to use.
|
||||
* @param config The [DeviceLifecycleConfig] for the new device.
|
||||
* @param meta Optional metadata for the new device.
|
||||
* @param reuseMessageBus If true, reuses the old message bus.
|
||||
*/
|
||||
public data class HotSwap(
|
||||
val name: Name,
|
||||
val newDevice: Device,
|
||||
val config: DeviceLifecycleConfig,
|
||||
val meta: Meta? = null,
|
||||
val reuseMessageBus: Boolean = false
|
||||
) : DeviceCommand()
|
||||
}
|
||||
|
||||
/**
|
||||
* An actor-based wrapper for device management.
|
||||
*
|
||||
* This class encapsulates an [AbstractDeviceHubManager] and processes device management
|
||||
* commands sequentially via an actor. This ensures that all operations (attach, detach,
|
||||
* restart, etc.) are executed in a thread-safe manner without the need for explicit locks.
|
||||
*
|
||||
* @param hubManager The underlying device manager instance.
|
||||
* @param scope The [CoroutineScope] in which the actor operates.
|
||||
*/
|
||||
public class DeviceManagerActor(
|
||||
private val hubManager: AbstractDeviceHubManager,
|
||||
scope: CoroutineScope
|
||||
) {
|
||||
private val actor = Actor<DeviceCommand>(
|
||||
scope = scope,
|
||||
capacity = 100,
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST,
|
||||
errorHandler = { ex ->
|
||||
hubManager.context.logger.error(ex) { "Error processing device command" }
|
||||
}
|
||||
) { command ->
|
||||
when (command) {
|
||||
is DeviceCommand.Attach -> {
|
||||
hubManager.attachDevice(
|
||||
command.name,
|
||||
command.device,
|
||||
command.config,
|
||||
command.meta,
|
||||
command.startMode
|
||||
)
|
||||
}
|
||||
is DeviceCommand.Detach -> {
|
||||
hubManager.detachDevice(command.name, command.waitStop)
|
||||
}
|
||||
is DeviceCommand.Restart -> {
|
||||
hubManager.restartDevice(command.name)
|
||||
}
|
||||
is DeviceCommand.ChangeLifecycle -> {
|
||||
hubManager.changeLifecycleMode(command.name, command.newMode)
|
||||
}
|
||||
is DeviceCommand.HotSwap -> {
|
||||
hubManager.hotSwapDevice(
|
||||
command.name,
|
||||
command.newDevice,
|
||||
command.config,
|
||||
command.meta,
|
||||
command.reuseMessageBus
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a device management command to be processed by the actor.
|
||||
*
|
||||
* @param command The [DeviceCommand] to process.
|
||||
*/
|
||||
public suspend fun send(command: DeviceCommand) {
|
||||
actor.send(command)
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to send a device management command without suspension.
|
||||
*
|
||||
* @param command The [DeviceCommand] to process.
|
||||
* @return True if the command was successfully enqueued, false otherwise.
|
||||
*/
|
||||
public fun trySendNonBlocking(command: DeviceCommand): Boolean {
|
||||
return actor.trySendNonBlocking(command)
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the actor's mailbox, stopping further command processing.
|
||||
* If [force] = true, the actor job is cancelled immediately.
|
||||
*/
|
||||
public fun close(force: Boolean = false) {
|
||||
actor.close(force)
|
||||
}
|
||||
|
||||
/**
|
||||
* Suspends until the actor finishes processing all commands.
|
||||
*/
|
||||
public suspend fun join() {
|
||||
actor.join()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension function to convert an [AbstractDeviceHubManager] to an actor-based device manager.
|
||||
*
|
||||
* This function creates a [DeviceManagerActor] that wraps the current manager and provides
|
||||
* an actor interface for sending commands.
|
||||
*
|
||||
* @param scope The [CoroutineScope] in which the actor will operate.
|
||||
* @return A [DeviceManagerActor] instance.
|
||||
*/
|
||||
public fun AbstractDeviceHubManager.toActor(scope: CoroutineScope): DeviceManagerActor =
|
||||
DeviceManagerActor(this, scope)
|
||||
|
||||
/**
|
||||
* Extension function to safely get the completed value of a [Deferred] or return `null`.
|
||||
*
|
||||
@ -337,10 +29,6 @@ public fun AbstractDeviceHubManager.toActor(scope: CoroutineScope): DeviceManage
|
||||
private fun <T> Deferred<T>.getCompletedOrNull(): T? =
|
||||
if (isCompleted && !isCancelled) getCompleted() else null
|
||||
|
||||
private val globalExceptionHandler = CoroutineExceptionHandler { _, throwable ->
|
||||
println("Unhandled exception in global scope: ${throwable.message}")
|
||||
}
|
||||
|
||||
/**
|
||||
* EventBus interface for publishing and subscribing to application-level events.
|
||||
*/
|
||||
@ -394,11 +82,13 @@ public interface TransportAdapter {
|
||||
* Default stub implementation of [TransportAdapter] for in-process communication.
|
||||
*/
|
||||
public class DefaultTransportAdapter(
|
||||
private val eventBus: EventBus
|
||||
private val eventBus: EventBus,
|
||||
private val logger: Logger = DefaultLogManager(),
|
||||
) : TransportAdapter {
|
||||
private val _messages = MutableSharedFlow<DeviceMessage>(replay = 100, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||
override suspend fun send(message: DeviceMessage) {
|
||||
_messages.emit(message)
|
||||
logger.info { "TransportAdapter: message sent -> ${message.sourceDevice}" }
|
||||
eventBus.publish("Message sent: ${message.sourceDevice}")
|
||||
}
|
||||
override fun subscribe(): Flow<DeviceMessage> = _messages.asSharedFlow()
|
||||
@ -440,7 +130,7 @@ public class DefaultTransactionManager(
|
||||
}
|
||||
|
||||
/**
|
||||
* Transaction events.
|
||||
* Transaction events used by [DefaultTransactionManager].
|
||||
*/
|
||||
public sealed class TransactionEvent {
|
||||
public object TransactionStarted : TransactionEvent()
|
||||
@ -465,9 +155,11 @@ public interface MetricPublisher {
|
||||
/**
|
||||
* Default stub implementation of [MetricPublisher] which logs metrics.
|
||||
*/
|
||||
public class DefaultMetricPublisher : MetricPublisher {
|
||||
public class DefaultMetricPublisher(
|
||||
private val logger: Logger = DefaultLogManager()
|
||||
) : MetricPublisher {
|
||||
override fun publishMetric(name: String, value: Double, tags: Map<String, String>) {
|
||||
println("Metric published: $name = $value, tags: $tags")
|
||||
logger.info { "Metric published: $name = $value, tags: $tags" }
|
||||
}
|
||||
}
|
||||
|
||||
@ -635,9 +327,6 @@ public enum class RestartStrategy {
|
||||
public sealed class DeviceStateEvent {
|
||||
public abstract val deviceName: Name
|
||||
|
||||
/**
|
||||
* Indicates that a device was added to the manager.
|
||||
*/
|
||||
public data class DeviceAdded(override val deviceName: Name) : DeviceStateEvent()
|
||||
|
||||
/**
|
||||
@ -756,7 +445,7 @@ public class DeviceLifecycleConfigBuilder {
|
||||
dispatcher = dispatcher,
|
||||
onError = onError,
|
||||
healthChecker = healthChecker,
|
||||
restartPolicy = restartPolicy,
|
||||
restartPolicy = restartPolicy
|
||||
)
|
||||
}
|
||||
|
||||
@ -1271,7 +960,7 @@ public abstract class AbstractDeviceHubManager(
|
||||
* Global exception handler for all coroutines in this manager.
|
||||
*/
|
||||
protected val exceptionHandler: CoroutineExceptionHandler = CoroutineExceptionHandler { _, ex ->
|
||||
context.logger.error(ex) { "Unhandled exception in global scope" }
|
||||
context.logger.error(ex) { "Unhandled exception in global scope (DeviceHubManager)" }
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1279,6 +968,9 @@ public abstract class AbstractDeviceHubManager(
|
||||
*/
|
||||
protected val parentJob: Job = SupervisorJob()
|
||||
|
||||
/**
|
||||
* A mutex to protect access to [childrenJobs].
|
||||
*/
|
||||
protected val childLock: Mutex = Mutex()
|
||||
|
||||
/**
|
||||
@ -1549,6 +1241,9 @@ public abstract class AbstractDeviceHubManager(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the delay based on [RestartPolicy].
|
||||
*/
|
||||
private fun calculateDelay(policy: RestartPolicy, attempts: Int): Duration {
|
||||
return when (policy.strategy) {
|
||||
RestartStrategy.LINEAR -> policy.delayBetweenAttempts
|
||||
@ -1622,7 +1317,7 @@ public abstract class AbstractDeviceHubManager(
|
||||
metricPublisher.publishMetric("device.attach", 1.0, mapOf("device" to name.toString()))
|
||||
if (config.lifecycleMode == LifecycleMode.INDEPENDENT) return
|
||||
when (startMode) {
|
||||
StartMode.NONE -> {}
|
||||
StartMode.NONE -> Unit
|
||||
StartMode.ASYNC -> launchGlobal { doStartDevice(name, config, device) }
|
||||
StartMode.SYNC -> doStartDevice(name, config, device)
|
||||
}
|
||||
@ -1775,7 +1470,7 @@ public abstract class AbstractDeviceHubManager(
|
||||
val oldBus = childLock.withLock { childrenJobs[name]?.messageBus }
|
||||
removeDeviceUnlocked(name, waitStop = true)
|
||||
childLock.withLock {
|
||||
val newChild = launchChild(name, newDevice, config, meta, oldBus)
|
||||
val newChild = launchChild(name, newDevice, config, meta, oldBus.takeIf { reuseMessageBus })
|
||||
childrenJobs[name] = newChild
|
||||
systemBus.emit(SystemLogMessage("Device $name hot-swapped", sourceDevice = name))
|
||||
metricPublisher.publishMetric("device.hotswap", 1.0, mapOf("device" to name.toString()))
|
||||
@ -1965,7 +1660,7 @@ public abstract class AbstractDeviceHubManager(
|
||||
* @param context The parent context.
|
||||
* @param dispatcher The [CoroutineDispatcher] for concurrency.
|
||||
*/
|
||||
private class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) {
|
||||
internal class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispatcher) : AbstractDeviceHubManager(context, dispatcher) {
|
||||
override val messageBus: MutableSharedFlow<DeviceMessage> = MutableSharedFlow(
|
||||
replay = 1000,
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST
|
||||
@ -1982,7 +1677,7 @@ private class DeviceHubManagerImpl(context: Context, dispatcher: CoroutineDispat
|
||||
replay = 100,
|
||||
onBufferOverflow = BufferOverflow.DROP_OLDEST
|
||||
)
|
||||
override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus)
|
||||
override val transactionManager: TransactionManager = DefaultTransactionManager(eventBus, context.logger)
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user