Add rabbitMQ connector (untested)

This commit is contained in:
Alexander Nozik 2022-06-06 14:10:48 +03:00
parent 535e44286c
commit 20345f846b
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
2 changed files with 84 additions and 0 deletions

View File

@ -11,3 +11,7 @@ dependencies {
api(projects.magix.magixApi)
implementation("com.rabbitmq:amqp-client:5.14.2")
}
readme{
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
}

View File

@ -0,0 +1,80 @@
package ru.mipt.npm.magix.rabbit
import com.rabbitmq.client.*
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import ru.mipt.npm.magix.rabbit.RabbitMQMagixEndpoint.Companion.DEFAULT_MAGIX_QUEUE_NAME
/**
* A magix endpoint for RabbitMQ message broker
*/
public class RabbitMQMagixEndpoint(
private val connection: Connection,
private val queueName: String = DEFAULT_MAGIX_QUEUE_NAME,
) : MagixEndpoint, AutoCloseable {
private val rabbitChannel by lazy {
connection.createChannel().apply {
queueDeclare(queueName, false, false, false, null)
}
}
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> = callbackFlow {
val deliverCallback: DeliverCallback = DeliverCallback { _: String, message: Delivery ->
val magixMessage = MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(), message.body.decodeToString()
)
launch {
send(magixMessage)
}
}
val cancelCallback: CancelCallback = CancelCallback {
cancel("Rabbit consumer is closed")
}
val consumerTag = rabbitChannel.basicConsume(
queueName,
true,
deliverCallback,
cancelCallback
)
awaitClose {
rabbitChannel.basicCancel(consumerTag)
}
}.filter(filter)
override suspend fun broadcast(message: MagixMessage) {
rabbitChannel.basicPublish(
"",
queueName,
null,
MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message).encodeToByteArray()
)
}
override fun close() {
rabbitChannel.close()
connection.close()
}
public companion object {
public const val DEFAULT_MAGIX_QUEUE_NAME: String = "magix"
}
}
public fun MagixEndpoint.Companion.rabbit(
address: String,
queueName: String = DEFAULT_MAGIX_QUEUE_NAME,
): RabbitMQMagixEndpoint {
val connection = ConnectionFactory().newConnection(address)
return RabbitMQMagixEndpoint(connection, queueName)
}