diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt index 699a766..a724fd6 100644 --- a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.newCoroutineContext import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import space.kscience.controls.api.* +import space.kscience.controls.manager.DeviceManager import space.kscience.dataforge.context.Context import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental @@ -105,8 +106,8 @@ public class DeviceClient( * Connect to a remote device via this client. */ public fun MagixEndpoint.remoteDevice(context: Context, magixTarget: String, deviceName: Name): DeviceClient { - val subscription = subscribe(controlsMagixFormat, originFilter = listOf(magixTarget)).map { it.second } + val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(magixTarget)).map { it.second } return DeviceClient(context, deviceName, subscription) { - broadcast(controlsMagixFormat, it, magixTarget, id = stringUID()) + broadcast(DeviceManager.magixFormat, it, magixTarget, id = stringUID()) } } \ No newline at end of file diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt index f64c8e0..00a1cc6 100644 --- a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt @@ -14,15 +14,20 @@ import space.kscience.dataforge.context.logger import space.kscience.magix.api.* -public val controlsMagixFormat: MagixFormat = MagixFormat( +internal val controlsMagixFormat: MagixFormat = MagixFormat( DeviceMessage.serializer(), setOf("controls-kt", "dataforge") ) +/** + * A magix message format to work with Controls-kt data + */ +public val DeviceManager.Companion.magixFormat: MagixFormat get() = controlsMagixFormat + internal fun generateId(request: MagixMessage): String = if (request.id != null) { "${request.id}.response" } else { - "df[${request.payload.hashCode()}" + "controls[${request.payload.hashCode().toString(16)}" } /** @@ -37,6 +42,7 @@ public fun DeviceManager.connectToMagix( if (responsePayload != null) { endpoint.broadcast( format = controlsMagixFormat, + target = request.origin, origin = endpointID, payload = responsePayload, id = generateId(request), diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt index 9ef69cf..fda8ead 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/demoDeviceServer.kt @@ -7,7 +7,8 @@ import kotlinx.coroutines.launch import kotlinx.html.div import kotlinx.html.link import space.kscience.controls.api.PropertyChangedMessage -import space.kscience.controls.client.controlsMagixFormat +import space.kscience.controls.client.magixFormat +import space.kscience.controls.manager.DeviceManager import space.kscience.controls.spec.name import space.kscience.dataforge.meta.double import space.kscience.dataforge.meta.get @@ -54,7 +55,7 @@ suspend fun Trace.updateXYFrom(flow: Flow>>) { fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine { //share subscription to a parse message only once - val subscription = magixEndpoint.subscribe(controlsMagixFormat).shareIn(this, SharingStarted.Lazily) + val subscription = magixEndpoint.subscribe(DeviceManager.magixFormat).shareIn(this, SharingStarted.Lazily) val sinFlow = subscription.mapNotNull { (_, payload) -> (payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.sin.name } diff --git a/demo/car/src/main/kotlin/space/kscience/controls/demo/car/MagixVirtualCar.kt b/demo/car/src/main/kotlin/space/kscience/controls/demo/car/MagixVirtualCar.kt index addc3a9..03c781b 100644 --- a/demo/car/src/main/kotlin/space/kscience/controls/demo/car/MagixVirtualCar.kt +++ b/demo/car/src/main/kotlin/space/kscience/controls/demo/car/MagixVirtualCar.kt @@ -2,7 +2,8 @@ package space.kscience.controls.demo.car import kotlinx.coroutines.launch import space.kscience.controls.api.PropertyChangedMessage -import space.kscience.controls.client.controlsMagixFormat +import space.kscience.controls.client.magixFormat +import space.kscience.controls.manager.DeviceManager import space.kscience.controls.spec.write import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory @@ -18,7 +19,7 @@ import kotlin.time.ExperimentalTime class MagixVirtualCar(context: Context, meta: Meta) : VirtualCar(context, meta) { private fun MagixEndpoint.launchMagixVirtualCarUpdate() = launch { - subscribe(controlsMagixFormat).collect { (_, payload) -> + subscribe(DeviceManager.magixFormat).collect { (_, payload) -> (payload as? PropertyChangedMessage)?.let { message -> if (message.sourceDevice == Name.parse("virtual-car")) { when (message.property) { diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt index 6dae782..5860211 100644 --- a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -8,7 +8,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.datetime.Clock import space.kscience.controls.client.connectToMagix -import space.kscience.controls.client.controlsMagixFormat +import space.kscience.controls.client.magixFormat import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.install import space.kscience.controls.spec.* @@ -96,7 +96,7 @@ fun main() { val latest = ConcurrentHashMap() - monitorEndpoint.subscribe(controlsMagixFormat).onEach { (magixMessage, payload) -> + monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) -> latest[magixMessage.origin] = Clock.System.now() - payload.time!! }.launchIn(this) diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt index 4921129..904afe6 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt @@ -6,6 +6,11 @@ import kotlinx.serialization.KSerializer import kotlinx.serialization.json.JsonElement import space.kscience.magix.api.MagixEndpoint.Companion.magixJson +/** + * A format for [MagixMessage] that allows to decode typed payload + * + * @param formats allowed values of the format field that are processed. The first value is the primary format for sending. + */ public data class MagixFormat( val serializer: KSerializer, val formats: Set, @@ -13,10 +18,15 @@ public data class MagixFormat( val defaultFormat: String get() = formats.firstOrNull() ?: "magix" } +/** + * Subscribe for messages in given endpoint using [format] to declare format filter as well as automatic decoding. + * + * @return a flow of pairs (raw message, decoded message). Raw messages are to be used to extract headers. + */ public fun MagixEndpoint.subscribe( format: MagixFormat, - originFilter: Collection? = null, - targetFilter: Collection? = null, + originFilter: Collection? = null, + targetFilter: Collection? = null, ): Flow> = subscribe( MagixMessageFilter(format = format.formats, origin = originFilter, target = targetFilter) ).map { @@ -24,6 +34,10 @@ public fun MagixEndpoint.subscribe( it to value } +/** + * Send a message using given [format] to encode the message payload. The format field is also taken from [format]. + * + */ public suspend fun MagixEndpoint.broadcast( format: MagixFormat, payload: T, diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt index 4ce1995..10d4345 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt @@ -4,11 +4,14 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.serialization.Serializable +/** + * A filter that allows receiving only messages with format, origin and target in given list. + */ @Serializable public data class MagixMessageFilter( - val format: Collection? = null, - val origin: Collection? = null, - val target: Collection? = null, + val format: Collection? = null, + val origin: Collection? = null, + val target: Collection? = null, ) { public fun accepts(message: MagixMessage): Boolean = diff --git a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt index d8294dd..b5f7c0a 100644 --- a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt +++ b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt @@ -77,6 +77,7 @@ public class MqttMagixEndpoint( public companion object { public const val DEFAULT_MAGIX_TOPIC_NAME: String = "magix" + //TODO add target name escaping internal val defaultBroadcastTopicBuilder: (MagixMessage) -> String = { message -> message.target?.let { "$DEFAULT_MAGIX_TOPIC_NAME/it" } ?: DEFAULT_MAGIX_TOPIC_NAME diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt index 84c9c75..9eb9ad6 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt @@ -35,7 +35,8 @@ public class RSocketMagixFlowPlugin( receive: Flow, sendMessage: suspend (MagixMessage) -> Unit, ): Job { - val tcpTransport = TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration) + val tcpTransport = + TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration) val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration) .bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage)) @@ -59,6 +60,7 @@ public class RSocketMagixFlowPlugin( MagixMessageFilter.serializer(), request.data.readText() ) + request.close() receive.filter(filter).map { message -> val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message) buildPayload { data(string) } @@ -66,22 +68,25 @@ public class RSocketMagixFlowPlugin( } //single send fireAndForget { request: Payload -> - val message = - MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) + val message = MagixEndpoint.magixJson.decodeFromString( + MagixMessage.serializer(), + request.data.readText() + ) + request.close() sendMessage(message) } // bidirectional connection, used for streaming connection requestChannel { request: Payload, input: Flow -> - input.onEach { + input.onEach { inputPayload -> sendMessage( MagixEndpoint.magixJson.decodeFromString( MagixMessage.serializer(), - it.data.readText() + inputPayload.use{ it.data.readText()} ) ) }.launchIn(this) - val filterText = request.data.readText() + val filterText = request.use { it.data.readText()} val filter = if (filterText.isNotBlank()) { MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt index d257bb7..dc197ad 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/magixModule.kt @@ -47,24 +47,24 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow, r install(WebSockets) } + if (pluginOrNull(RSocketSupport) == null) { + install(RSocketSupport) + } + + // if (pluginOrNull(CORS) == null) { // install(CORS) { // //TODO consider more safe policy // anyHost() // } // } - if (pluginOrNull(ContentNegotiation) == null) { - install(ContentNegotiation) { - json() - } - } - if (pluginOrNull(RSocketSupport) == null) { - install(RSocketSupport) - } routing { route(route) { + install(ContentNegotiation){ + json() + } get("state") { call.respondHtml { head { diff --git a/settings.gradle.kts b/settings.gradle.kts index 4fee116..db083c4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -58,8 +58,7 @@ include( ":magix:magix-zmq", ":magix:magix-rabbit", ":magix:magix-mqtt", - -// ":magix:magix-storage", + ":magix:magix-storage", ":magix:magix-storage:magix-storage-xodus", ":controls-magix-client", ":demo:all-things",