diff --git a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt index 66cf299..d8294dd 100644 --- a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt +++ b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt @@ -3,6 +3,7 @@ package space.ksceince.magix.mqtt import com.hivemq.client.mqtt.MqttClient import com.hivemq.client.mqtt.datatypes.MqttQos import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow @@ -13,13 +14,21 @@ import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import java.util.* - +`/** + * MQTT5 endpoint for magix. + * + * @param broadcastTopicBuilder defines how the topic is constructed from broadcast message structure. + * By default, use `magix/` topic if target is present and `magix` if it is not. + * @param subscribeTopicBuilder defines how the topic is constructed from the filter. + * By default, uses `magix/` if only a single target is presented and `magix/#` otherwise. + */ public class MqttMagixEndpoint( serverHost: String, clientId: String = UUID.randomUUID().toString(), - private val broadcastTopicBuilder: (MagixMessage) -> String = { DEFAULT_MAGIX_TOPIC_NAME }, - private val subscribeTopicBuilder: (MagixMessageFilter) -> String = { DEFAULT_MAGIX_TOPIC_NAME }, - public val qos: MqttQos = MqttQos.AT_LEAST_ONCE, + private val broadcastTopicBuilder: (MagixMessage) -> String = defaultBroadcastTopicBuilder, + private val subscribeTopicBuilder: (MagixMessageFilter) -> String = defaultSubscribeTopicBuilder, + private val qos: MqttQos = MqttQos.AT_LEAST_ONCE, + private val clientConfig: Mqtt5ClientBuilder.() -> Mqtt5ClientBuilder = { this }, ) : MagixEndpoint, AutoCloseable { private val client: Mqtt5AsyncClient by lazy { @@ -27,6 +36,7 @@ public class MqttMagixEndpoint( .identifier(clientId) .serverHost(serverHost) .useMqttVersion5() + .run(clientConfig) .buildAsync() } @@ -66,5 +76,18 @@ public class MqttMagixEndpoint( public companion object { public const val DEFAULT_MAGIX_TOPIC_NAME: String = "magix" + + + internal val defaultBroadcastTopicBuilder: (MagixMessage) -> String = { message -> + message.target?.let { "$DEFAULT_MAGIX_TOPIC_NAME/it" } ?: DEFAULT_MAGIX_TOPIC_NAME + } + + internal val defaultSubscribeTopicBuilder: (MagixMessageFilter) -> String = { filter -> + if (filter.target?.size == 1) { + "$DEFAULT_MAGIX_TOPIC_NAME/${filter.target!!.first()}" + } else { + "$DEFAULT_MAGIX_TOPIC_NAME/#" + } + } } } \ No newline at end of file