diff --git a/build.gradle.kts b/build.gradle.kts index 7794d5d..f28deb6 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,20 +4,19 @@ plugins { kotlin("js") apply false } -val dataforgeVersion: String by extra("0.2.0-dev-4") -val ktorVersion: String by extra("1.4.1") -val rsocketVersion by extra("0.11.1") +val dataforgeVersion: String by extra("0.2.1-dev-2") +val ktorVersion: String by extra("1.5.0") +val rsocketVersion by extra("0.12.0") allprojects { repositories { mavenLocal() - maven("https://dl.bintray.com/pdvrieze/maven") - maven("http://maven.jzy3d.org/releases") + //maven("https://dl.bintray.com/pdvrieze/maven") + //maven("http://maven.jzy3d.org/releases") maven("https://kotlin.bintray.com/js-externals") maven("https://maven.pkg.github.com/altavir/kotlin-logging/") - maven("https://dl.bintray.com/rsocket-admin/RSocket") - maven("https://maven.pkg.github.com/altavir/ktor-client-sse") -// maven("https://oss.jfrog.org/oss-snapshot-local") + //maven("https://dl.bintray.com/rsocket-admin/RSocket") + //maven("https://maven.pkg.github.com/altavir/ktor-client-sse") } group = "hep.dataforge" @@ -25,7 +24,7 @@ allprojects { } ksciencePublish { - githubProject = "dataforge-control" + githubProject = "controls.kt" bintrayRepo = "dataforge" } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt index d6f5d5f..01762a2 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Device.kt @@ -2,13 +2,12 @@ package hep.dataforge.control.api import hep.dataforge.context.ContextAware import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET -import hep.dataforge.io.Envelope -import hep.dataforge.io.EnvelopeBuilder import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaItem import hep.dataforge.provider.Type import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.SharedFlow import kotlinx.io.Closeable /** @@ -28,27 +27,15 @@ public interface Device : Closeable, ContextAware { public val actionDescriptors: Collection /** - * The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes + * The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes. */ public val scope: CoroutineScope - /** - * Register a new property change listener for this device. - * [owner] is provided optionally in order for listener to be - * easily removable - */ - public fun registerListener(listener: DeviceListener, owner: Any? = listener) - - /** - * Remove all listeners belonging to the specified owner - */ - public fun removeListeners(owner: Any?) - /** * Get the value of the property or throw error if property in not defined. * Suspend if property value is not available */ - public suspend fun getProperty(propertyName: String): MetaItem<*> + public suspend fun getProperty(propertyName: String): MetaItem<*>? /** * Invalidate property and force recalculate @@ -61,11 +48,16 @@ public interface Device : Closeable, ContextAware { */ public suspend fun setProperty(propertyName: String, value: MetaItem<*>) + /** + * The [SharedFlow] of property changes + */ + public val propertyFlow: SharedFlow>> + /** * Send an action request and suspend caller while request is being processed. * Could return null if request does not return a meaningful answer. */ - public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>? + public suspend fun execute(action: String, argument: MetaItem<*>? = null): MetaItem<*>? override fun close() { scope.cancel("The device is closed") @@ -76,14 +68,10 @@ public interface Device : Closeable, ContextAware { } } -public interface ResponderDevice{ - /** - * - * A request with binary data or for binary response (or both). This request does not cover basic functionality like - * [setProperty], [getProperty] or [execute] and not defined for a generic device. - * - */ - public suspend fun respondWithData(request: Envelope): EnvelopeBuilder +public suspend fun Device.getState(): Meta = Meta{ + for(descriptor in propertyDescriptors) { + descriptor.name put getProperty(descriptor.name) + } } -public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file +//public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt deleted file mode 100644 index 483d05c..0000000 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/DeviceListener.kt +++ /dev/null @@ -1,14 +0,0 @@ -package hep.dataforge.control.api - -import hep.dataforge.meta.MetaItem - -/** - * PropertyChangeListener Interface - * [value] is a new value that property has after a change; null is for invalid state. - */ -public interface DeviceListener { - public fun propertyChanged(propertyName: String, value: MetaItem<*>?) - public fun actionExecuted(action: String, argument: MetaItem<*>?, result: MetaItem<*>?) {} - - //TODO add general message listener method -} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt index 312cc21..9ac7e2b 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/DeviceBase.kt @@ -3,18 +3,25 @@ package hep.dataforge.control.base import hep.dataforge.context.Context import hep.dataforge.control.api.ActionDescriptor import hep.dataforge.control.api.Device -import hep.dataforge.control.api.DeviceListener import hep.dataforge.control.api.PropertyDescriptor +import hep.dataforge.meta.DFExperimental import hep.dataforge.meta.MetaItem import kotlinx.coroutines.* +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +//TODO move to DataForge-core +@DFExperimental +public data class LogEntry(val content: String, val priority: Int = 0) + /** * Baseline implementation of [Device] interface */ +@Suppress("EXPERIMENTAL_API_USAGE") public abstract class DeviceBase(override val context: Context) : Device { private val _properties = HashMap() @@ -22,25 +29,21 @@ public abstract class DeviceBase(override val context: Context) : Device { private val _actions = HashMap() public val actions: Map get() = _actions - private val listeners = ArrayList>(4) + private val sharedPropertyFlow = MutableSharedFlow>>() - override fun registerListener(listener: DeviceListener, owner: Any?) { - listeners.add(owner to listener) - } + override val propertyFlow: SharedFlow>> get() = sharedPropertyFlow - override fun removeListeners(owner: Any?) { - listeners.removeAll { it.first == owner } - } + private val sharedLogFlow = MutableSharedFlow() - internal fun notifyListeners(block: DeviceListener.() -> Unit) { - listeners.forEach { it.second.block() } - } + /** + * The [SharedFlow] of log messages + */ + @DFExperimental + public val logFlow: SharedFlow + get() = sharedLogFlow - public fun notifyPropertyChanged(propertyName: String) { - scope.launch { - val value = getProperty(propertyName) - notifyListeners { propertyChanged(propertyName, value) } - } + protected suspend fun log(message: String, priority: Int = 0) { + sharedLogFlow.emit(LogEntry(message, priority)) } override val propertyDescriptors: Collection @@ -72,8 +75,8 @@ public abstract class DeviceBase(override val context: Context) : Device { ) } - override suspend fun execute(command: String, argument: MetaItem<*>?): MetaItem<*>? = - (_actions[command] ?: error("Request with name $command not defined")).invoke(argument) + override suspend fun execute(action: String, argument: MetaItem<*>?): MetaItem<*>? = + (_actions[action] ?: error("Request with name $action not defined")).invoke(argument) @OptIn(ExperimentalCoroutinesApi::class) private open inner class BasicReadOnlyDeviceProperty( @@ -94,8 +97,8 @@ public abstract class DeviceBase(override val context: Context) : Device { override fun updateLogical(item: MetaItem<*>) { state.value = item - notifyListeners { - propertyChanged(name, item) + scope.launch { + sharedPropertyFlow.emit(Pair(name, item)) } } @@ -121,7 +124,7 @@ public abstract class DeviceBase(override val context: Context) : Device { /** * Create a bound read-only property with given [getter] */ - public fun newReadOnlyProperty( + public fun createReadOnlyProperty( name: String, default: MetaItem<*>?, descriptorBuilder: PropertyDescriptor.() -> Unit = {}, @@ -178,7 +181,7 @@ public abstract class DeviceBase(override val context: Context) : Device { /** * Create a bound mutable property with given [getter] and [setter] */ - public fun newMutableProperty( + internal fun createMutableProperty( name: String, default: MetaItem<*>?, descriptorBuilder: PropertyDescriptor.() -> Unit = {}, @@ -206,18 +209,14 @@ public abstract class DeviceBase(override val context: Context) : Device { ) : DeviceAction { override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { - block(arg).also { - notifyListeners { - actionExecuted(name, arg, it) - } - } + block(arg) } } /** * Create a new bound action */ - public fun newAction( + internal fun createAction( name: String, descriptorBuilder: ActionDescriptor.() -> Unit = {}, block: suspend (MetaItem<*>?) -> MetaItem<*>?, diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt index 79d6019..1de2cf7 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/actionDelegates.kt @@ -24,7 +24,7 @@ private class ActionProvider( ) : PropertyDelegateProvider { override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ActionDelegate { val name = property.name - owner.newAction(name, descriptorBuilder, block) + owner.createAction(name, descriptorBuilder, block) return owner.provideAction() } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/devicePropertyDelegates.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/devicePropertyDelegates.kt index 420c947..008e2ff 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/devicePropertyDelegates.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/base/devicePropertyDelegates.kt @@ -36,7 +36,7 @@ private class ReadOnlyDevicePropertyProvider( override operator fun provideDelegate(thisRef: D, property: KProperty<*>): ReadOnlyPropertyDelegate { val name = property.name - owner.newReadOnlyProperty(name, default, descriptorBuilder, getter) + owner.createReadOnlyProperty(name, default, descriptorBuilder, getter) return owner.provideProperty(name) } } @@ -51,7 +51,7 @@ private class TypedReadOnlyDevicePropertyProvider( override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedReadOnlyPropertyDelegate { val name = property.name - owner.newReadOnlyProperty(name, default, descriptorBuilder, getter) + owner.createReadOnlyProperty(name, default, descriptorBuilder, getter) return owner.provideProperty(name, converter) } } @@ -178,7 +178,7 @@ private class DevicePropertyProvider( override operator fun provideDelegate(thisRef: D, property: KProperty<*>): PropertyDelegate { val name = property.name - owner.newMutableProperty(name, default, descriptorBuilder, getter, setter) + owner.createMutableProperty(name, default, descriptorBuilder, getter, setter) return owner.provideMutableProperty(name) } } @@ -194,7 +194,7 @@ private class TypedDevicePropertyProvider( override operator fun provideDelegate(thisRef: D, property: KProperty<*>): TypedPropertyDelegate { val name = property.name - owner.newMutableProperty(name, default, descriptorBuilder, getter, setter) + owner.createMutableProperty(name, default, descriptorBuilder, getter, setter) return owner.provideMutableProperty(name, converter) } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index 5b670cd..ed4dfcb 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -1,63 +1,42 @@ package hep.dataforge.control.controllers -import hep.dataforge.control.api.* -import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION -import hep.dataforge.io.Consumer -import hep.dataforge.io.Envelope -import hep.dataforge.io.Responder -import hep.dataforge.io.SimpleEnvelope -import hep.dataforge.meta.* +import hep.dataforge.control.api.Device +import hep.dataforge.control.api.DeviceHub +import hep.dataforge.control.api.get +import hep.dataforge.control.messages.* +import hep.dataforge.meta.DFExperimental +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaItem import hep.dataforge.names.Name import hep.dataforge.names.toName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.launch -import kotlinx.io.Binary +import kotlinx.coroutines.flow.map +/** + * The [DeviceController] wraps device operations in [DeviceMessage] + */ @OptIn(DFExperimental::class) public class DeviceController( public val device: Device, - public val deviceTarget: String, - public val scope: CoroutineScope = device.scope, -) : Responder, Consumer, DeviceListener { + public val deviceName: String, +) { - init { - device.registerListener(this, this) + private val propertyChanges = device.propertyFlow.map { (propertyName: String, value: MetaItem<*>) -> + PropertyChangedMessage( + sourceDevice = deviceName, + key = propertyName, + value = value, + ) } - private val outputChannel = Channel(Channel.CONFLATED) + /** + * The flow of outgoing messages + */ + public val messages: Flow get() = propertyChanges public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = - respondMessage(device, deviceTarget, message) + respondMessage(device, deviceName, message) - override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request) - - override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { - if (value == null) return - scope.launch { - val change = DeviceMessage( - sourceName = deviceTarget, - action = PROPERTY_CHANGED_ACTION, - key = propertyName, - value = value, - ) - val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) - - outputChannel.send(envelope) - } - } - - public fun receiving(): Flow = outputChannel.consumeAsFlow() - - @DFExperimental - override fun consume(message: Envelope) { - // Fire the respond procedure and forget about the result - scope.launch { - respond(message) - } - } public companion object { public const val GET_PROPERTY_ACTION: String = "read" @@ -66,86 +45,99 @@ public class DeviceController( public const val PROPERTY_LIST_ACTION: String = "propertyList" public const val ACTION_LIST_ACTION: String = "actionList" - internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope { - val target = request.meta["target"].string - return try { - if (request.data == null) { - respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope() - } else if (target != null && target != deviceTarget) { - error("Wrong target name $deviceTarget expected but $target found") - } else { - if (device is ResponderDevice) { - val response = device.respondWithData(request).apply { - meta { - "target" put request.meta["source"].string - "source" put deviceTarget - } - } - response.seal() - } else error("Device does not support binary response") - } - } catch (ex: Exception) { - DeviceMessage.fail(ex).toEnvelope() - } - } +// internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope { +// val target = request.meta["target"].string +// return try { +// if (device is Responder) { +// device.respond(request) +// } else if (request.data == null) { +// respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope() +// } else if (target != null && target != deviceTarget) { +// error("Wrong target name $deviceTarget expected but $target found") +// } else error("Device does not support binary response") +// } catch (ex: Exception) { +// val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string +// DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope() +// } +// } internal suspend fun respondMessage( device: Device, deviceTarget: String, request: DeviceMessage, ): DeviceMessage = try { - val requestKey = request.key - val requestValue = request.value - var key: String? = null - var value: MetaItem<*>? = null - when (val action = request.action) { - GET_PROPERTY_ACTION -> { - key = requestKey - value = device.getProperty(requestKey ?: error("Key field is not defined in request")) + when (request) { + is PropertyGetMessage -> { + PropertyChangedMessage( + key = request.property, + value = device.getProperty(request.property), + sourceDevice = deviceTarget, + targetDevice = request.sourceDevice + ) } - SET_PROPERTY_ACTION -> { - require(requestKey != null) { "Key field is not defined in request" } - if (requestValue == null) { - device.invalidateProperty(requestKey) - } else { - device.setProperty(requestKey, requestValue) - } - key = requestKey - value = device.getProperty(requestKey) - } - EXECUTE_ACTION -> { - require(requestKey != null) { "Key field is not defined in request" } - key = requestKey - value = device.execute(requestKey, requestValue) + is PropertySetMessage -> { + if (request.value == null) { + device.invalidateProperty(request.property) + } else { + device.setProperty(request.property, request.value) + } + PropertyChangedMessage( + key = request.property, + value = device.getProperty(request.property), + sourceDevice = deviceTarget, + targetDevice = request.sourceDevice + ) } - PROPERTY_LIST_ACTION -> { - value = Meta { - device.propertyDescriptors.map { descriptor -> - descriptor.name put descriptor.config + + is ActionExecuteMessage -> { + ActionResultMessage( + action = request.action, + result = device.execute(request.action, request.argument), + sourceDevice = deviceTarget, + targetDevice = request.sourceDevice + ) + } + + is GetDescriptionMessage -> { + val descriptionMeta = Meta { + "properties" put { + device.propertyDescriptors.map { descriptor -> + descriptor.name put descriptor.config + } } - }.asMetaItem() - } - ACTION_LIST_ACTION -> { - value = Meta { - device.actionDescriptors.map { descriptor -> - descriptor.name put descriptor.config + "actions" put { + device.actionDescriptors.map { descriptor -> + descriptor.name put descriptor.config + } } - }.asMetaItem() + } + + DescriptionMessage( + description = descriptionMeta, + sourceDevice = deviceTarget, + targetDevice = request.sourceDevice + ) } - else -> { - error("Unrecognized action $action") + + is DescriptionMessage, + is PropertyChangedMessage, + is ActionResultMessage, + is BinaryNotificationMessage, + is DeviceErrorMessage, + is EmptyDeviceMessage, + is DeviceLogMessage, + -> { + //Those messages are ignored + EmptyDeviceMessage( + sourceDevice = deviceTarget, + targetDevice = request.sourceDevice, + comment = "The message is ignored" + ) } } - DeviceMessage( - targetName = request.sourceName, - sourceName = deviceTarget, - action = "response.${request.action}", - key = key, - value = value - ) } catch (ex: Exception) { - DeviceMessage.fail(ex, request.action).respondsTo(request) + DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = request.sourceDevice) } } } @@ -153,10 +145,10 @@ public class DeviceController( public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage { return try { - val targetName = request.targetName?.toName() ?: Name.EMPTY + val targetName = request.targetDevice?.toName() ?: Name.EMPTY val device = this[targetName] ?: error("The device with name $targetName not found in $this") DeviceController.respondMessage(device, targetName.toString(), request) } catch (ex: Exception) { - DeviceMessage.fail(ex, request.action).respondsTo(request) + DeviceMessage.error(ex, sourceDevice = request.targetDevice, targetDevice = request.sourceDevice) } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt index 11a2399..d434819 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceManager.kt @@ -20,7 +20,7 @@ public class DeviceManager : AbstractPlugin(), DeviceHub { override val devices: Map get() = top public val controller: HubController by lazy { - HubController(this, context) + HubController(this) } public fun registerDevice(name: NameToken, device: Device) { diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt deleted file mode 100644 index 1af7bcd..0000000 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ /dev/null @@ -1,61 +0,0 @@ -package hep.dataforge.control.controllers - -import hep.dataforge.io.SimpleEnvelope -import hep.dataforge.meta.* -import hep.dataforge.names.Name -import hep.dataforge.names.asName -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.Json -import kotlinx.serialization.json.decodeFromJsonElement -import kotlinx.serialization.json.encodeToJsonElement - -@Serializable -public data class DeviceMessage( - public val action: String, - public val status: String = OK_STATUS, - public val sourceName: String? = null, - public val targetName: String? = null, - public val comment: String? = null, - public val key: String? = null, - public val value: MetaItem<*>? = null, -) { - public companion object { - public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName() - public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName() - public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName() - public val MESSAGE_KEY_KEY: Name = DeviceMessage::key.name.asName() - public val MESSAGE_VALUE_KEY: Name = DeviceMessage::value.name.asName() - - public const val OK_STATUS: String = "OK" - public const val FAIL_STATUS: String = "FAIL" - public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged" - - private fun Throwable.toMeta(): Meta = Meta { - "type" put this::class.simpleName - "message" put message - "trace" put stackTraceToString() - } - - public fun fail( - cause: Throwable, - action: String = "undefined", - ): DeviceMessage = DeviceMessage( - action = action, - status = FAIL_STATUS, - value = cause.toMeta().asMetaItem() - ) - - public fun fromMeta(meta: Meta): DeviceMessage = Json.decodeFromJsonElement(meta.toJson()) - } -} - - -public fun DeviceMessage.ok(): DeviceMessage = - copy(status = DeviceMessage.OK_STATUS) - -public fun DeviceMessage.respondsTo(request: DeviceMessage): DeviceMessage = - copy(sourceName = request.targetName, targetName = request.sourceName) - -public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!! - -public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null) diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt index e60d02a..0698a49 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt @@ -1,89 +1,79 @@ package hep.dataforge.control.controllers import hep.dataforge.control.api.DeviceHub -import hep.dataforge.control.api.DeviceListener import hep.dataforge.control.api.get -import hep.dataforge.io.Consumer -import hep.dataforge.io.Envelope -import hep.dataforge.io.Responder +import hep.dataforge.control.messages.DeviceMessage import hep.dataforge.meta.DFExperimental -import hep.dataforge.meta.MetaItem -import hep.dataforge.meta.get -import hep.dataforge.meta.string import hep.dataforge.names.Name -import hep.dataforge.names.NameToken import hep.dataforge.names.toName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow @OptIn(DFExperimental::class) public class HubController( public val hub: DeviceHub, - public val scope: CoroutineScope, -) : Consumer, Responder { +) { + private val messageOutbox = Channel(Channel.CONFLATED) - private val messageOutbox = MutableSharedFlow() +// private val envelopeOutbox = Channel(Channel.CONFLATED) - private val envelopeOutbox = MutableSharedFlow() + public fun messageOutput(): Flow = messageOutbox.consumeAsFlow() - public val messageOutput: SharedFlow get() = messageOutbox +// public fun envelopeOutput(): Flow = envelopeOutbox.consumeAsFlow() - public val envelopeOutput: SharedFlow get() = envelopeOutbox +// private val packJob = scope.launch { +// while (isActive) { +// val message = messageOutbox.receive() +// envelopeOutbox.send(message.toEnvelope()) +// } +// } - private val packJob = messageOutbox.onEach { message -> - envelopeOutbox.emit(message.toEnvelope()) - }.launchIn(scope) - - private val listeners: Map = hub.devices.mapValues { (name, device) -> - object : DeviceListener { - override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { - if (value == null) return - scope.launch { - val change = DeviceMessage( - sourceName = name.toString(), - action = DeviceMessage.PROPERTY_CHANGED_ACTION, - key = propertyName, - value = value - ) - messageOutbox.emit(change) - } - } - }.also { - device.registerListener(it) - } - } +// private val listeners: Map = hub.devices.mapValues { (deviceNameToken, device) -> +// object : DeviceListener { +// override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { +// if (value == null) return +// scope.launch { +// val change = PropertyChangedMessage( +// sourceDevice = deviceNameToken.toString(), +// key = propertyName, +// value = value +// ) +// messageOutbox.send(change) +// } +// } +// }.also { +// device.registerListener(it) +// } +// } public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try { - val targetName = message.targetName?.toName() ?: Name.EMPTY + val targetName = message.targetDevice?.toName() ?: Name.EMPTY val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") DeviceController.respondMessage(device, targetName.toString(), message) } catch (ex: Exception) { - DeviceMessage.fail(ex, message.action).respondsTo(message) - } - - override suspend fun respond(request: Envelope): Envelope = try { - val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY - val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") - if (request.data == null) { - DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)) - .toEnvelope() - } else { - DeviceController.respond(device, targetName.toString(), request) - } - } catch (ex: Exception) { - DeviceMessage.fail(ex).toEnvelope() - } - - override fun consume(message: Envelope) { - // Fire the respond procedure and forget about the result - scope.launch { - respond(message) - } + DeviceMessage.error(ex, sourceDevice = null, targetDevice = message.sourceDevice) } +// +// override suspend fun respond(request: Envelope): Envelope = try { +// val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY +// val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") +// if (request.data == null) { +// DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)) +// .toEnvelope() +// } else { +// DeviceController.respond(device, targetName.toString(), request) +// } +// } catch (ex: Exception) { +// DeviceMessage.error(ex, sourceDevice = null).toEnvelope() +// } +// +// override fun consume(message: Envelope) { +// // Fire the respond procedure and forget about the result +// scope.launch { +// respond(message) +// } +// } } \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt deleted file mode 100644 index e07f8a4..0000000 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/PropertyFlow.kt +++ /dev/null @@ -1,27 +0,0 @@ -package hep.dataforge.control.controllers - -import hep.dataforge.control.api.Device -import hep.dataforge.control.api.DeviceListener -import hep.dataforge.meta.MetaItem -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.launch - -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun Device.flowValues(): Flow>> = callbackFlow { - val listener = object : DeviceListener { - override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { - if (value != null) { - launch { - send(propertyName to value) - } - } - } - } - registerListener(listener) - awaitClose { - removeListeners(listener) - } -} \ No newline at end of file diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt new file mode 100644 index 0000000..4f683bf --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/messages/DeviceMessage.kt @@ -0,0 +1,159 @@ +package hep.dataforge.control.messages + +import hep.dataforge.io.SimpleEnvelope +import hep.dataforge.meta.* +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.decodeFromJsonElement +import kotlinx.serialization.json.encodeToJsonElement + +@Serializable +public sealed class DeviceMessage { + public abstract val sourceDevice: String? + public abstract val targetDevice: String? + public abstract val comment: String? + + + public companion object { + public fun error( + cause: Throwable, + sourceDevice: String?, + targetDevice: String? = null, + ): DeviceErrorMessage = DeviceErrorMessage( + errorMessage = cause.message, + errorType = cause::class.simpleName, + errorStackTrace = cause.stackTraceToString() + ) + + public fun fromMeta(meta: Meta): DeviceMessage = Json.decodeFromJsonElement(meta.toJson()) + } +} + +@Serializable +@SerialName("property.changed") +public data class PropertyChangedMessage( + public val key: String, + public val value: MetaItem<*>?, + override val sourceDevice: String, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +@Serializable +@SerialName("property.set") +public data class PropertySetMessage( + public val property: String, + public val value: MetaItem<*>?, + override val sourceDevice: String? = null, + override val targetDevice: String, + override val comment: String? = null, +) : DeviceMessage() + +@Serializable +@SerialName("property.get") +public data class PropertyGetMessage( + public val property: String, + override val sourceDevice: String? = null, + override val targetDevice: String, + override val comment: String? = null, +) : DeviceMessage() + +/** + * Request device description + */ +@Serializable +@SerialName("description.get") +public data class GetDescriptionMessage( + override val sourceDevice: String? = null, + override val targetDevice: String, + override val comment: String? = null, +) : DeviceMessage() + +/** + * The full device description message + */ +@Serializable +@SerialName("description") +public data class DescriptionMessage( + val description: Meta, + override val sourceDevice: String, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +@Serializable +@SerialName("action.execute") +public data class ActionExecuteMessage( + public val action: String, + public val argument: MetaItem<*>?, + override val sourceDevice: String? = null, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +@Serializable +@SerialName("action.result") +public data class ActionResultMessage( + public val action: String, + public val result: MetaItem<*>?, + override val sourceDevice: String? = null, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +/** + * Notifies listeners that a new binary with given [binaryID] is available. The binary itself could not be provided via [DeviceMessage] API. + */ +@Serializable +@SerialName("binary.notification") +public data class BinaryNotificationMessage( + val binaryID: String, + override val sourceDevice: String, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +/** + * The message states that the message is received, but no meaningful response is produced. + * This message could be used for a heartbeat. + */ +@Serializable +@SerialName("empty") +public data class EmptyDeviceMessage( + override val sourceDevice: String? = null, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +/** + * Information log message + */ +@Serializable +@SerialName("log") +public data class DeviceLogMessage( + val message: String, + val data: MetaItem<*>? = null, + override val sourceDevice: String? = null, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + +/** + * The evaluation of the message produced a service error + */ +@Serializable +@SerialName("error") +public data class DeviceErrorMessage( + public val errorMessage: String?, + public val errorType: String? = null, + public val errorStackTrace: String? = null, + override val sourceDevice: String? = null, + override val targetDevice: String? = null, + override val comment: String? = null, +) : DeviceMessage() + + +public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!! + +public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null) diff --git a/dataforge-device-server/build.gradle.kts b/dataforge-device-server/build.gradle.kts index b440609..c7a1a0a 100644 --- a/dataforge-device-server/build.gradle.kts +++ b/dataforge-device-server/build.gradle.kts @@ -3,10 +3,6 @@ plugins { id("ru.mipt.npm.publish") } -kscience { - useSerialization() -} - val dataforgeVersion: String by rootProject.extra val ktorVersion: String by rootProject.extra diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt index 2b86058..c1a154d 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt @@ -1,7 +1,7 @@ package hep.dataforge.control.server -import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.control.controllers.toMeta +import hep.dataforge.control.messages.DeviceMessage +import hep.dataforge.control.messages.toMeta import hep.dataforge.io.* import hep.dataforge.meta.MetaSerializer import io.ktor.application.ApplicationCall @@ -30,15 +30,6 @@ internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() - respondText(json.toString(), contentType = ContentType.Application.Json) } - public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json) -} - -//public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { -// respondMessage(DeviceMessage(builder)) -//} -// -//public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { -// respondMessage(DeviceMessage.fail(null, block = builder)) -//} \ No newline at end of file +} \ No newline at end of file diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index cffe8ad..55cfba5 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -4,11 +4,11 @@ package hep.dataforge.control.server import hep.dataforge.control.api.get -import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION -import hep.dataforge.control.controllers.DeviceController.Companion.SET_PROPERTY_ACTION import hep.dataforge.control.controllers.DeviceManager -import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.respondMessage +import hep.dataforge.control.messages.DeviceMessage +import hep.dataforge.control.messages.PropertyGetMessage +import hep.dataforge.control.messages.PropertySetMessage import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta import hep.dataforge.meta.toMetaItem @@ -163,24 +163,24 @@ public fun Application.deviceModule( } } } - //Check if application supports websockets and if it does add a push channel - if (this.application.featureOrNull(WebSockets) != null) { - webSocket("ws") { - //subscribe on device - val target: String? by call.request.queryParameters - - try { - application.log.debug("Opened server socket for ${call.request.queryParameters}") - - manager.controller.envelopeOutput.collect { - outgoing.send(it.toFrame()) - } - - } catch (ex: Exception) { - application.log.debug("Closed server socket for ${call.request.queryParameters}") - } - } - } +// //Check if application supports websockets and if it does add a push channel +// if (this.application.featureOrNull(WebSockets) != null) { +// webSocket("ws") { +// //subscribe on device +// val target: String? by call.request.queryParameters +// +// try { +// application.log.debug("Opened server socket for ${call.request.queryParameters}") +// +// manager.controller.envelopeOutput().collect { +// outgoing.send(it.toFrame()) +// } +// +// } catch (ex: Exception) { +// application.log.debug("Closed server socket for ${call.request.queryParameters}") +// } +// } +// } post("message") { val body = call.receiveText() @@ -201,11 +201,10 @@ public fun Application.deviceModule( get("get") { val target: String by call.parameters val property: String by call.parameters - val request = DeviceMessage( - action = GET_PROPERTY_ACTION, - sourceName = WEB_SERVER_TARGET, - targetName = target, - key = property, + val request = PropertyGetMessage( + sourceDevice = WEB_SERVER_TARGET, + targetDevice = target, + property = property, ) val response = manager.respondMessage(request) @@ -217,11 +216,10 @@ public fun Application.deviceModule( val body = call.receiveText() val json = Json.parseToJsonElement(body) - val request = DeviceMessage( - action = SET_PROPERTY_ACTION, - sourceName = WEB_SERVER_TARGET, - targetName = target, - key = property, + val request = PropertySetMessage( + sourceDevice = WEB_SERVER_TARGET, + targetDevice = target, + property = property, value = json.toMetaItem() ) diff --git a/dataforge-device-tcp/build.gradle.kts b/dataforge-device-tcp/build.gradle.kts index b1e74ec..0a77dde 100644 --- a/dataforge-device-tcp/build.gradle.kts +++ b/dataforge-device-tcp/build.gradle.kts @@ -2,13 +2,8 @@ plugins { id("ru.mipt.npm.mpp") } - val ktorVersion: String by rootProject.extra -kscience{ - useCoroutines() -} - kotlin { sourceSets { commonMain { diff --git a/dataforge-magix-client/build.gradle.kts b/dataforge-magix-client/build.gradle.kts index beefe3a..4059a2f 100644 --- a/dataforge-magix-client/build.gradle.kts +++ b/dataforge-magix-client/build.gradle.kts @@ -5,6 +5,12 @@ plugins { val ktorVersion: String by rootProject.extra +kscience{ + useSerialization { + json() + } +} + kotlin { sourceSets { commonMain { diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt index 92f86b9..0f35863 100644 --- a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt @@ -1,8 +1,8 @@ package hep.dataforge.control.client import hep.dataforge.control.controllers.DeviceManager -import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.respondMessage +import hep.dataforge.control.messages.DeviceMessage import hep.dataforge.magix.api.MagixEndpoint import hep.dataforge.magix.api.MagixMessage import kotlinx.coroutines.Job diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 9b73630..a25e12a 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -22,7 +22,9 @@ dependencies{ implementation(project(":dataforge-magix-client")) implementation("no.tornado:tornadofx:1.7.20") implementation(kotlin("stdlib-jdk8")) - implementation("kscience.plotlykt:plotlykt-server:0.3.0-dev-2") + implementation("kscience.plotlykt:plotlykt-server:0.3.0") + + implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6") } tasks.withType().configureEach { @@ -37,5 +39,5 @@ javafx{ } application{ - mainClassName = "hep.dataforge.control.demo.DemoControllerViewKt" + mainClass.set("hep.dataforge.control.demo.DemoControllerViewKt") } \ No newline at end of file diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/generateMessageSchema.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/generateMessageSchema.kt new file mode 100644 index 0000000..dc631f0 --- /dev/null +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/generateMessageSchema.kt @@ -0,0 +1,10 @@ +package hep.dataforge.control.demo + +import com.github.ricky12awesome.jss.encodeToSchema +import com.github.ricky12awesome.jss.globalJson +import hep.dataforge.control.messages.DeviceMessage + +fun main() { + val schema = globalJson.encodeToSchema(DeviceMessage.serializer(), generateDefinitions = false) + println(schema) +} \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index be52383..da9702f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.8-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/magix/magix-api/build.gradle.kts b/magix/magix-api/build.gradle.kts index dd6269b..9babc42 100644 --- a/magix/magix-api/build.gradle.kts +++ b/magix/magix-api/build.gradle.kts @@ -4,10 +4,10 @@ plugins { } kscience { + useCoroutines() useSerialization{ json() } - useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) } val dataforgeVersion: String by rootProject.extra diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt index d96064d..adb84b5 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt @@ -6,8 +6,8 @@ import kotlinx.serialization.Serializable @Serializable public data class MagixMessageFilter( - val format: List? = null, - val origin: List? = null, + val format: List? = null, + val origin: List? = null, val target: List? = null, val action: List? = null, ) { diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt index 8136b9c..3af51e3 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt @@ -4,36 +4,57 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch +import kotlinx.serialization.KSerializer import kotlinx.serialization.json.JsonElement -public interface MagixProcessor { +public fun interface MagixProcessor { public fun process(endpoint: MagixEndpoint): Job -} -/** - * A converter from one (or several) format to another. It captures all events with the given filter then transforms it - * with given [transformer] and sends back to the loop with given [outputFormat]. - * - * If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag. - */ -public class MagixConverter( - private val scope: CoroutineScope, - private val filter: MagixMessageFilter, - private val outputFormat: String, - private val newOrigin: String? = null, - private val transformer: suspend (JsonElement) -> JsonElement, -) : MagixProcessor { - override fun process(endpoint: MagixEndpoint): Job = scope.launch { - endpoint.subscribe(filter).onEach { message -> - val newPayload = transformer(message.payload) - val transformed = message.copy( - payload = newPayload, - format = outputFormat, - origin = newOrigin ?: message.origin - ) - endpoint.broadcast(transformed) - }.launchIn(this) - //TODO add catch logic here + public companion object { + /** + * A converter from one (or several) format to another. It captures all events with the given filter then transforms it + * with given [transformer] and sends back to the loop with given [outputFormat]. + * + * If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag. + */ + public fun convert( + scope: CoroutineScope, + filter: MagixMessageFilter, + outputFormat: String, + inputSerializer: KSerializer, + outputSerializer: KSerializer, + newOrigin: String? = null, + transformer: suspend (T) -> R, + ): MagixProcessor = MagixProcessor { endpoint -> + endpoint.subscribe(inputSerializer, filter).onEach { message -> + val newPayload = transformer(message.payload) + val transformed: MagixMessage = MagixMessage( + outputFormat, + newOrigin ?: message.origin, + newPayload, + message.target, + message.id, + message.parentId, + message.user + ) + endpoint.broadcast(outputSerializer, transformed) + }.launchIn(scope) + } } -} \ No newline at end of file + + public fun convert( + scope: CoroutineScope, + filter: MagixMessageFilter, + outputFormat: String, + newOrigin: String? = null, + transformer: suspend (JsonElement) -> JsonElement, + ): MagixProcessor = convert( + scope, + filter, + outputFormat, + JsonElement.serializer(), + JsonElement.serializer(), + newOrigin, + transformer + ) +} diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt index 2204a02..18744b1 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt @@ -52,7 +52,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow -> + requestChannel { _: Payload, input: Flow -> input.onEach { magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText())) }.launchIn(this@magixAcceptor) diff --git a/magix/magix-service/build.gradle.kts b/magix/magix-service/build.gradle.kts index abf4c03..9e95bd2 100644 --- a/magix/magix-service/build.gradle.kts +++ b/magix/magix-service/build.gradle.kts @@ -7,7 +7,6 @@ kscience { useSerialization{ json() } - useCoroutines("1.4.1", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) } val dataforgeVersion: String by rootProject.extra diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt index 2f1ad13..e92c46d 100644 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt @@ -13,15 +13,16 @@ import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.rSocket -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer import kotlinx.serialization.encodeToString +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext public class RSocketMagixEndpoint( - override val scope: CoroutineScope, + private val coroutineContext: CoroutineContext, private val rSocket: RSocket, ) : MagixEndpoint { @@ -30,19 +31,15 @@ public class RSocketMagixEndpoint( filter: MagixMessageFilter, ): Flow> { val serializer = MagixMessage.serializer(payloadSerializer) - val payload = buildPayload { - data(MagixEndpoint.magixJson.encodeToString(filter)) - } + val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } } override suspend fun broadcast(payloadSerializer: KSerializer, message: MagixMessage) { - scope.launch { + withContext(coroutineContext) { val serializer = MagixMessage.serializer(payloadSerializer) - val payload = buildPayload { - data(MagixEndpoint.magixJson.encodeToString(serializer, message)) - } + val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) } rSocket.fireAndForget(payload) } } @@ -57,7 +54,6 @@ public class RSocketMagixEndpoint( @OptIn(KtorExperimentalAPI::class) public suspend fun withWebSockets( - scope: CoroutineScope, host: String, port: Int, path: String = "/rsocket", @@ -77,8 +73,7 @@ public class RSocketMagixEndpoint( client.close() } - return RSocketMagixEndpoint(scope, rSocket) + return RSocketMagixEndpoint(coroutineContext, rSocket) } } -} - +} \ No newline at end of file diff --git a/motors/build.gradle.kts b/motors/build.gradle.kts index cd2bb4b..c43e16d 100644 --- a/motors/build.gradle.kts +++ b/motors/build.gradle.kts @@ -1,5 +1,3 @@ -import ru.mipt.npm.gradle.useFx - plugins { id("ru.mipt.npm.jvm") id("ru.mipt.npm.publish") @@ -9,11 +7,14 @@ plugins { //TODO to be moved to a separate project application{ - mainClassName = "ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt" + mainClass.set("ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt") } kotlin{ explicitApi = null +} + +kscience{ useFx(ru.mipt.npm.gradle.FXModule.CONTROLS, configuration = ru.mipt.npm.gradle.DependencyConfiguration.IMPLEMENTATION) } diff --git a/settings.gradle.kts b/settings.gradle.kts index a4dee19..d08689d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,6 @@ pluginManagement { - val kotlinVersion = "1.4.20-M2" - val toolsVersion = "0.6.4-dev-1.4.20-M2" + val kotlinVersion = "1.4.21" + val toolsVersion = "0.7.1" repositories { mavenLocal() @@ -24,20 +24,20 @@ pluginManagement { } } -rootProject.name = "dataforge-control" +rootProject.name = "controls.kt" include( ":dataforge-device-core", ":dataforge-device-tcp", ":dataforge-device-serial", ":dataforge-device-server", - ":dataforge-magix-client", - ":motors", ":demo", ":magix", ":magix:magix-api", ":magix:magix-server", - ":magix:magix-service" + ":magix:magix-service", + ":dataforge-magix-client", + ":motors" ) //includeBuild("../dataforge-core")