Make DeviceMessage time mandatory
This commit is contained in:
parent
9edf3b13ef
commit
cfd9eb053c
@ -22,7 +22,7 @@ public sealed class DeviceMessage {
|
||||
public abstract val sourceDevice: Name?
|
||||
public abstract val targetDevice: Name?
|
||||
public abstract val comment: String?
|
||||
public abstract val time: Instant?
|
||||
public abstract val time: Instant
|
||||
|
||||
/**
|
||||
* Update the source device name for composition. If the original name is null, the resulting name is also null.
|
||||
@ -59,7 +59,7 @@ public data class PropertyChangedMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -75,7 +75,7 @@ public data class PropertySetMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -91,7 +91,7 @@ public data class PropertyGetMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -105,7 +105,7 @@ public data class GetDescriptionMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -122,7 +122,7 @@ public data class DescriptionMessage(
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -141,7 +141,7 @@ public data class ActionExecuteMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -160,7 +160,7 @@ public data class ActionResultMessage(
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -175,7 +175,7 @@ public data class BinaryNotificationMessage(
|
||||
override val sourceDevice: Name,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -190,7 +190,7 @@ public data class EmptyDeviceMessage(
|
||||
override val sourceDevice: Name? = null,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
|
||||
}
|
||||
@ -206,7 +206,7 @@ public data class DeviceLogMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -223,7 +223,7 @@ public data class DeviceErrorMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
@ -238,7 +238,7 @@ public data class DeviceLifeCycleMessage(
|
||||
override val sourceDevice: Name = Name.EMPTY,
|
||||
override val targetDevice: Name? = null,
|
||||
override val comment: String? = null,
|
||||
@EncodeDefault override val time: Instant? = Clock.System.now(),
|
||||
@EncodeDefault override val time: Instant = Clock.System.now(),
|
||||
) : DeviceMessage() {
|
||||
override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ public fun DeviceManager.launchMagixService(
|
||||
logger.error(error) { "Error while responding to message: ${error.message}" }
|
||||
}.launchIn(this)
|
||||
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
hubMessageFlow().onEach { payload ->
|
||||
endpoint.send(
|
||||
format = controlsMagixFormat,
|
||||
payload = payload,
|
||||
|
@ -60,7 +60,7 @@ internal class RemoteDeviceConnect {
|
||||
MagixMessage(
|
||||
format = DeviceManager.magixFormat.defaultFormat,
|
||||
payload = MagixEndpoint.magixJson.encodeToJsonElement(DeviceManager.magixFormat.serializer, it),
|
||||
sourceEndpoint = "test",
|
||||
sourceEndpoint = "source",
|
||||
)
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ internal class RemoteDeviceConnect {
|
||||
}
|
||||
}
|
||||
|
||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "test", Name.EMPTY)
|
||||
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "source", "target", Name.EMPTY)
|
||||
|
||||
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ public class DeviceNameSpace(
|
||||
//Subscribe on properties updates
|
||||
device.onPropertyChange {
|
||||
nodes[property]?.let { node ->
|
||||
val sourceTime = time?.let { DateTime(it.toJavaInstant()) }
|
||||
val sourceTime = DateTime(time.toJavaInstant())
|
||||
val newValue = value.toOpc(sourceTime = sourceTime)
|
||||
if (node.value.value != newValue.value) {
|
||||
node.value = newValue
|
||||
|
@ -19,7 +19,6 @@ import space.kscience.dataforge.context.request
|
||||
import space.kscience.dataforge.io.IOPlugin
|
||||
import space.kscience.dataforge.io.workDirectory
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.Name
|
||||
@ -39,9 +38,7 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit {
|
||||
message.targetDevice?.let {
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||
}
|
||||
message.time?.let {
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||
}
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, message.time.toString())
|
||||
entity.setBlobString("json", Json.encodeToString(json))
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ public fun DeviceManager.storeMessages(
|
||||
val storage = factory.build(context, meta)
|
||||
logger.debug { "Message storage with meta = $meta created" }
|
||||
|
||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||
return hubMessageFlow().filter(filterCondition).onEach { message ->
|
||||
storage.write(message)
|
||||
}.onCompletion {
|
||||
storage.close()
|
||||
|
@ -104,10 +104,9 @@ public fun Plot.plotDeviceProperty(
|
||||
coroutineScope: CoroutineScope = device.context,
|
||||
configuration: Scatter.() -> Unit = {},
|
||||
): Job = scatter(configuration).run {
|
||||
val clock = device.context.clock
|
||||
val data = TimeData()
|
||||
device.propertyMessageFlow(propertyName).sample(sampling).transform {
|
||||
data.append(it.time ?: clock.now(), it.value.extractValue())
|
||||
data.append(it.time, it.value.extractValue())
|
||||
data.trim(maxAge, maxPoints, minPoints)
|
||||
emit(data)
|
||||
}.onEach {
|
||||
|
@ -103,8 +103,8 @@ suspend fun main() {
|
||||
|
||||
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
|
||||
mutex.withLock {
|
||||
val delay = Clock.System.now() - payload.time!!
|
||||
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!!
|
||||
val delay = Clock.System.now() - payload.time
|
||||
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time
|
||||
max[magixMessage.sourceEndpoint] =
|
||||
maxOf(delay, max[magixMessage.sourceEndpoint] ?: ZERO)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user