Move to stand-alone sse

This commit is contained in:
Alexander Nozik 2020-10-21 23:16:15 +03:00
parent 599d08b62a
commit 8c8d53b187
12 changed files with 27 additions and 306 deletions

View File

@ -18,10 +18,5 @@ kotlin {
api("hep.dataforge:dataforge-io:$dataforgeVersion")
}
}
jvmTest{
dependencies{
api("io.ktor:ktor-network:$ktorVersion")
}
}
}
}

View File

@ -1,26 +0,0 @@
package hep.dataforge.control.server
import hep.dataforge.control.sse.SseEvent
import hep.dataforge.control.sse.writeSseFlow
import io.ktor.application.ApplicationCall
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
/**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
* and serializing them in a way that is compatible with the Server-Sent Events specification.
*
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
*/
@OptIn(KtorExperimentalAPI::class)
public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
}
}

View File

@ -17,12 +17,6 @@ kotlin {
api("io.ktor:ktor-network:$ktorVersion")
}
}
jvmTest{
dependencies{
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("ch.qos.logback:logback-classic:1.2.3")
}
}
}
}

View File

@ -1,60 +0,0 @@
package hep.dataforge.control.sse
import io.ktor.utils.io.*
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
/**
* The data class representing a SSE Event that will be sent to the client.
*/
public data class SseEvent(val data: String, val event: String? = null, val id: String? = null)
public suspend fun ByteWriteChannel.writeSseFlow(events: Flow<SseEvent>): Unit = events.collect { event ->
if (event.id != null) {
writeStringUtf8("id: ${event.id}\n")
}
if (event.event != null) {
writeStringUtf8("event: ${event.event}\n")
}
for (dataLine in event.data.lines()) {
writeStringUtf8("data: $dataLine\n")
}
writeStringUtf8("\n")
flush()
}
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun ByteReadChannel.readSseFlow(): Flow<SseEvent> = channelFlow {
while (isActive) {
//val lines = ArrayList<String>()
val builder = StringBuilder()
var id: String? = null
var event: String? = null
//read lines until blank line or the end of stream
do{
val line = readUTF8Line()
if (line != null && line.isNotBlank()) {
val key = line.substringBefore(":")
val value = line.substringAfter(": ")
when (key) {
"id" -> id = value
"event" -> event = value
"data" -> builder.append(value)
else -> error("Unrecognized event-stream key $key")
}
}
} while (line?.isBlank() != true)
if(builder.isNotBlank()) {
send(SseEvent(builder.toString(), event, id))
}
}
awaitClose {
this@readSseFlow.cancel()
}
}

View File

@ -1,102 +0,0 @@
package hep.dataforge.control.ports
import hep.dataforge.context.Global
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.cio.write
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import org.junit.jupiter.api.Test
import java.net.InetSocketAddress
@OptIn(KtorExperimentalAPI::class)
fun CoroutineScope.launchEchoServer(port: Int): Job = launch {
val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port))
println("Started echo telnet server at ${server.localAddress}")
while (isActive) {
val socket = try {
server.accept()
} catch (ex: Exception) {
server.close()
return@launch
}
launch {
println("Socket accepted: ${socket.remoteAddress}")
try {
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
while (isActive) {
val line = input.readUTF8Line()
//println("${socket.remoteAddress}: $line")
output.write("[response] $line")
}
} catch (ex: Exception) {
cancel()
} finally {
socket.close()
}
}
}
}
class TcpPortTest {
@Test
fun testWithEchoServer() {
try {
runBlocking {
val server = launchEchoServer(22188)
val port = TcpPort.open(Global, "localhost", 22188)
val logJob = launch {
port.receiving().collect {
println("Flow: ${it.decodeToString()}")
}
}
port.startJob.join()
port.send("aaa\n")
port.send("ddd\n")
delay(200)
cancel()
}
} catch (ex: Exception) {
if (ex !is CancellationException) throw ex
}
}
@Test
fun testKtorWithEchoServer() {
try {
runBlocking {
val server = launchEchoServer(22188)
val port = KtorTcpPort.open(Global,"localhost", 22188)
val logJob = launch {
port.receiving().collect {
println("Flow: ${it.decodeToString()}")
}
}
port.send("aaa\n")
port.send("ddd\n")
delay(200)
cancel()
}
} catch (ex: Exception) {
if (ex !is CancellationException) throw ex
}
}
}

View File

@ -1,76 +0,0 @@
package hep.dataforge.control.sse
import io.ktor.application.ApplicationCall
import io.ktor.application.call
import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.HttpStatement
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.routing.get
import io.ktor.routing.routing
import io.ktor.server.cio.CIO
import io.ktor.server.engine.embeddedServer
import io.ktor.util.KtorExperimentalAPI
import io.ktor.utils.io.ByteReadChannel
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import org.junit.jupiter.api.Test
@OptIn(KtorExperimentalAPI::class)
suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
}
}
suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Job = launch {
get<HttpStatement>(address).execute { response: HttpResponse ->
// Response is not downloaded here.
val channel = response.receive<ByteReadChannel>()
val flow = channel.readSseFlow()
flow.collect(block)
}
}
class SseTest {
@OptIn(KtorExperimentalAPI::class)
@Test
fun testSseIntegration() {
runBlocking(Dispatchers.Default) {
val server = embeddedServer(CIO, 12080) {
routing {
get("/") {
val flow = flow {
repeat(5) {
delay(300)
emit(it)
}
}.map {
SseEvent(data = it.toString(), id = it.toString())
}
call.respondSse(flow)
}
}
}
server.start(wait = false)
delay(1000)
val client = HttpClient(io.ktor.client.engine.cio.CIO)
client.readSse("http://localhost:12080") {
println(it)
}
delay(2000)
println("Closing the client after waiting")
client.close()
server.stop(1000, 1000)
}
}
}

View File

@ -5,6 +5,10 @@ plugins {
val ktorVersion: String by rootProject.extra
repositories{
maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
}
kotlin {
sourceSets {
commonMain {
@ -12,11 +16,12 @@ kotlin {
implementation(project(":dataforge-device-core"))
implementation(project(":dataforge-device-tcp"))
implementation("io.ktor:ktor-client-core:$ktorVersion")
implementation("ru.mipt.npm:ktor-client-sse:0.1.0")
}
}
jvmMain {
dependencies {
implementation("io.ktor:ktor-client-cio:$ktorVersion")
}
}
jsMain {

View File

@ -3,38 +3,23 @@ package hep.dataforge.control.client
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.control.sse.SseEvent
import hep.dataforge.control.sse.readSseFlow
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap
import io.ktor.client.HttpClient
import io.ktor.client.call.receive
import io.ktor.client.request.get
import io.ktor.client.request.post
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.HttpStatement
import io.ktor.http.ContentType
import io.ktor.http.Url
import io.ktor.http.contentType
import io.ktor.utils.io.ByteReadChannel
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.serialization.json.*
import ru.mipt.npm.ktor.sse.readSse
import kotlin.coroutines.CoroutineContext
private suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Job = launch {
get<HttpStatement>(address).execute { response: HttpResponse ->
// Response is not downloaded here.
val channel = response.receive<ByteReadChannel>()
val flow = channel.readSseFlow()
flow.collect(block)
}
}
/*
{
"id":"string|number[optional, but desired]",
@ -53,17 +38,18 @@ private suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent
public class MagixClient(
private val manager: DeviceManager,
private val postUrl: Url,
private val sseUrl: Url
private val sseUrl: Url,
//private val inbox: Flow<JsonObject>
): CoroutineScope {
) : CoroutineScope {
override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job])
override val coroutineContext: CoroutineContext =
manager.context.coroutineContext + Job(manager.context.coroutineContext[Job])
private val client = HttpClient()
protected fun generateId(message: DeviceMessage, requestId: String?): String = if(requestId != null){
protected fun generateId(message: DeviceMessage, requestId: String?): String = if (requestId != null) {
"$requestId.response"
} else{
} else {
"df[${message.hashCode()}"
}
@ -95,18 +81,18 @@ public class MagixClient(
}
private val respondJob = launch {
client.readSse(sseUrl.toString()){
client.readSse(sseUrl.toString()) {
val json = Json.parseToJsonElement(it.data) as JsonObject
val requestId = json["id"]?.jsonPrimitive?.content
val payload = json["payload"]?.jsonObject
//TODO analyze action
if(payload != null){
if (payload != null) {
val meta = payload.toMeta()
val request = DeviceMessage.wrap(meta)
val response = manager.respondMessage(request)
send(wrapMessage(response,requestId))
send(wrapMessage(response, requestId))
} else {
TODO("process heartbeat and other system messages")
}

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -3,10 +3,15 @@ import ru.mipt.npm.gradle.useFx
plugins {
id("ru.mipt.npm.jvm")
id("ru.mipt.npm.publish")
application
}
//TODO to be moved to a separate project
application{
mainClassName = "ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt"
}
kotlin{
explicitApi = null
useFx(ru.mipt.npm.gradle.FXModule.CONTROLS, configuration = ru.mipt.npm.gradle.DependencyConfiguration.IMPLEMENTATION)

View File

@ -111,7 +111,7 @@ class PiMotionMasterView : View() {
action {
if (!debugServerStarted.get()) {
debugServerJobProperty.value =
controller.context.launchPiDebugServer(port.get(), listOf("1", "2"))
controller.context.launchPiDebugServer(port.get(), listOf("1", "2", "3", "4"))
} else {
debugServerJobProperty.get().cancel()
debugServerJobProperty.value = null

View File

@ -3,10 +3,10 @@ pluginManagement {
val toolsVersion = "0.6.3-dev-1.4.20-M1"
repositories {
//mavenLocal()
mavenLocal()
jcenter()
gradlePluginPortal()
maven("https://kotlin.bintray.com/kotlinx")
//maven("https://kotlin.bintray.com/kotlinx")
maven("https://dl.bintray.com/kotlin/kotlin-eap")
maven("https://dl.bintray.com/mipt-npm/dataforge")
maven("https://dl.bintray.com/mipt-npm/kscience")
@ -32,7 +32,7 @@ include(
":dataforge-device-serial",
":dataforge-device-server",
":dataforge-magix-client",
":demo",
// ":demo",
":motors"
)