Compare commits

...

6 Commits

23 changed files with 428 additions and 72 deletions

View File

@ -3,8 +3,11 @@
## Unreleased
### Added
- Value averaging plot extension
- PLC4X bindings
### Changed
- Constructor properties return `DeviceStat` in order to be able to subscribe to them
### Deprecated

View File

@ -6,14 +6,14 @@ plugins {
}
val dataforgeVersion: String by extra("0.8.0")
val visionforgeVersion by extra("0.4.0")
val visionforgeVersion by extra("0.4.1")
val ktorVersion: String by extra(space.kscience.gradle.KScienceVersions.ktorVersion)
val rsocketVersion by extra("0.15.4")
val xodusVersion by extra("2.0.1")
allprojects {
group = "space.kscience"
version = "0.3.0"
version = "0.3.1-dev-1"
repositories{
maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev")
}

View File

@ -10,7 +10,6 @@ import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
import kotlin.time.Duration
@ -55,17 +54,17 @@ public abstract class DeviceConstructor(
/**
* Register a property and provide a direct reader for it
*/
public fun <T : Any> property(
state: DeviceState<T>,
public fun <T, S: DeviceState<T>> property(
state: S,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
nameOverride: String? = null,
): PropertyDelegateProvider<DeviceConstructor, ReadOnlyProperty<DeviceConstructor, T>> =
): PropertyDelegateProvider<DeviceConstructor, ReadOnlyProperty<DeviceConstructor, S>> =
PropertyDelegateProvider { _: DeviceConstructor, property ->
val name = nameOverride ?: property.name
val descriptor = PropertyDescriptor(name).apply(descriptorBuilder)
registerProperty(descriptor, state)
ReadOnlyProperty { _: DeviceConstructor, _ ->
state.value
state
}
}
@ -79,37 +78,14 @@ public abstract class DeviceConstructor(
initialState: T,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
nameOverride: String? = null,
): PropertyDelegateProvider<DeviceConstructor, ReadOnlyProperty<DeviceConstructor, T>> = property(
): PropertyDelegateProvider<DeviceConstructor, ReadOnlyProperty<DeviceConstructor, DeviceState<T>>> = property(
DeviceState.external(this, metaConverter, readInterval, initialState, reader),
descriptorBuilder,
nameOverride,
)
/**
* Register a mutable property and provide a direct reader for it
*/
public fun <T : Any> mutableProperty(
state: MutableDeviceState<T>,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
nameOverride: String? = null,
): PropertyDelegateProvider<DeviceConstructor, ReadWriteProperty<DeviceConstructor, T>> =
PropertyDelegateProvider { _: DeviceConstructor, property ->
val name = nameOverride ?: property.name
val descriptor = PropertyDescriptor(name).apply(descriptorBuilder)
registerProperty(descriptor, state)
object : ReadWriteProperty<DeviceConstructor, T> {
override fun getValue(thisRef: DeviceConstructor, property: KProperty<*>): T = state.value
override fun setValue(thisRef: DeviceConstructor, property: KProperty<*>, value: T) {
state.value = value
}
}
}
/**
* Register external state as a property
* Register a mutable external state as a property
*/
public fun <T : Any> mutableProperty(
metaConverter: MetaConverter<T>,
@ -119,22 +95,22 @@ public abstract class DeviceConstructor(
initialState: T,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
nameOverride: String? = null,
): PropertyDelegateProvider<DeviceConstructor, ReadWriteProperty<DeviceConstructor, T>> = mutableProperty(
): PropertyDelegateProvider<DeviceConstructor, ReadOnlyProperty<DeviceConstructor, MutableDeviceState<T>>> = property(
DeviceState.external(this, metaConverter, readInterval, initialState, reader, writer),
descriptorBuilder,
nameOverride,
)
/**
* Create and register a virtual property with optional [callback]
* Create and register a virtual mutable property with optional [callback]
*/
public fun <T : Any> state(
public fun <T> virtualProperty(
metaConverter: MetaConverter<T>,
initialState: T,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
nameOverride: String? = null,
callback: (T) -> Unit = {},
): PropertyDelegateProvider<DeviceConstructor, ReadWriteProperty<DeviceConstructor, T>> = mutableProperty(
): PropertyDelegateProvider<DeviceConstructor, ReadOnlyProperty<DeviceConstructor, MutableDeviceState<T>>> = property(
DeviceState.virtual(metaConverter, initialState, callback),
descriptorBuilder,
nameOverride,

View File

@ -28,7 +28,7 @@ public open class DeviceGroup(
) : DeviceHub, CachingDevice {
internal class Property(
val state: DeviceState<out Any>,
val state: DeviceState<*>,
val descriptor: PropertyDescriptor,
)
@ -82,7 +82,7 @@ public open class DeviceGroup(
/**
* Register a new property based on [DeviceState]. Properties could be modified dynamically
*/
public fun registerProperty(descriptor: PropertyDescriptor, state: DeviceState<out Any>) {
public fun registerProperty(descriptor: PropertyDescriptor, state: DeviceState<*>) {
val name = descriptor.name.parseAsName()
require(properties[name] == null) { "Can't add property with name $name. It already exists." }
properties[name] = Property(state, descriptor)

View File

@ -1,6 +1,7 @@
package space.kscience.controls.constructor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
@ -11,6 +12,7 @@ import space.kscience.controls.spec.MutableDevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaConverter
import kotlin.reflect.KProperty
import kotlin.time.Duration
/**
@ -29,6 +31,13 @@ public val <T> DeviceState<T>.metaFlow: Flow<Meta> get() = valueFlow.map(convert
public val <T> DeviceState<T>.valueAsMeta: Meta get() = converter.convert(value)
public operator fun <T> DeviceState<T>.getValue(thisRef: Any?, property: KProperty<*>): T = value
/**
* Collect values in a given [scope]
*/
public fun <T> DeviceState<T>.collectValuesIn(scope: CoroutineScope, block: suspend (T)->Unit): Job =
valueFlow.onEach(block).launchIn(scope)
/**
* A mutable state of a device
@ -37,7 +46,11 @@ public interface MutableDeviceState<T> : DeviceState<T> {
override var value: T
}
public var <T : Any> MutableDeviceState<T>.valueAsMeta: Meta
public operator fun <T> MutableDeviceState<T>.setValue(thisRef: Any?, property: KProperty<*>, value: T) {
this.value = value
}
public var <T> MutableDeviceState<T>.valueAsMeta: Meta
get() = converter.convert(value)
set(arg) {
value = converter.read(arg)
@ -216,6 +229,9 @@ private class MutableExternalState<T>(
}
}
/**
* Create a [DeviceState] that regularly reads and caches an external value
*/
public fun <T> DeviceState.Companion.external(
scope: CoroutineScope,
converter: MetaConverter<T>,

View File

@ -10,6 +10,7 @@ import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.names.Name
/**
* An interface for device property history.
@ -34,6 +35,7 @@ public interface PropertyHistory<T> {
public class CollectedPropertyHistory<T>(
public val scope: CoroutineScope,
eventFlow: Flow<DeviceMessage>,
public val deviceName: Name,
public val propertyName: String,
public val converter: MetaConverter<T>,
maxSize: Int = 1000,
@ -41,7 +43,7 @@ public class CollectedPropertyHistory<T>(
private val store: SharedFlow<ValueWithTime<T>> = eventFlow
.filterIsInstance<PropertyChangedMessage>()
.filter { it.property == propertyName }
.filter { it.sourceDevice == deviceName && it.property == propertyName }
.map { ValueWithTime(converter.read(it.value), it.time) }
.shareIn(scope, started = SharingStarted.Eagerly, replay = maxSize)
@ -54,13 +56,15 @@ public class CollectedPropertyHistory<T>(
*/
public fun <T> Device.collectPropertyHistory(
scope: CoroutineScope = this,
deviceName: Name,
propertyName: String,
converter: MetaConverter<T>,
maxSize: Int = 1000,
): PropertyHistory<T> = CollectedPropertyHistory(scope, messageFlow, propertyName, converter, maxSize)
): PropertyHistory<T> = CollectedPropertyHistory(scope, messageFlow, deviceName, propertyName, converter, maxSize)
public fun <D : Device, T> D.collectPropertyHistory(
scope: CoroutineScope = this,
deviceName: Name,
spec: DevicePropertySpec<D, T>,
maxSize: Int = 1000,
): PropertyHistory<T> = collectPropertyHistory(scope, spec.name, spec.converter, maxSize)
): PropertyHistory<T> = collectPropertyHistory(scope, deviceName, spec.name, spec.converter, maxSize)

View File

@ -75,7 +75,7 @@ public class DeviceNameSpace(
browseName = newQualifiedName(propertyName)
displayName = LocalizedText.english(propertyName)
dataType = if (descriptor.metaDescriptor.children.isNotEmpty()) {
dataType = if (descriptor.metaDescriptor.nodes.isNotEmpty()) {
Identifiers.String
} else when (descriptor.metaDescriptor.valueTypes?.first()) {
null, ValueType.STRING, ValueType.NULL -> Identifiers.String

View File

@ -0,0 +1,24 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
val plc4xVersion = "0.12.0"
description = """
A plugin for Controls-kt device server on top of plc4x library
""".trimIndent()
kscience{
jvm()
jvmMain{
api(projects.controlsCore)
api("org.apache.plc4x:plc4j-spi:$plc4xVersion")
}
}
readme{
maturity = Maturity.EXPERIMENTAL
}

View File

@ -0,0 +1,28 @@
package space.kscience.controls.plc4x
import kotlinx.coroutines.future.await
import org.apache.plc4x.java.api.PlcConnection
import org.apache.plc4x.java.api.messages.PlcWriteRequest
import space.kscience.controls.api.Device
import space.kscience.dataforge.meta.Meta
public interface Plc4XDevice: Device {
public val connection: PlcConnection
public suspend fun read(plc4xProperty: Plc4xProperty): Meta = with(plc4xProperty){
val request = connection.readRequestBuilder().request().build()
val response = request.execute().await()
response.readProperty()
}
public suspend fun write(plc4xProperty: Plc4xProperty, value: Meta): Unit = with(plc4xProperty){
val request: PlcWriteRequest = connection.writeRequestBuilder().writeProperty(value).build()
request.execute().await()
}
public suspend fun subscribe(propertyName: String, plc4xProperty: Plc4xProperty): Unit = with(plc4xProperty){
}
}

View File

@ -0,0 +1,31 @@
package space.kscience.controls.plc4x
import org.apache.plc4x.java.api.messages.PlcReadRequest
import org.apache.plc4x.java.api.messages.PlcReadResponse
import org.apache.plc4x.java.api.messages.PlcWriteRequest
import org.apache.plc4x.java.api.types.PlcValueType
import space.kscience.dataforge.meta.Meta
public interface Plc4xProperty {
public fun PlcReadRequest.Builder.request(): PlcReadRequest.Builder
public fun PlcReadResponse.readProperty(): Meta
public fun PlcWriteRequest.Builder.writeProperty(meta: Meta): PlcWriteRequest.Builder
}
public class DefaultPlc4xProperty(
private val address: String,
private val plcValueType: PlcValueType,
private val name: String = "@default",
) : Plc4xProperty {
override fun PlcReadRequest.Builder.request(): PlcReadRequest.Builder =
addTagAddress(name, address)
override fun PlcReadResponse.readProperty(): Meta =
asPlcValue.toMeta()
override fun PlcWriteRequest.Builder.writeProperty(meta: Meta): PlcWriteRequest.Builder =
addTagAddress(name, address, meta.toPlcValue(plcValueType))
}

View File

@ -0,0 +1,123 @@
package space.kscience.controls.plc4x
import org.apache.plc4x.java.api.types.PlcValueType
import org.apache.plc4x.java.api.value.PlcValue
import org.apache.plc4x.java.spi.values.*
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.asName
import java.math.BigInteger
internal fun PlcValue.toMeta(): Meta = Meta {
when (plcValueType) {
null, PlcValueType.NULL -> value = Null
PlcValueType.BOOL -> value = this@toMeta.boolean.asValue()
PlcValueType.BYTE -> this@toMeta.byte.asValue()
PlcValueType.WORD -> this@toMeta.short.asValue()
PlcValueType.DWORD -> this@toMeta.int.asValue()
PlcValueType.LWORD -> this@toMeta.long.asValue()
PlcValueType.USINT -> this@toMeta.short.asValue()
PlcValueType.UINT -> this@toMeta.int.asValue()
PlcValueType.UDINT -> this@toMeta.long.asValue()
PlcValueType.ULINT -> this@toMeta.bigInteger.asValue()
PlcValueType.SINT -> this@toMeta.byte.asValue()
PlcValueType.INT -> this@toMeta.short.asValue()
PlcValueType.DINT -> this@toMeta.int.asValue()
PlcValueType.LINT -> this@toMeta.long.asValue()
PlcValueType.REAL -> this@toMeta.float.asValue()
PlcValueType.LREAL -> this@toMeta.double.asValue()
PlcValueType.CHAR -> this@toMeta.int.asValue()
PlcValueType.WCHAR -> this@toMeta.short.asValue()
PlcValueType.STRING -> this@toMeta.string.asValue()
PlcValueType.WSTRING -> this@toMeta.string.asValue()
PlcValueType.TIME -> this@toMeta.duration.toString().asValue()
PlcValueType.LTIME -> this@toMeta.duration.toString().asValue()
PlcValueType.DATE -> this@toMeta.date.toString().asValue()
PlcValueType.LDATE -> this@toMeta.date.toString().asValue()
PlcValueType.TIME_OF_DAY -> this@toMeta.time.toString().asValue()
PlcValueType.LTIME_OF_DAY -> this@toMeta.time.toString().asValue()
PlcValueType.DATE_AND_TIME -> this@toMeta.dateTime.toString().asValue()
PlcValueType.DATE_AND_LTIME -> this@toMeta.dateTime.toString().asValue()
PlcValueType.LDATE_AND_TIME -> this@toMeta.dateTime.toString().asValue()
PlcValueType.Struct -> this@toMeta.struct.forEach { (name, item) ->
set(name, item.toMeta())
}
PlcValueType.List -> {
val listOfMeta = this@toMeta.list.map { it.toMeta() }
if (listOfMeta.all { it.items.isEmpty() }) {
value = listOfMeta.map { it.value ?: Null }.asValue()
} else {
setIndexed("@list".asName(), list.map { it.toMeta() })
}
}
PlcValueType.RAW_BYTE_ARRAY -> this@toMeta.raw.asValue()
}
}
private fun Value.toPlcValue(): PlcValue = when (type) {
ValueType.NUMBER -> when (val number = number) {
is Short -> PlcINT(number.toShort())
is Int -> PlcDINT(number.toInt())
is Long -> PlcLINT(number.toLong())
is Float -> PlcREAL(number.toFloat())
else -> PlcLREAL(number.toDouble())
}
ValueType.STRING -> PlcSTRING(string)
ValueType.BOOLEAN -> PlcBOOL(boolean)
ValueType.NULL -> PlcNull()
ValueType.LIST -> TODO()
}
internal fun Meta.toPlcValue(hint: PlcValueType): PlcValue = when (hint) {
PlcValueType.Struct -> PlcStruct(
items.entries.associate { (token, item) ->
token.toString() to item.toPlcValue(PlcValueType.Struct)
}
)
PlcValueType.NULL -> PlcNull()
PlcValueType.BOOL -> PlcBOOL(boolean)
PlcValueType.BYTE -> PlcBYTE(int)
PlcValueType.WORD -> PlcWORD(int)
PlcValueType.DWORD -> PlcDWORD(int)
PlcValueType.LWORD -> PlcLWORD(long)
PlcValueType.USINT -> PlcLWORD(short)
PlcValueType.UINT -> PlcUINT(int)
PlcValueType.UDINT -> PlcDINT(long)
PlcValueType.ULINT -> (number as? BigInteger)?.let { PlcULINT(it) } ?: PlcULINT(long)
PlcValueType.SINT -> PlcSINT(int)
PlcValueType.INT -> PlcINT(int)
PlcValueType.DINT -> PlcDINT(int)
PlcValueType.LINT -> PlcLINT(long)
PlcValueType.REAL -> PlcREAL(float)
PlcValueType.LREAL -> PlcLREAL(double)
PlcValueType.CHAR -> PlcCHAR(int)
PlcValueType.WCHAR -> PlcWCHAR(short)
PlcValueType.STRING -> PlcSTRING(string)
PlcValueType.WSTRING -> PlcWSTRING(string)
PlcValueType.TIME -> PlcTIME(string?.let { java.time.Duration.parse(it) })
PlcValueType.LTIME -> PlcLTIME(string?.let { java.time.Duration.parse(it) })
PlcValueType.DATE -> PlcDATE(string?.let { java.time.LocalDate.parse(it) })
PlcValueType.LDATE -> PlcLDATE(string?.let { java.time.LocalDate.parse(it) })
PlcValueType.TIME_OF_DAY -> PlcTIME_OF_DAY(string?.let { java.time.LocalTime.parse(it) })
PlcValueType.LTIME_OF_DAY -> PlcLTIME_OF_DAY(string?.let { java.time.LocalTime.parse(it) })
PlcValueType.DATE_AND_TIME -> PlcDATE_AND_TIME(string?.let { java.time.LocalDateTime.parse(it) })
PlcValueType.DATE_AND_LTIME -> PlcDATE_AND_LTIME(string?.let { java.time.LocalDateTime.parse(it) })
PlcValueType.LDATE_AND_TIME -> PlcLDATE_AND_TIME(string?.let { java.time.LocalDateTime.parse(it) })
PlcValueType.List -> PlcList().apply {
value?.list?.forEach { add(it.toPlcValue()) }
getIndexed("@list").forEach { (_, meta) ->
if (meta.items.isEmpty()) {
meta.value?.let { add(it.toPlcValue()) }
} else {
add(meta.toPlcValue(PlcValueType.Struct))
}
}
}
PlcValueType.RAW_BYTE_ARRAY -> PlcRawByteArray(
value?.list?.map { it.number.toByte() }?.toByteArray() ?: error("The meta content is not byte array")
)
}

View File

@ -2,13 +2,8 @@
package space.kscience.controls.vision
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock
@ -22,6 +17,7 @@ import space.kscience.controls.spec.DevicePropertySpec
import space.kscience.controls.spec.name
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.plotly.Plot
import space.kscience.plotly.bar
import space.kscience.plotly.models.Bar
@ -183,4 +179,59 @@ public fun Plot.plotBooleanState(
configuration: Bar.() -> Unit = {},
): Job = bar(configuration).run {
updateFromState(context, state, { asValue() }, maxAge, maxPoints, minPoints, sampling)
}
}
private fun <T> Flow<T>.chunkedByPeriod(duration: Duration): Flow<List<T>> {
val collector: ArrayDeque<T> = ArrayDeque<T>()
return channelFlow {
launch {
while (isActive) {
delay(duration)
send(ArrayList(collector))
collector.clear()
}
}
this@chunkedByPeriod.collect {
collector.add(it)
}
}
}
private fun List<Instant>.averageTime(): Instant {
val min = min()
val max = max()
val duration = max - min
return min + duration / 2
}
/**
* Average property value by [averagingInterval]. Return [missingValue] on each sample interval if no events arrived.
*/
@DFExperimental
public fun Plot.plotAveragedDeviceProperty(
device: Device,
propertyName: String,
missingValue: Double = 0.0,
extractValue: Meta.() -> Double = { value?.double ?: missingValue },
maxAge: Duration = defaultMaxAge,
maxPoints: Int = defaultMaxPoints,
minPoints: Int = defaultMinPoints,
averagingInterval: Duration = defaultSampling,
coroutineScope: CoroutineScope = device.context,
configuration: Scatter.() -> Unit = {},
): Job = scatter(configuration).run {
val data = TimeData()
device.propertyMessageFlow(propertyName).chunkedByPeriod(averagingInterval).transform { eventList ->
if (eventList.isEmpty()) {
data.append(Clock.System.now(), missingValue.asValue())
} else {
val time = eventList.map { it.time }.averageTime()
val value = eventList.map { extractValue(it.value) }.average()
data.append(time, value.asValue())
}
data.trim(maxAge, maxPoints, minPoints)
emit(data)
}.onEach {
it.fillPlot(x, y)
}.launchIn(coroutineScope)
}

View File

@ -1,29 +1,17 @@
package space.kscience.controls.vision
import kotlinx.serialization.modules.SerializersModule
import org.w3c.dom.Element
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.visionforge.Vision
import space.kscience.visionforge.VisionPlugin
import space.kscience.visionforge.html.ElementVisionRenderer
public actual class ControlVisionPlugin : VisionPlugin(), ElementVisionRenderer {
public actual class ControlVisionPlugin : VisionPlugin() {
override val tag: PluginTag get() = Companion.tag
override val visionSerializersModule: SerializersModule get() = controlsVisionSerializersModule
override fun rateVision(vision: Vision): Int {
TODO("Not yet implemented")
}
override fun render(element: Element, name: Name, vision: Vision, meta: Meta) {
TODO("Not yet implemented")
}
public actual companion object : PluginFactory<ControlVisionPlugin> {
override val tag: PluginTag = PluginTag("controls.vision")

View File

@ -14,7 +14,7 @@ public final class space/kscience/controls/demo/constructor/LinearDrive : space/
public final fun getDrive ()Lspace/kscience/controls/constructor/Drive;
public final fun getEnd ()Lspace/kscience/controls/constructor/LimitSwitch;
public final fun getPid ()Lspace/kscience/controls/constructor/PidRegulator;
public final fun getPosition ()D
public final fun getPositionState ()Lspace/kscience/controls/constructor/DoubleRangeState;
public final fun getStart ()Lspace/kscience/controls/constructor/LimitSwitch;
public final fun getTarget ()D
public final fun setTarget (D)V

View File

@ -3,7 +3,7 @@ import org.jetbrains.kotlin.gradle.dsl.ExplicitApiMode
plugins {
id("space.kscience.gradle.mpp")
id("org.jetbrains.compose") version "1.5.11"
alias(spclibs.plugins.compose)
}
kscience {

View File

@ -52,8 +52,9 @@ class LinearDrive(
val end by device(LimitSwitch.factory(state.atEndState))
val position by property(state)
var target by mutableProperty(pid.mutablePropertyAsState(Regulator.target, 0.0))
val positionState: DoubleRangeState by property(state)
private val targetState: MutableDeviceState<Double> by property(pid.mutablePropertyAsState(Regulator.target, 0.0))
var target by targetState
}

View File

@ -7,4 +7,4 @@ org.gradle.parallel=true
org.gradle.configureondemand=true
org.gradle.jvmargs=-Xmx4096m
toolsVersion=0.15.2-kotlin-1.9.21
toolsVersion=0.15.2-kotlin-1.9.22

View File

@ -0,0 +1,27 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
description = """
Common utilities and services for Magix endpoints.
""".trimIndent()
val dataforgeVersion: String by rootProject.extra
kscience {
jvm()
js()
native()
useSerialization()
commonMain {
api(projects.magix.magixApi)
api("space.kscience:dataforge-meta:$dataforgeVersion")
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@ -127,11 +127,11 @@ public fun CoroutineScope.launchMagixRegistry(
*
* If [registryEndpoint] field is provided, send request only to given endpoint.
*
* @param endpointName the name of endpoint requesting a property
* @param sourceEndpoint the name of endpoint requesting a property
*/
public suspend fun MagixEndpoint.getProperty(
propertyName: String,
endpointName: String,
sourceEndpoint: String,
user: JsonElement? = null,
registryEndpoint: String? = null,
): Flow<Pair<String, JsonElement>> = subscribe(
@ -146,7 +146,7 @@ public suspend fun MagixEndpoint.getProperty(
send(
MagixRegistryMessage.format,
MagixRegistryRequestMessage(propertyName),
source = endpointName,
source = sourceEndpoint,
target = registryEndpoint,
user = user
)

View File

@ -0,0 +1,82 @@
package space.kscience.magix.services
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.json.JsonNull
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.jsonPrimitive
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.send
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
public class WatcherEndpointWrapper(
private val scope: CoroutineScope,
private val endpointName: String,
private val endpoint: MagixEndpoint,
private val meta: Meta,
) : MagixEndpoint {
private val watchDogJob: Job = scope.launch {
val filter = MagixMessageFilter(
format = listOf(MAGIX_WATCHDOG_FORMAT),
target = listOf(null, endpointName)
)
endpoint.subscribe(filter).filter {
it.payload.jsonPrimitive.content == MAGIX_PING
}.onEach { request ->
endpoint.send(
MagixMessage(
MAGIX_WATCHDOG_FORMAT,
JsonPrimitive(MAGIX_PONG),
sourceEndpoint = endpointName,
targetEndpoint = request.sourceEndpoint,
parentId = request.id
)
)
}.collect()
}
private val heartBeatDelay: Duration = meta["heartbeat.period"].string?.let { Duration.parse(it) } ?: 10.seconds
//TODO add update from registry
private val heartBeatJob = scope.launch {
while (isActive){
delay(heartBeatDelay)
endpoint.send(
MagixMessage(
MAGIX_HEARTBEAT_FORMAT,
JsonNull, //TODO consider adding timestamp
endpointName
)
)
}
}
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = endpoint.subscribe(filter)
override suspend fun broadcast(message: MagixMessage) {
endpoint.broadcast(message)
}
override fun close() {
endpoint.close()
watchDogJob.cancel()
heartBeatJob.cancel()
}
public companion object {
public const val MAGIX_WATCHDOG_FORMAT: String = "magix.watchdog"
public const val MAGIX_PING: String = "ping"
public const val MAGIX_PONG: String = "pong"
public const val MAGIX_HEARTBEAT_FORMAT: String = "magix.heartbeat"
}
}

View File

@ -47,6 +47,7 @@ include(
":controls-server",
":controls-opcua",
":controls-modbus",
":controls-plc4x",
// ":controls-mongo",
":controls-storage",
":controls-storage:controls-xodus",
@ -55,6 +56,7 @@ include(
":controls-jupyter",
":magix",
":magix:magix-api",
":magix:magix-utils",
":magix:magix-server",
":magix:magix-rsocket",
":magix:magix-java-endpoint",