Remove generic from MagixMessage
This commit is contained in:
parent
eb7507191e
commit
07adf143cf
@ -9,15 +9,17 @@ import ru.mipt.npm.controls.api.DeviceMessage
|
|||||||
import ru.mipt.npm.controls.manager.DeviceManager
|
import ru.mipt.npm.controls.manager.DeviceManager
|
||||||
import ru.mipt.npm.controls.manager.hubMessageFlow
|
import ru.mipt.npm.controls.manager.hubMessageFlow
|
||||||
import ru.mipt.npm.controls.manager.respondHubMessage
|
import ru.mipt.npm.controls.manager.respondHubMessage
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.*
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
|
||||||
import space.kscience.dataforge.context.error
|
import space.kscience.dataforge.context.error
|
||||||
import space.kscience.dataforge.context.logger
|
import space.kscience.dataforge.context.logger
|
||||||
|
|
||||||
|
|
||||||
public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge"
|
public val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
|
||||||
|
DeviceMessage.serializer(),
|
||||||
|
setOf("controls-kt", "dataforge")
|
||||||
|
)
|
||||||
|
|
||||||
internal fun generateId(request: MagixMessage<*>): String = if (request.id != null) {
|
internal fun generateId(request: MagixMessage): String = if (request.id != null) {
|
||||||
"${request.id}.response"
|
"${request.id}.response"
|
||||||
} else {
|
} else {
|
||||||
"df[${request.payload.hashCode()}"
|
"df[${request.payload.hashCode()}"
|
||||||
@ -27,38 +29,31 @@ internal fun generateId(request: MagixMessage<*>): String = if (request.id != nu
|
|||||||
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
|
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
|
||||||
*/
|
*/
|
||||||
public fun DeviceManager.connectToMagix(
|
public fun DeviceManager.connectToMagix(
|
||||||
endpoint: MagixEndpoint<DeviceMessage>,
|
endpoint: MagixEndpoint,
|
||||||
endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
endpointID: String = controlsMagixFormat.defaultFormat,
|
||||||
preSendAction: (MagixMessage<*>) -> Unit = {}
|
|
||||||
): Job = context.launch {
|
): Job = context.launch {
|
||||||
endpoint.subscribe().onEach { request ->
|
endpoint.subscribe(controlsMagixFormat).onEach { (request, payload) ->
|
||||||
val responsePayload = respondHubMessage(request.payload)
|
val responsePayload = respondHubMessage(payload)
|
||||||
if (responsePayload != null) {
|
if (responsePayload != null) {
|
||||||
val response = MagixMessage(
|
endpoint.broadcast(
|
||||||
|
format = controlsMagixFormat,
|
||||||
origin = endpointID,
|
origin = endpointID,
|
||||||
payload = responsePayload,
|
payload = responsePayload,
|
||||||
format = DATAFORGE_MAGIX_FORMAT,
|
|
||||||
id = generateId(request),
|
id = generateId(request),
|
||||||
parentId = request.id
|
parentId = request.id
|
||||||
)
|
)
|
||||||
|
|
||||||
endpoint.broadcast(response)
|
|
||||||
}
|
}
|
||||||
}.catch { error ->
|
}.catch { error ->
|
||||||
logger.error(error) { "Error while responding to message" }
|
logger.error(error) { "Error while responding to message" }
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
||||||
|
|
||||||
hubMessageFlow(this).onEach { payload ->
|
hubMessageFlow(this).onEach { payload ->
|
||||||
val magixMessage = MagixMessage(
|
endpoint.broadcast(
|
||||||
|
format = controlsMagixFormat,
|
||||||
origin = endpointID,
|
origin = endpointID,
|
||||||
payload = payload,
|
payload = payload,
|
||||||
format = DATAFORGE_MAGIX_FORMAT,
|
|
||||||
id = "df[${payload.hashCode()}]"
|
id = "df[${payload.hashCode()}]"
|
||||||
)
|
)
|
||||||
preSendAction(magixMessage)
|
|
||||||
endpoint.broadcast(
|
|
||||||
magixMessage
|
|
||||||
)
|
|
||||||
}.catch { error ->
|
}.catch { error ->
|
||||||
logger.error(error) { "Error while sending a message" }
|
logger.error(error) { "Error while sending a message" }
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
@ -8,8 +8,7 @@ import kotlinx.serialization.Serializable
|
|||||||
import ru.mipt.npm.controls.api.get
|
import ru.mipt.npm.controls.api.get
|
||||||
import ru.mipt.npm.controls.api.getOrReadProperty
|
import ru.mipt.npm.controls.api.getOrReadProperty
|
||||||
import ru.mipt.npm.controls.manager.DeviceManager
|
import ru.mipt.npm.controls.manager.DeviceManager
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.*
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
|
||||||
import space.kscience.dataforge.context.error
|
import space.kscience.dataforge.context.error
|
||||||
import space.kscience.dataforge.context.logger
|
import space.kscience.dataforge.context.logger
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
@ -59,33 +58,39 @@ public data class TangoPayload(
|
|||||||
val argin: Meta? = null,
|
val argin: Meta? = null,
|
||||||
val argout: Meta? = null,
|
val argout: Meta? = null,
|
||||||
val data: Meta? = null,
|
val data: Meta? = null,
|
||||||
val errors: List<String>? = null
|
val errors: List<String>? = null,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
internal val tangoMagixFormat = MagixFormat(
|
||||||
|
TangoPayload.serializer(),
|
||||||
|
setOf("tango")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
public fun DeviceManager.launchTangoMagix(
|
public fun DeviceManager.launchTangoMagix(
|
||||||
endpoint: MagixEndpoint<TangoPayload>,
|
endpoint: MagixEndpoint,
|
||||||
endpointID: String = TANGO_MAGIX_FORMAT,
|
endpointID: String = TANGO_MAGIX_FORMAT,
|
||||||
): Job {
|
): Job {
|
||||||
suspend fun respond(request: MagixMessage<TangoPayload>, payloadBuilder: (TangoPayload) -> TangoPayload) {
|
|
||||||
|
suspend fun respond(request: MagixMessage, payload: TangoPayload, payloadBuilder: (TangoPayload) -> TangoPayload) {
|
||||||
endpoint.broadcast(
|
endpoint.broadcast(
|
||||||
request.copy(
|
tangoMagixFormat,
|
||||||
id = generateId(request),
|
id = generateId(request),
|
||||||
parentId = request.id,
|
parentId = request.id,
|
||||||
origin = endpointID,
|
origin = endpointID,
|
||||||
payload = payloadBuilder(request.payload)
|
payload = payloadBuilder(payload)
|
||||||
)
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return context.launch {
|
return context.launch {
|
||||||
endpoint.subscribe().onEach { request ->
|
endpoint.subscribe(tangoMagixFormat).onEach { (request, payload) ->
|
||||||
try {
|
try {
|
||||||
val device = get(request.payload.device)
|
val device = get(payload.device)
|
||||||
when (request.payload.action) {
|
when (payload.action) {
|
||||||
TangoAction.read -> {
|
TangoAction.read -> {
|
||||||
val value = device.getOrReadProperty(request.payload.name)
|
val value = device.getOrReadProperty(payload.name)
|
||||||
respond(request) { requestPayload ->
|
respond(request, payload) { requestPayload ->
|
||||||
requestPayload.copy(
|
requestPayload.copy(
|
||||||
value = value,
|
value = value,
|
||||||
quality = TangoQuality.VALID
|
quality = TangoQuality.VALID
|
||||||
@ -93,12 +98,12 @@ public fun DeviceManager.launchTangoMagix(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
TangoAction.write -> {
|
TangoAction.write -> {
|
||||||
request.payload.value?.let { value ->
|
payload.value?.let { value ->
|
||||||
device.writeProperty(request.payload.name, value)
|
device.writeProperty(payload.name, value)
|
||||||
}
|
}
|
||||||
//wait for value to be written and return final state
|
//wait for value to be written and return final state
|
||||||
val value = device.getOrReadProperty(request.payload.name)
|
val value = device.getOrReadProperty(payload.name)
|
||||||
respond(request) { requestPayload ->
|
respond(request, payload) { requestPayload ->
|
||||||
requestPayload.copy(
|
requestPayload.copy(
|
||||||
value = value,
|
value = value,
|
||||||
quality = TangoQuality.VALID
|
quality = TangoQuality.VALID
|
||||||
@ -106,8 +111,8 @@ public fun DeviceManager.launchTangoMagix(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
TangoAction.exec -> {
|
TangoAction.exec -> {
|
||||||
val result = device.execute(request.payload.name, request.payload.argin)
|
val result = device.execute(payload.name, payload.argin)
|
||||||
respond(request) { requestPayload ->
|
respond(request, payload) { requestPayload ->
|
||||||
requestPayload.copy(
|
requestPayload.copy(
|
||||||
argout = result,
|
argout = result,
|
||||||
quality = TangoQuality.VALID
|
quality = TangoQuality.VALID
|
||||||
@ -119,12 +124,11 @@ public fun DeviceManager.launchTangoMagix(
|
|||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
logger.error(ex) { "Error while responding to message" }
|
logger.error(ex) { "Error while responding to message" }
|
||||||
endpoint.broadcast(
|
endpoint.broadcast(
|
||||||
request.copy(
|
tangoMagixFormat,
|
||||||
id = generateId(request),
|
id = generateId(request),
|
||||||
parentId = request.id,
|
parentId = request.id,
|
||||||
origin = endpointID,
|
origin = endpointID,
|
||||||
payload = request.payload.copy(quality = TangoQuality.WARNING)
|
payload = payload.copy(quality = TangoQuality.WARNING)
|
||||||
)
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
||||||
|
@ -33,7 +33,7 @@ import ru.mipt.npm.controls.api.getOrNull
|
|||||||
import ru.mipt.npm.controls.manager.DeviceManager
|
import ru.mipt.npm.controls.manager.DeviceManager
|
||||||
import ru.mipt.npm.controls.manager.respondHubMessage
|
import ru.mipt.npm.controls.manager.respondHubMessage
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
import ru.mipt.npm.magix.server.launchMagixServerRawRSocket
|
import ru.mipt.npm.magix.server.launchMagixServerRawRSocket
|
||||||
import ru.mipt.npm.magix.server.magixModule
|
import ru.mipt.npm.magix.server.magixModule
|
||||||
import space.kscience.dataforge.meta.toMeta
|
import space.kscience.dataforge.meta.toMeta
|
||||||
@ -212,7 +212,7 @@ public fun Application.deviceManagerModule(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
|
val magixFlow = MutableSharedFlow<MagixMessage>(
|
||||||
buffer,
|
buffer,
|
||||||
extraBufferCapacity = buffer
|
extraBufferCapacity = buffer
|
||||||
)
|
)
|
||||||
|
@ -8,7 +8,6 @@ import javafx.stage.Stage
|
|||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import org.eclipse.milo.opcua.sdk.server.OpcUaServer
|
import org.eclipse.milo.opcua.sdk.server.OpcUaServer
|
||||||
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText
|
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
|
||||||
import ru.mipt.npm.controls.client.connectToMagix
|
import ru.mipt.npm.controls.client.connectToMagix
|
||||||
import ru.mipt.npm.controls.demo.DemoDevice.Companion.cosScale
|
import ru.mipt.npm.controls.demo.DemoDevice.Companion.cosScale
|
||||||
import ru.mipt.npm.controls.demo.DemoDevice.Companion.sinScale
|
import ru.mipt.npm.controls.demo.DemoDevice.Companion.sinScale
|
||||||
@ -52,9 +51,9 @@ class DemoController : Controller(), ContextAware {
|
|||||||
//starting magix event loop
|
//starting magix event loop
|
||||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
|
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
|
||||||
//Launch device client and connect it to the server
|
//Launch device client and connect it to the server
|
||||||
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
|
||||||
deviceManager.connectToMagix(deviceEndpoint)
|
deviceManager.connectToMagix(deviceEndpoint)
|
||||||
val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost", DeviceMessage.serializer())
|
val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
|
||||||
visualizer = visualEndpoint.startDemoDeviceServer()
|
visualizer = visualEndpoint.startDemoDeviceServer()
|
||||||
|
|
||||||
opcUaServer.startup()
|
opcUaServer.startup()
|
||||||
|
@ -11,9 +11,10 @@ import kotlinx.coroutines.flow.*
|
|||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.html.div
|
import kotlinx.html.div
|
||||||
import kotlinx.html.link
|
import kotlinx.html.link
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
|
import ru.mipt.npm.controls.client.controlsMagixFormat
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
|
import ru.mipt.npm.magix.api.subscribe
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.double
|
import space.kscience.dataforge.meta.double
|
||||||
import space.kscience.plotly.layout
|
import space.kscience.plotly.layout
|
||||||
@ -54,7 +55,7 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
suspend fun MagixEndpoint<DeviceMessage>.startDemoDeviceServer(): ApplicationEngine = embeddedServer(CIO, 9090) {
|
suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedServer(CIO, 9090) {
|
||||||
install(WebSockets)
|
install(WebSockets)
|
||||||
install(RSocketSupport)
|
install(RSocketSupport)
|
||||||
|
|
||||||
@ -66,8 +67,8 @@ suspend fun MagixEndpoint<DeviceMessage>.startDemoDeviceServer(): ApplicationEng
|
|||||||
val cosFlow = MutableSharedFlow<Meta?>()// = device.cos.flow()
|
val cosFlow = MutableSharedFlow<Meta?>()// = device.cos.flow()
|
||||||
|
|
||||||
launch {
|
launch {
|
||||||
subscribe().collect { magix ->
|
subscribe(controlsMagixFormat).collect { (magix, payload) ->
|
||||||
(magix.payload as? PropertyChangedMessage)?.let { message ->
|
(payload as? PropertyChangedMessage)?.let { message ->
|
||||||
when (message.property) {
|
when (message.property) {
|
||||||
"sin" -> sinFlow.emit(message.value)
|
"sin" -> sinFlow.emit(message.value)
|
||||||
"cos" -> cosFlow.emit(message.value)
|
"cos" -> cosFlow.emit(message.value)
|
||||||
|
@ -1,29 +1,26 @@
|
|||||||
package ru.mipt.npm.magix.api
|
package ru.mipt.npm.magix.api
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.serialization.KSerializer
|
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import kotlinx.serialization.json.JsonElement
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Inwards API of magix endpoint used to build services
|
* Inwards API of magix endpoint used to build services
|
||||||
*/
|
*/
|
||||||
public interface MagixEndpoint<T> {
|
public interface MagixEndpoint {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subscribe to a [Flow] of messages
|
* Subscribe to a [Flow] of messages
|
||||||
*/
|
*/
|
||||||
public fun subscribe(
|
public fun subscribe(
|
||||||
filter: MagixMessageFilter = MagixMessageFilter.ALL,
|
filter: MagixMessageFilter = MagixMessageFilter.ALL,
|
||||||
): Flow<MagixMessage<T>>
|
): Flow<MagixMessage>
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send an event
|
* Send an event
|
||||||
*/
|
*/
|
||||||
public suspend fun broadcast(
|
public suspend fun broadcast(
|
||||||
message: MagixMessage<T>,
|
message: MagixMessage,
|
||||||
)
|
)
|
||||||
|
|
||||||
public companion object {
|
public companion object {
|
||||||
@ -53,31 +50,4 @@ public interface MagixEndpoint<T> {
|
|||||||
encodeDefaults = false
|
encodeDefaults = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Specialize this raw json endpoint to use specific serializer
|
|
||||||
*/
|
|
||||||
public fun <T : Any> MagixEndpoint<JsonElement>.specialize(
|
|
||||||
payloadSerializer: KSerializer<T>
|
|
||||||
): MagixEndpoint<T> = object : MagixEndpoint<T> {
|
|
||||||
override fun subscribe(
|
|
||||||
filter: MagixMessageFilter
|
|
||||||
): Flow<MagixMessage<T>> = this@specialize.subscribe(filter).map { message ->
|
|
||||||
message.replacePayload { payload ->
|
|
||||||
MagixEndpoint.magixJson.decodeFromJsonElement(payloadSerializer, payload)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun broadcast(message: MagixMessage<T>) {
|
|
||||||
this@specialize.broadcast(
|
|
||||||
message.replacePayload { payload ->
|
|
||||||
MagixEndpoint.magixJson.encodeToJsonElement(
|
|
||||||
payloadSerializer,
|
|
||||||
payload
|
|
||||||
)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
@ -0,0 +1,47 @@
|
|||||||
|
package ru.mipt.npm.magix.api
|
||||||
|
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.json.JsonElement
|
||||||
|
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
|
||||||
|
|
||||||
|
public data class MagixFormat<T>(
|
||||||
|
val serializer: KSerializer<T>,
|
||||||
|
val formats: Set<String>,
|
||||||
|
) {
|
||||||
|
val defaultFormat: String get() = formats.firstOrNull() ?: "magix"
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun <T> MagixEndpoint.subscribe(
|
||||||
|
format: MagixFormat<T>,
|
||||||
|
originFilter: Collection<String?>? = null,
|
||||||
|
targetFilter: Collection<String?>? = null,
|
||||||
|
): Flow<Pair<MagixMessage, T>> = subscribe(
|
||||||
|
MagixMessageFilter(format = format.formats, origin = originFilter, target = targetFilter)
|
||||||
|
).map {
|
||||||
|
val value: T = magixJson.decodeFromJsonElement(format.serializer, it.payload)
|
||||||
|
it to value
|
||||||
|
}
|
||||||
|
|
||||||
|
public suspend fun <T> MagixEndpoint.broadcast(
|
||||||
|
format: MagixFormat<T>,
|
||||||
|
payload: T,
|
||||||
|
target: String? = null,
|
||||||
|
id: String? = null,
|
||||||
|
parentId: String? = null,
|
||||||
|
user: JsonElement? = null,
|
||||||
|
origin: String = format.defaultFormat,
|
||||||
|
) {
|
||||||
|
val message = MagixMessage(
|
||||||
|
origin = origin,
|
||||||
|
payload = magixJson.encodeToJsonElement(format.serializer, payload),
|
||||||
|
format = format.defaultFormat,
|
||||||
|
target = target,
|
||||||
|
id = id,
|
||||||
|
parentId = parentId,
|
||||||
|
user = user
|
||||||
|
)
|
||||||
|
broadcast(message)
|
||||||
|
}
|
||||||
|
|
@ -24,19 +24,12 @@ import kotlinx.serialization.json.JsonElement
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@Serializable
|
@Serializable
|
||||||
public data class MagixMessage<T>(
|
public data class MagixMessage(
|
||||||
val origin: String,
|
val origin: String,
|
||||||
val payload: T,
|
val payload: JsonElement,
|
||||||
val format: String = origin,
|
val format: String = origin,
|
||||||
val target: String? = null,
|
val target: String? = null,
|
||||||
val id: String? = null,
|
val id: String? = null,
|
||||||
val parentId: String? = null,
|
val parentId: String? = null,
|
||||||
val user: JsonElement? = null,
|
val user: JsonElement? = null,
|
||||||
)
|
)
|
||||||
|
|
||||||
/**
|
|
||||||
* Create message with same field but replaced payload
|
|
||||||
*/
|
|
||||||
@Suppress("UNCHECKED_CAST")
|
|
||||||
public fun <T, R> MagixMessage<T>.replacePayload(payloadTransform: (T) -> R): MagixMessage<R> =
|
|
||||||
MagixMessage(origin, payloadTransform(payload), format, target, id, parentId, user)
|
|
@ -6,9 +6,9 @@ import kotlinx.serialization.Serializable
|
|||||||
|
|
||||||
@Serializable
|
@Serializable
|
||||||
public data class MagixMessageFilter(
|
public data class MagixMessageFilter(
|
||||||
val format: List<String?>? = null,
|
val format: Collection<String?>? = null,
|
||||||
val origin: List<String?>? = null,
|
val origin: Collection<String?>? = null,
|
||||||
val target: List<String?>? = null,
|
val target: Collection<String?>? = null,
|
||||||
) {
|
) {
|
||||||
public companion object {
|
public companion object {
|
||||||
public val ALL: MagixMessageFilter = MagixMessageFilter()
|
public val ALL: MagixMessageFilter = MagixMessageFilter()
|
||||||
@ -18,7 +18,7 @@ public data class MagixMessageFilter(
|
|||||||
/**
|
/**
|
||||||
* Filter a [Flow] of messages based on given filter
|
* Filter a [Flow] of messages based on given filter
|
||||||
*/
|
*/
|
||||||
public fun <T> Flow<MagixMessage<T>>.filter(filter: MagixMessageFilter): Flow<MagixMessage<T>> {
|
public fun Flow<MagixMessage>.filter(filter: MagixMessageFilter): Flow<MagixMessage> {
|
||||||
if (filter == MagixMessageFilter.ALL) {
|
if (filter == MagixMessageFilter.ALL) {
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
@ -4,20 +4,20 @@ import kotlinx.coroutines.CoroutineScope
|
|||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
|
import kotlinx.serialization.json.JsonElement
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Launch magix message converter service
|
* Launch magix message converter service
|
||||||
*/
|
*/
|
||||||
public fun <T, R> CoroutineScope.launchMagixConverter(
|
public fun <T, R> CoroutineScope.launchMagixConverter(
|
||||||
inputEndpoint: MagixEndpoint<T>,
|
endpoint: MagixEndpoint,
|
||||||
outputEndpoint: MagixEndpoint<R>,
|
|
||||||
filter: MagixMessageFilter,
|
filter: MagixMessageFilter,
|
||||||
outputFormat: String,
|
outputFormat: String,
|
||||||
newOrigin: String? = null,
|
newOrigin: String? = null,
|
||||||
transformer: suspend (T) -> R,
|
transformer: suspend (JsonElement) -> JsonElement,
|
||||||
): Job = inputEndpoint.subscribe(filter).onEach { message->
|
): Job = endpoint.subscribe(filter).onEach { message->
|
||||||
val newPayload = transformer(message.payload)
|
val newPayload = transformer(message.payload)
|
||||||
val transformed: MagixMessage<R> = MagixMessage(
|
val transformed: MagixMessage = MagixMessage(
|
||||||
newOrigin ?: message.origin,
|
newOrigin ?: message.origin,
|
||||||
newPayload,
|
newPayload,
|
||||||
outputFormat,
|
outputFormat,
|
||||||
@ -26,5 +26,5 @@ public fun <T, R> CoroutineScope.launchMagixConverter(
|
|||||||
message.parentId,
|
message.parentId,
|
||||||
message.user
|
message.user
|
||||||
)
|
)
|
||||||
outputEndpoint.broadcast(transformed)
|
endpoint.broadcast(transformed)
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
||||||
|
@ -15,7 +15,7 @@ import java.awt.Desktop
|
|||||||
import java.net.URI
|
import java.net.URI
|
||||||
|
|
||||||
|
|
||||||
suspend fun MagixEndpoint<JsonObject>.sendJson(
|
suspend fun MagixEndpoint.sendJson(
|
||||||
origin: String,
|
origin: String,
|
||||||
format: String = "json",
|
format: String = "json",
|
||||||
target: String? = null,
|
target: String? = null,
|
||||||
@ -44,11 +44,11 @@ suspend fun main(): Unit = coroutineScope {
|
|||||||
|
|
||||||
logger.info("Starting client")
|
logger.info("Starting client")
|
||||||
//Create zmq magix endpoint and wait for to finish
|
//Create zmq magix endpoint and wait for to finish
|
||||||
ZmqMagixEndpoint("tcp://localhost", JsonObject.serializer()).use { client ->
|
ZmqMagixEndpoint("tcp://localhost").use { client ->
|
||||||
logger.info("Starting subscription")
|
logger.info("Starting subscription")
|
||||||
client.subscribe().onEach {
|
client.subscribe().onEach {
|
||||||
println(it.payload)
|
println(it.payload)
|
||||||
if (it.payload["index"]?.jsonPrimitive?.int == numberOfMessages) {
|
if (it.payload.jsonObject["index"]?.jsonPrimitive?.int == numberOfMessages) {
|
||||||
logger.info("Index $numberOfMessages reached. Terminating")
|
logger.info("Index $numberOfMessages reached. Terminating")
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
@ -12,9 +12,9 @@ import java.util.concurrent.Flow;
|
|||||||
* @param <T>
|
* @param <T>
|
||||||
*/
|
*/
|
||||||
public interface MagixClient<T> {
|
public interface MagixClient<T> {
|
||||||
void broadcast(MagixMessage<T> msg) throws IOException;
|
void broadcast(MagixMessage msg) throws IOException;
|
||||||
|
|
||||||
Flow.Publisher<MagixMessage<T>> subscribe();
|
Flow.Publisher<MagixMessage> subscribe();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a magix endpoint client using RSocket with raw tcp connection
|
* Create a magix endpoint client using RSocket with raw tcp connection
|
||||||
@ -23,7 +23,7 @@ public interface MagixClient<T> {
|
|||||||
* @return the client
|
* @return the client
|
||||||
*/
|
*/
|
||||||
static MagixClient<JsonElement> rSocketTcp(String host, int port) {
|
static MagixClient<JsonElement> rSocketTcp(String host, int port) {
|
||||||
return ControlsMagixClient.Companion.rSocketTcp(host, port, JsonElement.Companion.serializer());
|
return ControlsMagixClient.Companion.rSocketTcp(host, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -31,7 +31,6 @@ public interface MagixClient<T> {
|
|||||||
* @param host host name of magix server event loop
|
* @param host host name of magix server event loop
|
||||||
* @param port port of magix server event loop
|
* @param port port of magix server event loop
|
||||||
* @param path
|
* @param path
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
static MagixClient<JsonElement> rSocketWs(String host, int port, String path) {
|
static MagixClient<JsonElement> rSocketWs(String host, int port, String path) {
|
||||||
return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path);
|
return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path);
|
||||||
|
@ -11,25 +11,24 @@ import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
|
|||||||
import java.util.concurrent.Flow
|
import java.util.concurrent.Flow
|
||||||
|
|
||||||
internal class ControlsMagixClient<T>(
|
internal class ControlsMagixClient<T>(
|
||||||
private val endpoint: MagixEndpoint<T>,
|
private val endpoint: MagixEndpoint,
|
||||||
private val filter: MagixMessageFilter,
|
private val filter: MagixMessageFilter,
|
||||||
) : MagixClient<T> {
|
) : MagixClient<T> {
|
||||||
|
|
||||||
override fun broadcast(msg: MagixMessage<T>): Unit = runBlocking {
|
override fun broadcast(msg: MagixMessage): Unit = runBlocking {
|
||||||
endpoint.broadcast(msg)
|
endpoint.broadcast(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun subscribe(): Flow.Publisher<MagixMessage<T>> = endpoint.subscribe(filter).asPublisher()
|
override fun subscribe(): Flow.Publisher<MagixMessage> = endpoint.subscribe(filter).asPublisher()
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
|
|
||||||
fun <T> rSocketTcp(
|
fun <T> rSocketTcp(
|
||||||
host: String,
|
host: String,
|
||||||
port: Int,
|
port: Int,
|
||||||
payloadSerializer: KSerializer<T>
|
|
||||||
): ControlsMagixClient<T> {
|
): ControlsMagixClient<T> {
|
||||||
val endpoint = runBlocking {
|
val endpoint = runBlocking {
|
||||||
MagixEndpoint.rSocketWithTcp(host, payloadSerializer, port)
|
MagixEndpoint.rSocketWithTcp(host, port)
|
||||||
}
|
}
|
||||||
return ControlsMagixClient(endpoint, MagixMessageFilter())
|
return ControlsMagixClient(endpoint, MagixMessageFilter())
|
||||||
}
|
}
|
||||||
@ -41,7 +40,7 @@ internal class ControlsMagixClient<T>(
|
|||||||
path: String = "/rsocket"
|
path: String = "/rsocket"
|
||||||
): ControlsMagixClient<T> {
|
): ControlsMagixClient<T> {
|
||||||
val endpoint = runBlocking {
|
val endpoint = runBlocking {
|
||||||
MagixEndpoint.rSocketWithWebSockets(host, payloadSerializer, port, path)
|
MagixEndpoint.rSocketWithWebSockets(host, port, path)
|
||||||
}
|
}
|
||||||
return ControlsMagixClient(endpoint, MagixMessageFilter())
|
return ControlsMagixClient(endpoint, MagixMessageFilter())
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,6 @@ import kotlinx.coroutines.flow.Flow
|
|||||||
import kotlinx.coroutines.flow.flowOn
|
import kotlinx.coroutines.flow.flowOn
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.serialization.KSerializer
|
|
||||||
import kotlinx.serialization.encodeToString
|
import kotlinx.serialization.encodeToString
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
@ -25,27 +24,24 @@ import ru.mipt.npm.magix.api.filter
|
|||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.coroutineContext
|
import kotlin.coroutines.coroutineContext
|
||||||
|
|
||||||
public class RSocketMagixEndpoint<T>(
|
public class RSocketMagixEndpoint(
|
||||||
payloadSerializer: KSerializer<T>,
|
|
||||||
private val rSocket: RSocket,
|
private val rSocket: RSocket,
|
||||||
private val coroutineContext: CoroutineContext,
|
private val coroutineContext: CoroutineContext,
|
||||||
) : MagixEndpoint<T> {
|
) : MagixEndpoint {
|
||||||
|
|
||||||
private val serializer = MagixMessage.serializer(payloadSerializer)
|
|
||||||
|
|
||||||
override fun subscribe(
|
override fun subscribe(
|
||||||
filter: MagixMessageFilter,
|
filter: MagixMessageFilter,
|
||||||
): Flow<MagixMessage<T>> {
|
): Flow<MagixMessage> {
|
||||||
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
|
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
|
||||||
val flow = rSocket.requestStream(payload)
|
val flow = rSocket.requestStream(payload)
|
||||||
return flow.map {
|
return flow.map {
|
||||||
MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText())
|
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
|
||||||
}.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
|
}.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun broadcast(message: MagixMessage<T>) {
|
override suspend fun broadcast(message: MagixMessage) {
|
||||||
withContext(coroutineContext) {
|
withContext(coroutineContext) {
|
||||||
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) }
|
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) }
|
||||||
rSocket.fireAndForget(payload)
|
rSocket.fireAndForget(payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -63,13 +59,12 @@ internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionCon
|
|||||||
/**
|
/**
|
||||||
* Build a websocket based endpoint connected to [host], [port] and given routing [path]
|
* Build a websocket based endpoint connected to [host], [port] and given routing [path]
|
||||||
*/
|
*/
|
||||||
public suspend fun <T> MagixEndpoint.Companion.rSocketWithWebSockets(
|
public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets(
|
||||||
host: String,
|
host: String,
|
||||||
payloadSerializer: KSerializer<T>,
|
|
||||||
port: Int = DEFAULT_MAGIX_HTTP_PORT,
|
port: Int = DEFAULT_MAGIX_HTTP_PORT,
|
||||||
path: String = "/rsocket",
|
path: String = "/rsocket",
|
||||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||||
): RSocketMagixEndpoint<T> {
|
): RSocketMagixEndpoint {
|
||||||
val client = HttpClient {
|
val client = HttpClient {
|
||||||
install(WebSockets)
|
install(WebSockets)
|
||||||
install(RSocketSupport) {
|
install(RSocketSupport) {
|
||||||
@ -84,5 +79,5 @@ public suspend fun <T> MagixEndpoint.Companion.rSocketWithWebSockets(
|
|||||||
client.close()
|
client.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext)
|
return RSocketMagixEndpoint(rSocket, coroutineContext)
|
||||||
}
|
}
|
@ -4,7 +4,6 @@ import io.ktor.network.sockets.SocketOptions
|
|||||||
import io.ktor.util.InternalAPI
|
import io.ktor.util.InternalAPI
|
||||||
import io.rsocket.kotlin.core.RSocketConnectorBuilder
|
import io.rsocket.kotlin.core.RSocketConnectorBuilder
|
||||||
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
|
import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport
|
||||||
import kotlinx.serialization.KSerializer
|
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import kotlin.coroutines.coroutineContext
|
import kotlin.coroutines.coroutineContext
|
||||||
|
|
||||||
@ -13,13 +12,12 @@ import kotlin.coroutines.coroutineContext
|
|||||||
* Create a plain TCP based [RSocketMagixEndpoint] connected to [host] and [port]
|
* Create a plain TCP based [RSocketMagixEndpoint] connected to [host] and [port]
|
||||||
*/
|
*/
|
||||||
@OptIn(InternalAPI::class)
|
@OptIn(InternalAPI::class)
|
||||||
public suspend fun <T> MagixEndpoint.Companion.rSocketWithTcp(
|
public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
|
||||||
host: String,
|
host: String,
|
||||||
payloadSerializer: KSerializer<T>,
|
|
||||||
port: Int = DEFAULT_MAGIX_RAW_PORT,
|
port: Int = DEFAULT_MAGIX_RAW_PORT,
|
||||||
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
||||||
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
|
||||||
): RSocketMagixEndpoint<T> {
|
): RSocketMagixEndpoint {
|
||||||
val transport = TcpClientTransport(
|
val transport = TcpClientTransport(
|
||||||
hostname = host,
|
hostname = host,
|
||||||
port = port,
|
port = port,
|
||||||
@ -27,5 +25,5 @@ public suspend fun <T> MagixEndpoint.Companion.rSocketWithTcp(
|
|||||||
)
|
)
|
||||||
val rSocket = buildConnector(rSocketConfig).connect(transport)
|
val rSocket = buildConnector(rSocketConfig).connect(transport)
|
||||||
|
|
||||||
return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext)
|
return RSocketMagixEndpoint(rSocket, coroutineContext)
|
||||||
}
|
}
|
||||||
|
@ -21,44 +21,39 @@ import io.rsocket.kotlin.payload.data
|
|||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.html.*
|
import kotlinx.html.*
|
||||||
import kotlinx.serialization.KSerializer
|
import kotlinx.serialization.decodeFromString
|
||||||
import kotlinx.serialization.json.JsonElement
|
import kotlinx.serialization.encodeToString
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
|
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
import ru.mipt.npm.magix.api.MagixMessageFilter
|
import ru.mipt.npm.magix.api.MagixMessageFilter
|
||||||
import ru.mipt.npm.magix.api.filter
|
import ru.mipt.npm.magix.api.filter
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
public typealias GenericMagixMessage = MagixMessage<JsonElement>
|
|
||||||
|
|
||||||
internal val genericMessageSerializer: KSerializer<MagixMessage<JsonElement>> =
|
internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<MagixMessage>) = ConnectionAcceptor {
|
||||||
MagixMessage.serializer(JsonElement.serializer())
|
|
||||||
|
|
||||||
|
|
||||||
internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMagixMessage>) = ConnectionAcceptor {
|
|
||||||
RSocketRequestHandler {
|
RSocketRequestHandler {
|
||||||
//handler for request/stream
|
//handler for request/stream
|
||||||
requestStream { request: Payload ->
|
requestStream { request: Payload ->
|
||||||
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
|
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
|
||||||
magixFlow.filter(filter).map { message ->
|
magixFlow.filter(filter).map { message ->
|
||||||
val string = magixJson.encodeToString(genericMessageSerializer, message)
|
val string = magixJson.encodeToString(message)
|
||||||
buildPayload { data(string) }
|
buildPayload { data(string) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fireAndForget { request: Payload ->
|
fireAndForget { request: Payload ->
|
||||||
val message = magixJson.decodeFromString(genericMessageSerializer, request.data.readText())
|
val message = magixJson.decodeFromString<MagixMessage>(request.data.readText())
|
||||||
magixFlow.emit(message)
|
magixFlow.emit(message)
|
||||||
}
|
}
|
||||||
// bi-directional connection
|
// bi-directional connection
|
||||||
requestChannel { request: Payload, input: Flow<Payload> ->
|
requestChannel { request: Payload, input: Flow<Payload> ->
|
||||||
input.onEach {
|
input.onEach {
|
||||||
magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText()))
|
magixFlow.emit(magixJson.decodeFromString(it.data.readText()))
|
||||||
}.launchIn(this@magixAcceptor)
|
}.launchIn(this@magixAcceptor)
|
||||||
|
|
||||||
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
|
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
|
||||||
|
|
||||||
magixFlow.filter(filter).map { message ->
|
magixFlow.filter(filter).map { message ->
|
||||||
val string = magixJson.encodeToString(genericMessageSerializer, message)
|
val string = magixJson.encodeToString(message)
|
||||||
buildPayload { data(string) }
|
buildPayload { data(string) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -86,7 +81,7 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
|
|||||||
/**
|
/**
|
||||||
* Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
|
* Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
|
||||||
*/
|
*/
|
||||||
public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMessage>, route: String = "/") {
|
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") {
|
||||||
if (pluginOrNull(WebSockets) == null) {
|
if (pluginOrNull(WebSockets) == null) {
|
||||||
install(WebSockets)
|
install(WebSockets)
|
||||||
}
|
}
|
||||||
@ -126,7 +121,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
|
|||||||
magixFlow.replayCache.forEach { message ->
|
magixFlow.replayCache.forEach { message ->
|
||||||
li {
|
li {
|
||||||
code {
|
code {
|
||||||
+magixJson.encodeToString(genericMessageSerializer, message)
|
+magixJson.encodeToString(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -138,14 +133,14 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
|
|||||||
get("sse") {
|
get("sse") {
|
||||||
val filter = call.buildFilter()
|
val filter = call.buildFilter()
|
||||||
val sseFlow = magixFlow.filter(filter).map {
|
val sseFlow = magixFlow.filter(filter).map {
|
||||||
val data = magixJson.encodeToString(genericMessageSerializer, it)
|
val data = magixJson.encodeToString(it)
|
||||||
val id = UUID.randomUUID()
|
val id = UUID.randomUUID()
|
||||||
SseEvent(data, id = id.toString(), event = "message")
|
SseEvent(data, id = id.toString(), event = "message")
|
||||||
}
|
}
|
||||||
call.respondSse(sseFlow)
|
call.respondSse(sseFlow)
|
||||||
}
|
}
|
||||||
post("broadcast") {
|
post("broadcast") {
|
||||||
val message = call.receive<GenericMagixMessage>()
|
val message = call.receive<MagixMessage>()
|
||||||
magixFlow.emit(message)
|
magixFlow.emit(message)
|
||||||
}
|
}
|
||||||
//rSocket server. Filter from Payload
|
//rSocket server. Filter from Payload
|
||||||
@ -158,6 +153,6 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
|
|||||||
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
|
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
|
||||||
*/
|
*/
|
||||||
public fun Application.magixModule(route: String = "/", buffer: Int = 100) {
|
public fun Application.magixModule(route: String = "/", buffer: Int = 100) {
|
||||||
val magixFlow = MutableSharedFlow<GenericMagixMessage>(buffer)
|
val magixFlow = MutableSharedFlow<MagixMessage>(buffer)
|
||||||
magixModule(magixFlow, route)
|
magixModule(magixFlow, route)
|
||||||
}
|
}
|
@ -14,12 +14,13 @@ import org.slf4j.LoggerFactory
|
|||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
|
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
|
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
|
||||||
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Raw TCP magix server
|
* Raw TCP magix server
|
||||||
*/
|
*/
|
||||||
public fun CoroutineScope.launchMagixServerRawRSocket(
|
public fun CoroutineScope.launchMagixServerRawRSocket(
|
||||||
magixFlow: MutableSharedFlow<GenericMagixMessage>,
|
magixFlow: MutableSharedFlow<MagixMessage>,
|
||||||
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
|
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
|
||||||
): TcpServer {
|
): TcpServer {
|
||||||
val tcpTransport = TcpServerTransport(port = rawSocketPort)
|
val tcpTransport = TcpServerTransport(port = rawSocketPort)
|
||||||
@ -41,10 +42,10 @@ public fun CoroutineScope.startMagixServer(
|
|||||||
buffer: Int = 100,
|
buffer: Int = 100,
|
||||||
enableRawRSocket: Boolean = true,
|
enableRawRSocket: Boolean = true,
|
||||||
enableZmq: Boolean = true,
|
enableZmq: Boolean = true,
|
||||||
applicationConfiguration: Application.(MutableSharedFlow<GenericMagixMessage>) -> Unit = {},
|
applicationConfiguration: Application.(MutableSharedFlow<MagixMessage>) -> Unit = {},
|
||||||
): ApplicationEngine {
|
): ApplicationEngine {
|
||||||
val logger = LoggerFactory.getLogger("magix-server")
|
val logger = LoggerFactory.getLogger("magix-server")
|
||||||
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
|
val magixFlow = MutableSharedFlow<MagixMessage>(
|
||||||
buffer,
|
buffer,
|
||||||
extraBufferCapacity = buffer
|
extraBufferCapacity = buffer
|
||||||
)
|
)
|
||||||
|
@ -4,13 +4,16 @@ import kotlinx.coroutines.*
|
|||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
|
import kotlinx.serialization.decodeFromString
|
||||||
|
import kotlinx.serialization.encodeToString
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.zeromq.SocketType
|
import org.zeromq.SocketType
|
||||||
import org.zeromq.ZContext
|
import org.zeromq.ZContext
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
|
|
||||||
public fun CoroutineScope.launchMagixServerZmqSocket(
|
public fun CoroutineScope.launchMagixServerZmqSocket(
|
||||||
magixFlow: MutableSharedFlow<GenericMagixMessage>,
|
magixFlow: MutableSharedFlow<MagixMessage>,
|
||||||
localHost: String = "tcp://*",
|
localHost: String = "tcp://*",
|
||||||
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
|
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
|
||||||
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
||||||
@ -22,7 +25,7 @@ public fun CoroutineScope.launchMagixServerZmqSocket(
|
|||||||
val pubSocket = context.createSocket(SocketType.PUB)
|
val pubSocket = context.createSocket(SocketType.PUB)
|
||||||
pubSocket.bind("$localHost:$zmqPubSocketPort")
|
pubSocket.bind("$localHost:$zmqPubSocketPort")
|
||||||
magixFlow.onEach { message ->
|
magixFlow.onEach { message ->
|
||||||
val string = MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message)
|
val string = MagixEndpoint.magixJson.encodeToString(message)
|
||||||
pubSocket.send(string)
|
pubSocket.send(string)
|
||||||
logger.debug("Published: $string")
|
logger.debug("Published: $string")
|
||||||
}.launchIn(this)
|
}.launchIn(this)
|
||||||
@ -36,7 +39,7 @@ public fun CoroutineScope.launchMagixServerZmqSocket(
|
|||||||
val string: String? = pullSocket.recvStr()
|
val string: String? = pullSocket.recvStr()
|
||||||
if (string != null) {
|
if (string != null) {
|
||||||
logger.debug("Received: $string")
|
logger.debug("Received: $string")
|
||||||
val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string)
|
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
|
||||||
magixFlow.emit(message)
|
magixFlow.emit(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,6 @@ import kotlinx.coroutines.CoroutineScope
|
|||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.serialization.encodeToString
|
import kotlinx.serialization.encodeToString
|
||||||
import kotlinx.serialization.json.JsonElement
|
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
import ru.mipt.npm.magix.api.MagixMessageFilter
|
import ru.mipt.npm.magix.api.MagixMessageFilter
|
||||||
@ -13,7 +12,7 @@ import ru.mipt.npm.magix.api.MagixMessageFilter
|
|||||||
public class XodusMagixStorage(
|
public class XodusMagixStorage(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
private val store: PersistentEntityStore,
|
private val store: PersistentEntityStore,
|
||||||
endpoint: MagixEndpoint<JsonElement>,
|
endpoint: MagixEndpoint,
|
||||||
filter: MagixMessageFilter = MagixMessageFilter(),
|
filter: MagixMessageFilter = MagixMessageFilter(),
|
||||||
) : AutoCloseable {
|
) : AutoCloseable {
|
||||||
|
|
||||||
@ -21,22 +20,22 @@ public class XodusMagixStorage(
|
|||||||
private val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
|
private val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
|
||||||
store.executeInTransaction { transaction ->
|
store.executeInTransaction { transaction ->
|
||||||
transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply {
|
transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply {
|
||||||
setProperty(MagixMessage<*>::origin.name, message.origin)
|
setProperty(MagixMessage::origin.name, message.origin)
|
||||||
setProperty(MagixMessage<*>::format.name, message.format)
|
setProperty(MagixMessage::format.name, message.format)
|
||||||
|
|
||||||
setBlobString(MagixMessage<*>::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload))
|
setBlobString(MagixMessage::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload))
|
||||||
|
|
||||||
message.target?.let {
|
message.target?.let {
|
||||||
setProperty(MagixMessage<*>::target.name, it)
|
setProperty(MagixMessage::target.name, it)
|
||||||
}
|
}
|
||||||
message.id?.let {
|
message.id?.let {
|
||||||
setProperty(MagixMessage<*>::id.name, it)
|
setProperty(MagixMessage::id.name, it)
|
||||||
}
|
}
|
||||||
message.parentId?.let {
|
message.parentId?.let {
|
||||||
setProperty(MagixMessage<*>::parentId.name, it)
|
setProperty(MagixMessage::parentId.name, it)
|
||||||
}
|
}
|
||||||
message.user?.let {
|
message.user?.let {
|
||||||
setBlobString(MagixMessage<*>::user.name, MagixEndpoint.magixJson.encodeToString(it))
|
setBlobString(MagixMessage::user.name, MagixEndpoint.magixJson.encodeToString(it))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,8 @@ import kotlinx.coroutines.*
|
|||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.channelFlow
|
import kotlinx.coroutines.flow.channelFlow
|
||||||
import kotlinx.coroutines.flow.flowOn
|
import kotlinx.coroutines.flow.flowOn
|
||||||
import kotlinx.serialization.KSerializer
|
import kotlinx.serialization.decodeFromString
|
||||||
|
import kotlinx.serialization.encodeToString
|
||||||
import org.zeromq.SocketType
|
import org.zeromq.SocketType
|
||||||
import org.zeromq.ZContext
|
import org.zeromq.ZContext
|
||||||
import org.zeromq.ZMQ
|
import org.zeromq.ZMQ
|
||||||
@ -16,19 +17,16 @@ import ru.mipt.npm.magix.api.filter
|
|||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.coroutineContext
|
import kotlin.coroutines.coroutineContext
|
||||||
|
|
||||||
public class ZmqMagixEndpoint<T>(
|
public class ZmqMagixEndpoint(
|
||||||
private val host: String,
|
private val host: String,
|
||||||
payloadSerializer: KSerializer<T>,
|
|
||||||
private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
|
private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
|
||||||
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
||||||
private val coroutineContext: CoroutineContext = Dispatchers.IO,
|
private val coroutineContext: CoroutineContext = Dispatchers.IO,
|
||||||
) : MagixEndpoint<T>, AutoCloseable {
|
) : MagixEndpoint, AutoCloseable {
|
||||||
private val zmqContext by lazy { ZContext() }
|
private val zmqContext by lazy { ZContext() }
|
||||||
|
|
||||||
private val serializer = MagixMessage.serializer(payloadSerializer)
|
|
||||||
|
|
||||||
@OptIn(ExperimentalCoroutinesApi::class)
|
@OptIn(ExperimentalCoroutinesApi::class)
|
||||||
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage<T>> {
|
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> {
|
||||||
val socket = zmqContext.createSocket(SocketType.SUB)
|
val socket = zmqContext.createSocket(SocketType.SUB)
|
||||||
socket.connect("$host:$pubPort")
|
socket.connect("$host:$pubPort")
|
||||||
socket.subscribe("")
|
socket.subscribe("")
|
||||||
@ -42,7 +40,7 @@ public class ZmqMagixEndpoint<T>(
|
|||||||
//This is a blocking call.
|
//This is a blocking call.
|
||||||
val string: String? = socket.recvStr()
|
val string: String? = socket.recvStr()
|
||||||
if (string != null) {
|
if (string != null) {
|
||||||
val message = MagixEndpoint.magixJson.decodeFromString(serializer, string)
|
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string)
|
||||||
send(message)
|
send(message)
|
||||||
}
|
}
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
@ -64,8 +62,8 @@ public class ZmqMagixEndpoint<T>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun broadcast(message: MagixMessage<T>): Unit = withContext(coroutineContext) {
|
override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) {
|
||||||
val string = MagixEndpoint.magixJson.encodeToString(serializer, message)
|
val string = MagixEndpoint.magixJson.encodeToString(message)
|
||||||
publishSocket.send(string)
|
publishSocket.send(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,12 +74,10 @@ public class ZmqMagixEndpoint<T>(
|
|||||||
|
|
||||||
public suspend fun <T> MagixEndpoint.Companion.zmq(
|
public suspend fun <T> MagixEndpoint.Companion.zmq(
|
||||||
host: String,
|
host: String,
|
||||||
payloadSerializer: KSerializer<T>,
|
|
||||||
pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
|
pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
|
||||||
pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
||||||
): ZmqMagixEndpoint<T> = ZmqMagixEndpoint(
|
): ZmqMagixEndpoint = ZmqMagixEndpoint(
|
||||||
host,
|
host,
|
||||||
payloadSerializer,
|
|
||||||
pubPort,
|
pubPort,
|
||||||
pullPort,
|
pullPort,
|
||||||
coroutineContext = coroutineContext
|
coroutineContext = coroutineContext
|
||||||
|
Loading…
Reference in New Issue
Block a user