Add benchmark demo. Fix some issues with RSocket

This commit is contained in:
Alexander Nozik 2022-06-05 19:06:50 +03:00
parent 025a444db8
commit cfaeb964e7
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
20 changed files with 237 additions and 34 deletions

View File

@ -62,6 +62,7 @@ public class TcpPort private constructor(
futureChannel.await().write(ByteBuffer.wrap(data)) futureChannel.await().write(ByteBuffer.wrap(data))
} }
@OptIn(ExperimentalCoroutinesApi::class)
override fun close() { override fun close() {
listenerJob.cancel() listenerJob.cancel()
if(futureChannel.isCompleted){ if(futureChannel.isCompleted){

View File

@ -0,0 +1,32 @@
plugins {
kotlin("jvm")
application
}
repositories {
mavenCentral()
maven("https://repo.kotlin.link")
}
val ktorVersion: String by rootProject.extra
val rsocketVersion: String by rootProject.extra
dependencies {
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket)
implementation(projects.magix.magixZmq)
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("ch.qos.logback:logback-classic:1.2.11")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
kotlinOptions {
jvmTarget = "11"
freeCompilerArgs = freeCompilerArgs + listOf("-Xjvm-default=all", "-Xopt-in=kotlin.RequiresOptIn")
}
}
application {
mainClass.set("ru.mipt.npm.controls.demo.echo.MainKt")
}

View File

@ -0,0 +1,85 @@
package ru.mipt.npm.controls.demo.echo
import io.ktor.server.application.log
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.json.JsonObject
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.rsocket.rSocketStreamWithTcp
import ru.mipt.npm.magix.server.startMagixServer
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) {
val complete = CompletableDeferred<Boolean>()
val responseIds = HashSet<String>()
scope.launch {
subscribe(
MagixMessageFilter(
origin = listOf("loop")
)
).collect { message ->
if (message.id?.endsWith(".response") == true) {
responseIds.add(message.parentId!!)
}
val parentId = message.parentId
if (parentId != null && parentId.toInt() >= n - 1) {
println("Losses ${(1 - responseIds.size.toDouble() / n) * 100}%")
complete.complete(true)
cancel()
}
}
}
scope.launch {
repeat(n) {
if (it % 20 == 0) delay(1)
broadcast(
MagixMessage(
format = "test",
payload = JsonObject(emptyMap()),
origin = "test",
target = "loop",
id = it.toString()
)
)
}
}
complete.await()
println("completed")
}
@OptIn(ExperimentalTime::class)
suspend fun main(): Unit = coroutineScope {
launch(Dispatchers.Default) {
val server = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
//echo each message
flow.onEach { message ->
if (message.parentId == null) {
val m = message.copy(origin = "loop", parentId = message.id, id = message.id + ".response")
log.info("echo: $m")
flow.emit(m)
}
}.launchIn(this)
}
val responseTime = measureTime {
MagixEndpoint.rSocketStreamWithTcp("localhost").use {
it.collectEcho(this, 5000)
}
}
println(responseTime)
server.stop(500, 500)
cancel()
}
}

View File

@ -44,7 +44,7 @@ suspend fun main(): Unit = coroutineScope {
logger.info("Starting client") logger.info("Starting client")
//Create zmq magix endpoint and wait for to finish //Create zmq magix endpoint and wait for to finish
ZmqMagixEndpoint("tcp://localhost").use { client -> ZmqMagixEndpoint("localhost","tcp").use { client ->
logger.info("Starting subscription") logger.info("Starting subscription")
client.subscribe().onEach { client.subscribe().onEach {
println(it.payload) println(it.payload)

View File

@ -2,6 +2,7 @@ package ru.mipt.npm.magix.rsocket
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.plugins.websocket.WebSockets import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.utils.io.core.Closeable
import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.core.RSocketConnectorBuilder
@ -9,13 +10,10 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport
import io.rsocket.kotlin.ktor.client.rSocket import io.rsocket.kotlin.ktor.client.rSocket
import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessage
@ -27,7 +25,7 @@ import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint( public class RSocketMagixEndpoint(
private val rSocket: RSocket, private val rSocket: RSocket,
private val coroutineContext: CoroutineContext, private val coroutineContext: CoroutineContext,
) : MagixEndpoint { ) : MagixEndpoint, Closeable {
override fun subscribe( override fun subscribe(
filter: MagixMessageFilter, filter: MagixMessageFilter,
@ -39,11 +37,15 @@ public class RSocketMagixEndpoint(
}.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
} }
override suspend fun broadcast(message: MagixMessage) { override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) {
withContext(coroutineContext) { val payload = buildPayload {
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) } data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message))
rSocket.fireAndForget(payload)
} }
rSocket.fireAndForget(payload)
}
override fun close() {
rSocket.cancel()
} }
public companion object public companion object

View File

@ -0,0 +1,57 @@
package ru.mipt.npm.magix.rsocket
import io.ktor.utils.io.core.Closeable
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import kotlin.coroutines.CoroutineContext
/**
* RSocket endpoint based on established channel
*/
public class RSocketStreamMagixEndpoint(
private val rSocket: RSocket,
private val coroutineContext: CoroutineContext,
) : MagixEndpoint, Closeable {
private val output: MutableSharedFlow<MagixMessage> = MutableSharedFlow()
private val input: Flow<Payload> by lazy {
rSocket.requestChannel(
Payload.Empty,
output.map { message ->
buildPayload {
data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message))
}
}.flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
)
}
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> {
return input.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())
}.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined)
}
override suspend fun broadcast(message: MagixMessage): Unit {
output.emit(message)
}
override fun close() {
rSocket.cancel()
}
}

View File

@ -25,3 +25,20 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp(
return RSocketMagixEndpoint(rSocket, coroutineContext) return RSocketMagixEndpoint(rSocket, coroutineContext)
} }
public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp(
host: String,
port: Int = DEFAULT_MAGIX_RAW_PORT,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketStreamMagixEndpoint {
val transport = TcpClientTransport(
hostname = host,
port = port,
configure = tcpConfig
)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketStreamMagixEndpoint(rSocket, coroutineContext)
}

View File

@ -5,10 +5,7 @@ import io.ktor.server.application.*
import io.ktor.server.html.respondHtml import io.ktor.server.html.respondHtml
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.request.receive import io.ktor.server.request.receive
import io.ktor.server.routing.get import io.ktor.server.routing.*
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import io.ktor.server.routing.routing
import io.ktor.server.util.getValue import io.ktor.server.util.getValue
import io.ktor.server.websocket.WebSockets import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.ConnectionAcceptor
@ -21,7 +18,6 @@ import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.html.* import kotlinx.html.*
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessage
@ -30,27 +26,36 @@ import ru.mipt.npm.magix.api.filter
import java.util.* import java.util.*
internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<MagixMessage>) = ConnectionAcceptor { internal fun CoroutineScope.magixAcceptor(
RSocketRequestHandler { magixFlow: MutableSharedFlow<MagixMessage>,
): ConnectionAcceptor = ConnectionAcceptor {
RSocketRequestHandler(coroutineContext) {
//handler for request/stream //handler for request/stream
requestStream { request: Payload -> requestStream { request: Payload ->
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message -> magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(message) val string = magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) } buildPayload { data(string) }
} }
} }
//single send
fireAndForget { request: Payload -> fireAndForget { request: Payload ->
val message = magixJson.decodeFromString<MagixMessage>(request.data.readText()) val message = magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
magixFlow.emit(message) magixFlow.emit(message)
} }
// bi-directional connection // bi-directional connection
requestChannel { request: Payload, input: Flow<Payload> -> requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach { input.onEach {
magixFlow.emit(magixJson.decodeFromString(it.data.readText())) magixFlow.emit(magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()))
}.launchIn(this@magixAcceptor) }.launchIn(this)
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) val filterText = request.data.readText()
val filter = if(filterText.isNotBlank()){
magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)
} else {
MagixMessageFilter()
}
magixFlow.filter(filter).map { message -> magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(message) val string = magixJson.encodeToString(message)
@ -144,7 +149,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
magixFlow.emit(message) magixFlow.emit(message)
} }
//rSocket server. Filter from Payload //rSocket server. Filter from Payload
rSocket("rsocket", acceptor = this@magixModule.magixAcceptor(magixFlow)) rSocket("rsocket", acceptor = application.magixAcceptor(magixFlow))
} }
} }
} }

View File

@ -9,6 +9,7 @@ import io.rsocket.kotlin.transport.ktor.tcp.TcpServer
import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport import io.rsocket.kotlin.transport.ktor.tcp.TcpServerTransport
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint
@ -39,15 +40,16 @@ public fun CoroutineScope.launchMagixServerRawRSocket(
*/ */
public fun CoroutineScope.startMagixServer( public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_HTTP_PORT, port: Int = DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 100, buffer: Int = 1000,
enableRawRSocket: Boolean = true, enableRawRSocket: Boolean = true,
enableZmq: Boolean = true, enableZmq: Boolean = true,
applicationConfiguration: Application.(MutableSharedFlow<MagixMessage>) -> Unit = {}, applicationConfiguration: Application.(MutableSharedFlow<MagixMessage>) -> Unit = {},
): ApplicationEngine { ): ApplicationEngine {
val logger = LoggerFactory.getLogger("magix-server") val logger = LoggerFactory.getLogger("magix-server")
val magixFlow = MutableSharedFlow<MagixMessage>( val magixFlow = MutableSharedFlow<MagixMessage>(
buffer, replay = buffer,
extraBufferCapacity = buffer extraBufferCapacity = buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
) )
if (enableRawRSocket) { if (enableRawRSocket) {

View File

@ -4,8 +4,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import org.zeromq.SocketType import org.zeromq.SocketType
import org.zeromq.ZContext import org.zeromq.ZContext
import org.zeromq.ZMQ import org.zeromq.ZMQ
@ -19,6 +17,7 @@ import kotlin.coroutines.coroutineContext
public class ZmqMagixEndpoint( public class ZmqMagixEndpoint(
private val host: String, private val host: String,
private val protocol: String,
private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
private val coroutineContext: CoroutineContext = Dispatchers.IO, private val coroutineContext: CoroutineContext = Dispatchers.IO,
@ -28,7 +27,7 @@ public class ZmqMagixEndpoint(
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> { override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> {
val socket = zmqContext.createSocket(SocketType.SUB) val socket = zmqContext.createSocket(SocketType.SUB)
socket.connect("$host:$pubPort") socket.connect("$protocol://$host:$pubPort")
socket.subscribe("") socket.subscribe("")
return channelFlow { return channelFlow {
@ -40,7 +39,7 @@ public class ZmqMagixEndpoint(
//This is a blocking call. //This is a blocking call.
val string: String? = socket.recvStr() val string: String? = socket.recvStr()
if (string != null) { if (string != null) {
val message = MagixEndpoint.magixJson.decodeFromString<MagixMessage>(string) val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), string)
send(message) send(message)
} }
} catch (t: Throwable) { } catch (t: Throwable) {
@ -58,12 +57,12 @@ public class ZmqMagixEndpoint(
private val publishSocket by lazy { private val publishSocket by lazy {
zmqContext.createSocket(SocketType.PUSH).apply { zmqContext.createSocket(SocketType.PUSH).apply {
connect("$host:$pullPort") connect("$protocol://$host:$pullPort")
} }
} }
override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) {
val string = MagixEndpoint.magixJson.encodeToString(message) val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
publishSocket.send(string) publishSocket.send(string)
} }
@ -72,12 +71,14 @@ public class ZmqMagixEndpoint(
} }
} }
public suspend fun <T> MagixEndpoint.Companion.zmq( public suspend fun MagixEndpoint.Companion.zmq(
host: String, host: String,
protocol: String = "tcp",
pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT, pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT,
): ZmqMagixEndpoint = ZmqMagixEndpoint( ): ZmqMagixEndpoint = ZmqMagixEndpoint(
host, host,
protocol,
pubPort, pubPort,
pullPort, pullPort,
coroutineContext = coroutineContext coroutineContext = coroutineContext

View File

@ -57,7 +57,8 @@ include(
// ":magix:magix-storage", // ":magix:magix-storage",
":magix:magix-storage:magix-storage-xodus", ":magix:magix-storage:magix-storage-xodus",
":controls-magix-client", ":controls-magix-client",
":motors",
":demo:all-things", ":demo:all-things",
":demo:car", ":demo:car",
":demo:motors",
":demo:echo"
) )