From 4d4a9fba1cc33947a548670dabf0973d663d904d Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Thu, 29 Jun 2023 09:19:05 +0300 Subject: [PATCH] MQTT topic selectors --- .../space/kscience/controls/demo/DemoControllerView.kt | 3 ++- .../kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt | 7 ++++--- .../space/kscience/magix/rabbit/RabbitMQMagixEndpoint.kt | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt index 9eb8244..edb199e 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoControllerView.kt @@ -36,8 +36,9 @@ class DemoController : Controller(), ContextAware { var visualizer: ApplicationEngine? = null var opcUaServer: OpcUaServer = OpcUaServer { setApplicationName(LocalizedText.english("space.kscience.controls.opcua")) + endpoint { - setBindPort(9999) + setBindPort(4840) //use default endpoint } } diff --git a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt index c9ac7f7..66cf299 100644 --- a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt +++ b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt @@ -17,7 +17,8 @@ import java.util.* public class MqttMagixEndpoint( serverHost: String, clientId: String = UUID.randomUUID().toString(), - public val topic: String = DEFAULT_MAGIX_TOPIC_NAME, + private val broadcastTopicBuilder: (MagixMessage) -> String = { DEFAULT_MAGIX_TOPIC_NAME }, + private val subscribeTopicBuilder: (MagixMessageFilter) -> String = { DEFAULT_MAGIX_TOPIC_NAME }, public val qos: MqttQos = MqttQos.AT_LEAST_ONCE, ) : MagixEndpoint, AutoCloseable { @@ -36,7 +37,7 @@ public class MqttMagixEndpoint( override fun subscribe(filter: MagixMessageFilter): Flow = callbackFlow { connection.await() client.subscribeWith() - .topicFilter(topic) + .topicFilter(subscribeTopicBuilder(filter)) .qos(qos) .callback { published -> val message = MagixEndpoint.magixJson.decodeFromString( @@ -54,7 +55,7 @@ public class MqttMagixEndpoint( override suspend fun broadcast(message: MagixMessage) { connection.await() - client.publishWith().topic(topic).qos(qos).payload( + client.publishWith().topic(broadcastTopicBuilder(message)).qos(qos).payload( MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message).encodeToByteArray() ).send() } diff --git a/magix/magix-rabbit/src/main/kotlin/space/kscience/magix/rabbit/RabbitMQMagixEndpoint.kt b/magix/magix-rabbit/src/main/kotlin/space/kscience/magix/rabbit/RabbitMQMagixEndpoint.kt index 33af457..561acdd 100644 --- a/magix/magix-rabbit/src/main/kotlin/space/kscience/magix/rabbit/RabbitMQMagixEndpoint.kt +++ b/magix/magix-rabbit/src/main/kotlin/space/kscience/magix/rabbit/RabbitMQMagixEndpoint.kt @@ -27,7 +27,7 @@ public class RabbitMQMagixEndpoint( } override fun subscribe(filter: MagixMessageFilter): Flow = callbackFlow { - val deliverCallback: DeliverCallback = DeliverCallback { _: String, message: Delivery -> + val deliverCallback = DeliverCallback { _: String, message: Delivery -> val magixMessage = MagixEndpoint.magixJson.decodeFromString( MagixMessage.serializer(), message.body.decodeToString() )