From 20345f846b41a8803fc1395243afbd3231ef5253 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 6 Jun 2022 14:10:48 +0300 Subject: [PATCH] Add rabbitMQ connector (untested) --- magix/magix-rabbit/build.gradle.kts | 4 + .../npm/magix/rabbit/RabbitMQMagixEndpoint.kt | 80 +++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 magix/magix-rabbit/src/main/kotlin/ru/mipt/npm/magix/rabbit/RabbitMQMagixEndpoint.kt diff --git a/magix/magix-rabbit/build.gradle.kts b/magix/magix-rabbit/build.gradle.kts index c9d4bef..960a121 100644 --- a/magix/magix-rabbit/build.gradle.kts +++ b/magix/magix-rabbit/build.gradle.kts @@ -11,3 +11,7 @@ dependencies { api(projects.magix.magixApi) implementation("com.rabbitmq:amqp-client:5.14.2") } + +readme{ + maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE +} diff --git a/magix/magix-rabbit/src/main/kotlin/ru/mipt/npm/magix/rabbit/RabbitMQMagixEndpoint.kt b/magix/magix-rabbit/src/main/kotlin/ru/mipt/npm/magix/rabbit/RabbitMQMagixEndpoint.kt new file mode 100644 index 0000000..a709df8 --- /dev/null +++ b/magix/magix-rabbit/src/main/kotlin/ru/mipt/npm/magix/rabbit/RabbitMQMagixEndpoint.kt @@ -0,0 +1,80 @@ +package ru.mipt.npm.magix.rabbit + +import com.rabbitmq.client.* +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.launch +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage +import ru.mipt.npm.magix.api.MagixMessageFilter +import ru.mipt.npm.magix.api.filter +import ru.mipt.npm.magix.rabbit.RabbitMQMagixEndpoint.Companion.DEFAULT_MAGIX_QUEUE_NAME + +/** + * A magix endpoint for RabbitMQ message broker + */ +public class RabbitMQMagixEndpoint( + private val connection: Connection, + private val queueName: String = DEFAULT_MAGIX_QUEUE_NAME, +) : MagixEndpoint, AutoCloseable { + + private val rabbitChannel by lazy { + connection.createChannel().apply { + queueDeclare(queueName, false, false, false, null) + } + } + + override fun subscribe(filter: MagixMessageFilter): Flow = callbackFlow { + val deliverCallback: DeliverCallback = DeliverCallback { _: String, message: Delivery -> + val magixMessage = MagixEndpoint.magixJson.decodeFromString( + MagixMessage.serializer(), message.body.decodeToString() + ) + launch { + send(magixMessage) + } + } + + val cancelCallback: CancelCallback = CancelCallback { + cancel("Rabbit consumer is closed") + } + + val consumerTag = rabbitChannel.basicConsume( + queueName, + true, + deliverCallback, + cancelCallback + ) + + awaitClose { + rabbitChannel.basicCancel(consumerTag) + } + }.filter(filter) + + override suspend fun broadcast(message: MagixMessage) { + rabbitChannel.basicPublish( + "", + queueName, + null, + MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message).encodeToByteArray() + ) + } + + override fun close() { + rabbitChannel.close() + connection.close() + } + + public companion object { + public const val DEFAULT_MAGIX_QUEUE_NAME: String = "magix" + } +} + +public fun MagixEndpoint.Companion.rabbit( + address: String, + queueName: String = DEFAULT_MAGIX_QUEUE_NAME, +): RabbitMQMagixEndpoint { + val connection = ConnectionFactory().newConnection(address) + return RabbitMQMagixEndpoint(connection, queueName) +} \ No newline at end of file