From 28ec2bc8b8135072ddb154abbaf17325f667cc41 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 4 Mar 2024 11:12:16 +0300 Subject: [PATCH] Add PropertyHistory API --- .../kscience/controls/misc/PropertyHistory.kt | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 controls-core/src/commonMain/kotlin/space/kscience/controls/misc/PropertyHistory.kt diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/misc/PropertyHistory.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/misc/PropertyHistory.kt new file mode 100644 index 0000000..5b58321 --- /dev/null +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/misc/PropertyHistory.kt @@ -0,0 +1,66 @@ +package space.kscience.controls.misc + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.* +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import space.kscience.controls.api.Device +import space.kscience.controls.api.DeviceMessage +import space.kscience.controls.api.PropertyChangedMessage +import space.kscience.controls.spec.DevicePropertySpec +import space.kscience.controls.spec.name +import space.kscience.dataforge.meta.transformations.MetaConverter + +/** + * An interface for device property history. + */ +public interface PropertyHistory { + /** + * Flow property values filtered by a time range. The implementation could flow it as a chunk or provide paging. + * So the resulting flow is allowed to suspend. + * + * If [until] is in the future, the resulting flow is potentially unlimited. + * Theoretically, it could be also unlimited if the event source keeps producing new event with timestamp in a given range. + */ + public fun flowHistory( + from: Instant = Instant.DISTANT_PAST, + until: Instant = Clock.System.now(), + ): Flow> +} + +/** + * An in-memory property values history collector + */ +public class CollectedPropertyHistory( + public val scope: CoroutineScope, + eventFlow: Flow, + public val propertyName: String, + public val converter: MetaConverter, + maxSize: Int = 1000, +) : PropertyHistory { + + private val store: SharedFlow> = eventFlow + .filterIsInstance() + .filter { it.property == propertyName } + .map { ValueWithTime(converter.metaToObject(it.value), it.time) } + .shareIn(scope, started = SharingStarted.Eagerly, replay = maxSize) + + override fun flowHistory(from: Instant, until: Instant): Flow> = + store.filter { it.time in from..until } +} + +/** + * Collect and store in memory device property changes for a given property + */ +public fun Device.collectPropertyHistory( + scope: CoroutineScope = this, + propertyName: String, + converter: MetaConverter, + maxSize: Int = 1000, +): PropertyHistory = CollectedPropertyHistory(scope, messageFlow, propertyName, converter, maxSize) + +public fun D.collectPropertyHistory( + scope: CoroutineScope = this, + spec: DevicePropertySpec, + maxSize: Int = 1000, +): PropertyHistory = collectPropertyHistory(scope, spec.name, spec.converter, maxSize) \ No newline at end of file