07.04.2025 - Magix improvements in CompositeControlComponents.kt

This commit is contained in:
2025-04-07 10:56:27 +03:00
parent 3727310e55
commit b344ae7cfb
5 changed files with 2599 additions and 2020 deletions

View File

@@ -16,6 +16,7 @@ kscience{
useSerialization()
commonMain {
api(projects.controlsCore)
api(projects.controlsConstructor)
implementation(projects.magix.magixApi)
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket)

View File

@@ -3,10 +3,7 @@
package space.kscience.controls.spec
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.test.runTest
import space.kscience.controls.api.*
import space.kscience.dataforge.context.Context
@@ -18,18 +15,7 @@ import space.kscience.dataforge.meta.int
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.parseAsName
import kotlin.test.*
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import space.kscience.controls.manager.DeviceManager
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.context.PluginTag.Companion.DATAFORGE_GROUP
import space.kscience.dataforge.context.request
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFormat
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import kotlin.time.Duration.Companion.seconds
class CompositeControlTest {
@@ -1022,8 +1008,8 @@ class CompositeControlTest {
}
class MagixMessageBusStub : MessageBus {
override fun subscribe(filter: DeviceMessageFilter) = flowOf<DeviceMessage>()
override suspend fun publish(message: DeviceMessage) { /* no-op */ }
override fun subscribe(filter: MessageFilter) = flowOf<Message>()
override suspend fun publish(message: Message) { /* no-op */ }
override fun close() { /* no-op */ }
}
}

View File

@@ -16,18 +16,28 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.toJson
import space.kscience.dataforge.meta.toMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
@Serializable
public sealed class DeviceMessage {
public sealed class Message {
public abstract val sourceDevice: Name?
public abstract val targetDevice: Name?
public abstract val comment: String?
public abstract val time: Instant
/**
* Update the source device name for composition. If the original name is null, the resulting name is also null.
*/
public abstract fun changeSource(block: (Name) -> Name): DeviceMessage
public abstract fun changeSource(block: (Name) -> Name): Message
}
@Serializable
public sealed class DeviceMessage: Message() {
public abstract val comment: String?
/**
* Update the source device name for composition. If the original name is null, the resulting name is also null.
*/
public abstract override fun changeSource(block: (Name) -> Name): DeviceMessage
public companion object {
public fun error(
@@ -201,35 +211,6 @@ public data class EmptyDeviceMessage(
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
}
/**
* Information log message
*/
@Serializable
@SerialName("log")
public data class DeviceLogMessage(
val message: String,
val data: Meta? = null,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("system.log")
public data class SystemLogMessage(
val message: String,
val data: Meta? = null,
override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* The evaluation of the message produced a service error
*/
@@ -262,6 +243,246 @@ public data class DeviceLifeCycleMessage(
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* Device log message: Messages from specific devices about their internal state
* and operations. Always have a source device and tied to a specific device context.
*/
@Serializable
@SerialName("device.log")
public data class DeviceLogMessage(
val message: String,
val data: Meta? = null,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
}
/**
* System log message: Infrastructure events and system notifications
* that aren't tied to a specific device. Used for resource management
* and system state reporting.
*/
@Serializable
@SerialName("system.log")
public data class SystemLogMessage(
val message: String,
val component: String,
override val sourceDevice: Name = "system".asName(),
override val targetDevice: Name? = null,
val details: Map<String, String> = emptyMap(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : Message() {
override fun changeSource(block: (Name) -> Name): Message = copy(sourceDevice = block(sourceDevice))
}
/**
* Transaction messages for coordinating transactional operations
*/
@Serializable
public sealed class TransactionMessage : Message() {
public abstract val transactionId: String
@Serializable
@SerialName("transaction.started")
public data class Started(
override val transactionId: String,
override val sourceDevice: Name = "transaction.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : TransactionMessage() {
override fun changeSource(block: (Name) -> Name): TransactionMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("transaction.committed")
public data class Committed(
override val transactionId: String,
override val sourceDevice: Name = "transaction.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : TransactionMessage() {
override fun changeSource(block: (Name) -> Name): TransactionMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("transaction.rolled_back")
public data class RolledBack(
override val transactionId: String,
val errorMessage: String? = null,
val errorType: String? = null,
override val sourceDevice: Name = "transaction.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : TransactionMessage() {
override fun changeSource(block: (Name) -> Name): TransactionMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("transaction.savepoint")
public data class Savepoint(
override val transactionId: String,
val savepointId: String,
override val sourceDevice: Name = "transaction.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : TransactionMessage() {
override fun changeSource(block: (Name) -> Name): TransactionMessage = copy(sourceDevice = block(sourceDevice))
}
}
/**
* Device state messages for device lifecycle events
*/
@Serializable
public sealed class DeviceStateMessage : Message() {
public abstract val deviceName: String
@Serializable
@SerialName("device.state.added")
public data class Added(
override val deviceName: String,
override val sourceDevice: Name = "device.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceStateMessage() {
override fun changeSource(block: (Name) -> Name): DeviceStateMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("device.state.started")
public data class Started(
override val deviceName: String,
override val sourceDevice: Name = "device.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceStateMessage() {
override fun changeSource(block: (Name) -> Name): DeviceStateMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("device.state.stopped")
public data class Stopped(
override val deviceName: String,
override val sourceDevice: Name = "device.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceStateMessage() {
override fun changeSource(block: (Name) -> Name): DeviceStateMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("device.state.removed")
public data class Removed(
override val deviceName: String,
override val sourceDevice: Name = "device.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceStateMessage() {
override fun changeSource(block: (Name) -> Name): DeviceStateMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("device.state.failed")
public data class Failed(
override val deviceName: String,
val errorMessage: String,
val errorType: String? = null,
override val sourceDevice: Name = "device.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceStateMessage() {
override fun changeSource(block: (Name) -> Name): DeviceStateMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("device.state.detached")
public data class Detached(
override val deviceName: String,
override val sourceDevice: Name = "device.manager".asName(),
override val targetDevice: Name? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceStateMessage() {
override fun changeSource(block: (Name) -> Name): DeviceStateMessage = copy(sourceDevice = block(sourceDevice))
}
}
/**
* Metric messages for monitoring and observability
*/
@Serializable
public sealed class MetricMessage : Message() {
public abstract val name: String
@Serializable
@SerialName("metrics.value")
public data class Value(
override val name: String,
val value: Double,
override val sourceDevice: Name = "metrics".asName(),
override val targetDevice: Name? = null,
val tags: Map<String, String> = emptyMap(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : MetricMessage() {
override fun changeSource(block: (Name) -> Name): MetricMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("metrics.counter")
public data class Counter(
override val name: String,
val increment: Double = 1.0,
override val sourceDevice: Name = "metrics".asName(),
override val targetDevice: Name? = null,
val tags: Map<String, String> = emptyMap(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : MetricMessage() {
override fun changeSource(block: (Name) -> Name): MetricMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("metrics.duration")
public data class Duration(
override val name: String,
val durationMs: Long,
override val sourceDevice: Name = "metrics".asName(),
override val targetDevice: Name? = null,
val tags: Map<String, String> = emptyMap(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : MetricMessage() {
override fun changeSource(block: (Name) -> Name): MetricMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("metrics.distribution")
public data class Distribution(
override val name: String,
val value: Double,
override val sourceDevice: Name = "metrics".asName(),
override val targetDevice: Name? = null,
val tags: Map<String, String> = emptyMap(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : MetricMessage() {
override fun changeSource(block: (Name) -> Name): MetricMessage = copy(sourceDevice = block(sourceDevice))
}
@Serializable
@SerialName("metrics.gauge")
public data class Gauge(
override val name: String,
val value: Double,
override val sourceDevice: Name = "metrics".asName(),
override val targetDevice: Name? = null,
val tags: Map<String, String> = emptyMap(),
@EncodeDefault override val time: Instant = Clock.System.now(),
) : MetricMessage() {
override fun changeSource(block: (Name) -> Name): MetricMessage = copy(sourceDevice = block(sourceDevice))
}
}
public fun DeviceMessage.toMeta(): Meta = Json.encodeToJsonElement(this).toMeta()

View File

@@ -60,7 +60,6 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
is EmptyDeviceMessage,
is DeviceLogMessage,
is DeviceLifeCycleMessage,
is SystemLogMessage,
-> null
}
} catch (ex: Exception) {