MQTT topic selectors

This commit is contained in:
Alexander Nozik 2023-06-29 09:19:05 +03:00
parent d1e9b0a5a5
commit 4d4a9fba1c
3 changed files with 7 additions and 5 deletions

View File

@ -36,8 +36,9 @@ class DemoController : Controller(), ContextAware {
var visualizer: ApplicationEngine? = null var visualizer: ApplicationEngine? = null
var opcUaServer: OpcUaServer = OpcUaServer { var opcUaServer: OpcUaServer = OpcUaServer {
setApplicationName(LocalizedText.english("space.kscience.controls.opcua")) setApplicationName(LocalizedText.english("space.kscience.controls.opcua"))
endpoint { endpoint {
setBindPort(9999) setBindPort(4840)
//use default endpoint //use default endpoint
} }
} }

View File

@ -17,7 +17,8 @@ import java.util.*
public class MqttMagixEndpoint( public class MqttMagixEndpoint(
serverHost: String, serverHost: String,
clientId: String = UUID.randomUUID().toString(), clientId: String = UUID.randomUUID().toString(),
public val topic: String = DEFAULT_MAGIX_TOPIC_NAME, private val broadcastTopicBuilder: (MagixMessage) -> String = { DEFAULT_MAGIX_TOPIC_NAME },
private val subscribeTopicBuilder: (MagixMessageFilter) -> String = { DEFAULT_MAGIX_TOPIC_NAME },
public val qos: MqttQos = MqttQos.AT_LEAST_ONCE, public val qos: MqttQos = MqttQos.AT_LEAST_ONCE,
) : MagixEndpoint, AutoCloseable { ) : MagixEndpoint, AutoCloseable {
@ -36,7 +37,7 @@ public class MqttMagixEndpoint(
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = callbackFlow { override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = callbackFlow {
connection.await() connection.await()
client.subscribeWith() client.subscribeWith()
.topicFilter(topic) .topicFilter(subscribeTopicBuilder(filter))
.qos(qos) .qos(qos)
.callback { published -> .callback { published ->
val message = MagixEndpoint.magixJson.decodeFromString( val message = MagixEndpoint.magixJson.decodeFromString(
@ -54,7 +55,7 @@ public class MqttMagixEndpoint(
override suspend fun broadcast(message: MagixMessage) { override suspend fun broadcast(message: MagixMessage) {
connection.await() connection.await()
client.publishWith().topic(topic).qos(qos).payload( client.publishWith().topic(broadcastTopicBuilder(message)).qos(qos).payload(
MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message).encodeToByteArray() MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message).encodeToByteArray()
).send() ).send()
} }

View File

@ -27,7 +27,7 @@ public class RabbitMQMagixEndpoint(
} }
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = callbackFlow { override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = callbackFlow {
val deliverCallback: DeliverCallback = DeliverCallback { _: String, message: Delivery -> val deliverCallback = DeliverCallback { _: String, message: Delivery ->
val magixMessage = MagixEndpoint.magixJson.decodeFromString( val magixMessage = MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(), message.body.decodeToString() MagixMessage.serializer(), message.body.decodeToString()
) )