Add MQTT topic specialization

This commit is contained in:
Alexander Nozik 2023-07-23 14:29:11 +03:00
parent c28944e10f
commit 6f5270ee37

View File

@ -3,6 +3,7 @@ package space.ksceince.magix.mqtt
import com.hivemq.client.mqtt.MqttClient import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.datatypes.MqttQos import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder
import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
@ -13,13 +14,21 @@ import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.MagixMessageFilter
import java.util.* import java.util.*
`/**
* MQTT5 endpoint for magix.
*
* @param broadcastTopicBuilder defines how the topic is constructed from broadcast message structure.
* By default, use `magix/<target>` topic if target is present and `magix` if it is not.
* @param subscribeTopicBuilder defines how the topic is constructed from the filter.
* By default, uses `magix/<target>` if only a single target is presented and `magix/#` otherwise.
*/
public class MqttMagixEndpoint( public class MqttMagixEndpoint(
serverHost: String, serverHost: String,
clientId: String = UUID.randomUUID().toString(), clientId: String = UUID.randomUUID().toString(),
private val broadcastTopicBuilder: (MagixMessage) -> String = { DEFAULT_MAGIX_TOPIC_NAME }, private val broadcastTopicBuilder: (MagixMessage) -> String = defaultBroadcastTopicBuilder,
private val subscribeTopicBuilder: (MagixMessageFilter) -> String = { DEFAULT_MAGIX_TOPIC_NAME }, private val subscribeTopicBuilder: (MagixMessageFilter) -> String = defaultSubscribeTopicBuilder,
public val qos: MqttQos = MqttQos.AT_LEAST_ONCE, private val qos: MqttQos = MqttQos.AT_LEAST_ONCE,
private val clientConfig: Mqtt5ClientBuilder.() -> Mqtt5ClientBuilder = { this },
) : MagixEndpoint, AutoCloseable { ) : MagixEndpoint, AutoCloseable {
private val client: Mqtt5AsyncClient by lazy { private val client: Mqtt5AsyncClient by lazy {
@ -27,6 +36,7 @@ public class MqttMagixEndpoint(
.identifier(clientId) .identifier(clientId)
.serverHost(serverHost) .serverHost(serverHost)
.useMqttVersion5() .useMqttVersion5()
.run(clientConfig)
.buildAsync() .buildAsync()
} }
@ -66,5 +76,18 @@ public class MqttMagixEndpoint(
public companion object { public companion object {
public const val DEFAULT_MAGIX_TOPIC_NAME: String = "magix" public const val DEFAULT_MAGIX_TOPIC_NAME: String = "magix"
internal val defaultBroadcastTopicBuilder: (MagixMessage) -> String = { message ->
message.target?.let { "$DEFAULT_MAGIX_TOPIC_NAME/it" } ?: DEFAULT_MAGIX_TOPIC_NAME
}
internal val defaultSubscribeTopicBuilder: (MagixMessageFilter) -> String = { filter ->
if (filter.target?.size == 1) {
"$DEFAULT_MAGIX_TOPIC_NAME/${filter.target!!.first()}"
} else {
"$DEFAULT_MAGIX_TOPIC_NAME/#"
}
}
} }
} }