diff --git a/build.gradle.kts b/build.gradle.kts index 29cdfc2..0b9825b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,7 +4,7 @@ plugins { kotlin("js") apply false } -val dataforgeVersion: String by extra("0.2.0") +val dataforgeVersion: String by extra("0.2.1-dev-2") val ktorVersion: String by extra("1.4.3") val rsocketVersion by extra("0.11.1") diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index d6f5d5f..01762a2 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -2,13 +2,12 @@ package hep.dataforge.control.api import hep.dataforge.context.ContextAware import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET -import hep.dataforge.io.Envelope -import hep.dataforge.io.EnvelopeBuilder import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaItem import hep.dataforge.provider.Type import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.SharedFlow import kotlinx.io.Closeable /** @@ -28,27 +27,15 @@ public interface Device : Closeable, ContextAware { public val actionDescriptors: Collection /** - * The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes + * The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes. */ public val scope: CoroutineScope - /** - * Register a new property change listener for this device. - * [owner] is provided optionally in order for listener to be - * easily removable - */ - public fun registerListener(listener: DeviceListener, owner: Any? = listener) - - /** - * Remove all listeners belonging to the specified owner - */ - public fun removeListeners(owner: Any?) - /** * Get the value of the property or throw error if property in not defined. * Suspend if property value is not available */ - public suspend fun getProperty(propertyName: String): MetaItem<*> + public suspend fun getProperty(propertyName: String): MetaItem<*>? /** * Invalidate property and force recalculate @@ -61,11 +48,16 @@ public interface Device : Closeable, ContextAware { */ public suspend fun setProperty(propertyName: String, value: MetaItem<*>) + /** + * The [SharedFlow] of property changes + */ + public val propertyFlow: SharedFlow>> + /** * Send an action request and suspend caller while request is being processed. * Could return null if request does not return a meaningful answer. */ - public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>? + public suspend fun execute(action: String, argument: MetaItem<*>? = null): MetaItem<*>? override fun close() { scope.cancel("The device is closed") @@ -76,14 +68,10 @@ public interface Device : Closeable, ContextAware { } } -public interface ResponderDevice{ - /** - * - * A request with binary data or for binary response (or both). This request does not cover basic functionality like - * [setProperty], [getProperty] or [execute] and not defined for a generic device. - * - */ - public suspend fun respondWithData(request: Envelope): EnvelopeBuilder +public suspend fun Device.getState(): Meta = Meta{ + for(descriptor in propertyDescriptors) { + descriptor.name put getProperty(descriptor.name) + } } -public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file +//public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt deleted file mode 100644 index 483d05c..0000000 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt +++ /dev/null @@ -1,14 +0,0 @@ -package hep.dataforge.control.api - -import hep.dataforge.meta.MetaItem - -/** - * PropertyChangeListener Interface - * [value] is a new value that property has after a change; null is for invalid state. - */ -public interface DeviceListener { - public fun propertyChanged(propertyName: String, value: MetaItem<*>?) - public fun actionExecuted(action: String, argument: MetaItem<*>?, result: MetaItem<*>?) {} - - //TODO add general message listener method -} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt index 9546d23..9ac7e2b 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt @@ -3,18 +3,25 @@ package hep.dataforge.control.base import hep.dataforge.context.Context import hep.dataforge.control.api.ActionDescriptor import hep.dataforge.control.api.Device -import hep.dataforge.control.api.DeviceListener import hep.dataforge.control.api.PropertyDescriptor +import hep.dataforge.meta.DFExperimental import hep.dataforge.meta.MetaItem import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +//TODO move to DataForge-core +@DFExperimental +public data class LogEntry(val content: String, val priority: Int = 0) + /** * Baseline implementation of [Device] interface */ +@Suppress("EXPERIMENTAL_API_USAGE") public abstract class DeviceBase(override val context: Context) : Device { private val _properties = HashMap() @@ -22,25 +29,21 @@ public abstract class DeviceBase(override val context: Context) : Device { private val _actions = HashMap() public val actions: Map get() = _actions - private val listeners = ArrayList>(4) + private val sharedPropertyFlow = MutableSharedFlow>>() - override fun registerListener(listener: DeviceListener, owner: Any?) { - listeners.add(owner to listener) - } + override val propertyFlow: SharedFlow>> get() = sharedPropertyFlow - override fun removeListeners(owner: Any?) { - listeners.removeAll { it.first == owner } - } + private val sharedLogFlow = MutableSharedFlow() - internal fun notifyListeners(block: DeviceListener.() -> Unit) { - listeners.forEach { it.second.block() } - } + /** + * The [SharedFlow] of log messages + */ + @DFExperimental + public val logFlow: SharedFlow + get() = sharedLogFlow - public fun notifyPropertyChanged(propertyName: String) { - scope.launch { - val value = getProperty(propertyName) - notifyListeners { propertyChanged(propertyName, value) } - } + protected suspend fun log(message: String, priority: Int = 0) { + sharedLogFlow.emit(LogEntry(message, priority)) } override val propertyDescriptors: Collection @@ -72,8 +75,8 @@ public abstract class DeviceBase(override val context: Context) : Device { ) } - override suspend fun execute(command: String, argument: MetaItem<*>?): MetaItem<*>? = - (_actions[command] ?: error("Request with name $command not defined")).invoke(argument) + override suspend fun execute(action: String, argument: MetaItem<*>?): MetaItem<*>? = + (_actions[action] ?: error("Request with name $action not defined")).invoke(argument) @OptIn(ExperimentalCoroutinesApi::class) private open inner class BasicReadOnlyDeviceProperty( @@ -94,8 +97,8 @@ public abstract class DeviceBase(override val context: Context) : Device { override fun updateLogical(item: MetaItem<*>) { state.value = item - notifyListeners { - propertyChanged(name, item) + scope.launch { + sharedPropertyFlow.emit(Pair(name, item)) } } @@ -206,11 +209,7 @@ public abstract class DeviceBase(override val context: Context) : Device { ) : DeviceAction { override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { - block(arg).also { - notifyListeners { - actionExecuted(name, arg, it) - } - } + block(arg) } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index 6675003..881a24a 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -1,62 +1,42 @@ package hep.dataforge.control.controllers -import hep.dataforge.control.api.* +import hep.dataforge.control.api.Device +import hep.dataforge.control.api.DeviceHub +import hep.dataforge.control.api.get import hep.dataforge.control.messages.* -import hep.dataforge.io.Consumer -import hep.dataforge.io.Envelope -import hep.dataforge.io.Responder -import hep.dataforge.io.SimpleEnvelope -import hep.dataforge.meta.* +import hep.dataforge.meta.DFExperimental +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaItem import hep.dataforge.names.Name import hep.dataforge.names.toName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.launch -import kotlinx.io.Binary +import kotlinx.coroutines.flow.map +/** + * The [DeviceController] wraps device operations in [DeviceMessage] + */ @OptIn(DFExperimental::class) public class DeviceController( public val device: Device, public val deviceName: String, - public val scope: CoroutineScope = device.scope, -) : Responder, Consumer, DeviceListener { +) { - init { - device.registerListener(this, this) + private val propertyChanges = device.propertyFlow.map { (propertyName: String, value: MetaItem<*>) -> + PropertyChangedMessage( + sourceDevice = deviceName, + key = propertyName, + value = value, + ) } - private val outputChannel = Channel(Channel.CONFLATED) + /** + * The flow of outgoing messages + */ + public val messages: Flow get() = propertyChanges public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = respondMessage(device, deviceName, message) - override suspend fun respond(request: Envelope): Envelope = respond(device, deviceName, request) - - override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { - if (value == null) return - scope.launch { - val change = PropertyChangedMessage( - sourceDevice = deviceName, - key = propertyName, - value = value, - ) - val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) - - outputChannel.send(envelope) - } - } - - public fun receiving(): Flow = outputChannel.consumeAsFlow() - - @DFExperimental - override fun consume(message: Envelope) { - // Fire the respond procedure and forget about the result - scope.launch { - respond(message) - } - } public companion object { public const val GET_PROPERTY_ACTION: String = "read" @@ -65,29 +45,21 @@ public class DeviceController( public const val PROPERTY_LIST_ACTION: String = "propertyList" public const val ACTION_LIST_ACTION: String = "actionList" - internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope { - val target = request.meta["target"].string - return try { - if (request.data == null) { - respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope() - } else if (target != null && target != deviceTarget) { - error("Wrong target name $deviceTarget expected but $target found") - } else { - if (device is ResponderDevice) { - val response = device.respondWithData(request).apply { - meta { - "target" put request.meta["source"].string - "source" put deviceTarget - } - } - response.seal() - } else error("Device does not support binary response") - } - } catch (ex: Exception) { - val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string - DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope() - } - } +// internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope { +// val target = request.meta["target"].string +// return try { +// if (device is Responder) { +// device.respond(request) +// } else if (request.data == null) { +// respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope() +// } else if (target != null && target != deviceTarget) { +// error("Wrong target name $deviceTarget expected but $target found") +// } else error("Device does not support binary response") +// } catch (ex: Exception) { +// val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string +// DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope() +// } +// } internal suspend fun respondMessage( device: Device, diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt index 0d55d86..0698a49 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt @@ -1,67 +1,53 @@ package hep.dataforge.control.controllers import hep.dataforge.control.api.DeviceHub -import hep.dataforge.control.api.DeviceListener import hep.dataforge.control.api.get import hep.dataforge.control.messages.DeviceMessage -import hep.dataforge.control.messages.PropertyChangedMessage -import hep.dataforge.control.messages.toEnvelope -import hep.dataforge.io.Consumer -import hep.dataforge.io.Envelope -import hep.dataforge.io.Responder import hep.dataforge.meta.DFExperimental -import hep.dataforge.meta.MetaItem -import hep.dataforge.meta.get -import hep.dataforge.meta.string import hep.dataforge.names.Name -import hep.dataforge.names.NameToken import hep.dataforge.names.toName -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch @OptIn(DFExperimental::class) public class HubController( public val hub: DeviceHub, - public val scope: CoroutineScope, -) : Consumer, Responder { +) { private val messageOutbox = Channel(Channel.CONFLATED) - private val envelopeOutbox = Channel(Channel.CONFLATED) +// private val envelopeOutbox = Channel(Channel.CONFLATED) public fun messageOutput(): Flow = messageOutbox.consumeAsFlow() - public fun envelopeOutput(): Flow = envelopeOutbox.consumeAsFlow() +// public fun envelopeOutput(): Flow = envelopeOutbox.consumeAsFlow() - private val packJob = scope.launch { - while (isActive) { - val message = messageOutbox.receive() - envelopeOutbox.send(message.toEnvelope()) - } - } +// private val packJob = scope.launch { +// while (isActive) { +// val message = messageOutbox.receive() +// envelopeOutbox.send(message.toEnvelope()) +// } +// } - private val listeners: Map = hub.devices.mapValues { (name, device) -> - object : DeviceListener { - override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { - if (value == null) return - scope.launch { - val change = PropertyChangedMessage( - sourceDevice = name.toString(), - key = propertyName, - value = value - ) - messageOutbox.send(change) - } - } - }.also { - device.registerListener(it) - } - } +// private val listeners: Map = hub.devices.mapValues { (deviceNameToken, device) -> +// object : DeviceListener { +// override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { +// if (value == null) return +// scope.launch { +// val change = PropertyChangedMessage( +// sourceDevice = deviceNameToken.toString(), +// key = propertyName, +// value = value +// ) +// messageOutbox.send(change) +// } +// } +// }.also { +// device.registerListener(it) +// } +// } public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try { val targetName = message.targetDevice?.toName() ?: Name.EMPTY @@ -70,24 +56,24 @@ public class HubController( } catch (ex: Exception) { DeviceMessage.error(ex, sourceDevice = null, targetDevice = message.sourceDevice) } - - override suspend fun respond(request: Envelope): Envelope = try { - val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY - val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") - if (request.data == null) { - DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)) - .toEnvelope() - } else { - DeviceController.respond(device, targetName.toString(), request) - } - } catch (ex: Exception) { - DeviceMessage.error(ex, sourceDevice = null).toEnvelope() - } - - override fun consume(message: Envelope) { - // Fire the respond procedure and forget about the result - scope.launch { - respond(message) - } - } +// +// override suspend fun respond(request: Envelope): Envelope = try { +// val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY +// val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") +// if (request.data == null) { +// DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)) +// .toEnvelope() +// } else { +// DeviceController.respond(device, targetName.toString(), request) +// } +// } catch (ex: Exception) { +// DeviceMessage.error(ex, sourceDevice = null).toEnvelope() +// } +// +// override fun consume(message: Envelope) { +// // Fire the respond procedure and forget about the result +// scope.launch { +// respond(message) +// } +// } } \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/flowPropertyChanges.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/flowPropertyChanges.kt deleted file mode 100644 index b8c703c..0000000 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/flowPropertyChanges.kt +++ /dev/null @@ -1,30 +0,0 @@ -package hep.dataforge.control.controllers - -import hep.dataforge.control.api.Device -import hep.dataforge.control.api.DeviceListener -import hep.dataforge.meta.MetaItem -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.launch - -/** - * Flow changes of all properties of a given device ignoring invalidation events - */ -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun Device.flowPropertyChanges(): Flow>> = callbackFlow { - val listener = object : DeviceListener { - override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { - if (value != null) { - launch { - send(propertyName to value) - } - } - } - } - registerListener(listener) - awaitClose { - removeListeners(listener) - } -} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt index 70ebf5f..d1edaa4 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt @@ -2,8 +2,6 @@ package hep.dataforge.control.messages import hep.dataforge.io.SimpleEnvelope import hep.dataforge.meta.* -import hep.dataforge.names.Name -import hep.dataforge.names.asName import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json @@ -18,9 +16,6 @@ public sealed class DeviceMessage{ public companion object { - public val SOURCE_KEY: Name = DeviceMessage::sourceDevice.name.asName() - public val TARGET_KEY: Name = DeviceMessage::targetDevice.name.asName() - public fun error( cause: Throwable, sourceDevice: String?, @@ -131,6 +126,18 @@ public data class EmptyDeviceMessage( override val comment: String? = null, ) : DeviceMessage() +/** + * Information log message + */ +@Serializable +@SerialName("log") +public data class DeviceLogMessage( + val message: String, + override val sourceDevice: String? = null, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + /** * The evaluation of the message produced a service error */ diff --git a/dataforge-device-server/build.gradle.kts b/dataforge-device-server/build.gradle.kts index b440609..c7a1a0a 100644 --- a/dataforge-device-server/build.gradle.kts +++ b/dataforge-device-server/build.gradle.kts @@ -3,10 +3,6 @@ plugins { id("ru.mipt.npm.publish") } -kscience { - useSerialization() -} - val dataforgeVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra diff --git a/dataforge-device-tcp/build.gradle.kts b/dataforge-device-tcp/build.gradle.kts index b1e74ec..0a77dde 100644 --- a/dataforge-device-tcp/build.gradle.kts +++ b/dataforge-device-tcp/build.gradle.kts @@ -2,13 +2,8 @@ plugins { id("ru.mipt.npm.mpp") } - val ktorVersion: String by rootProject.extra -kscience{ - useCoroutines() -} - kotlin { sourceSets { commonMain { diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 830edf1..a25e12a 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -23,6 +23,8 @@ dependencies{ implementation("no.tornado:tornadofx:1.7.20") implementation(kotlin("stdlib-jdk8")) implementation("kscience.plotlykt:plotlykt-server:0.3.0") + + implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6") } tasks.withType().configureEach { @@ -37,5 +39,5 @@ javafx{ } application{ - mainClassName = "hep.dataforge.control.demo.DemoControllerViewKt" + mainClass.set("hep.dataforge.control.demo.DemoControllerViewKt") } \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/generateMessageSchema.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/generateMessageSchema.kt new file mode 100644 index 0000000..dc631f0 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/generateMessageSchema.kt @@ -0,0 +1,10 @@ +package hep.dataforge.control.demo + +import com.github.ricky12awesome.jss.encodeToSchema +import com.github.ricky12awesome.jss.globalJson +import hep.dataforge.control.messages.DeviceMessage + +fun main() { + val schema = globalJson.encodeToSchema(DeviceMessage.serializer(), generateDefinitions = false) + println(schema) +} \ No newline at end of file diff --git a/magix/magix-api/build.gradle.kts b/magix/magix-api/build.gradle.kts index dd6269b..0b5a962 100644 --- a/magix/magix-api/build.gradle.kts +++ b/magix/magix-api/build.gradle.kts @@ -7,7 +7,6 @@ kscience { useSerialization{ json() } - useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) } val dataforgeVersion: String by rootProject.extra diff --git a/magix/magix-service/build.gradle.kts b/magix/magix-service/build.gradle.kts index abf4c03..9e95bd2 100644 --- a/magix/magix-service/build.gradle.kts +++ b/magix/magix-service/build.gradle.kts @@ -7,7 +7,6 @@ kscience { useSerialization{ json() } - useCoroutines("1.4.1", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) } val dataforgeVersion: String by rootProject.extra