From 8c8d53b1875aef72b65e221d2c774a860c0fbc4e Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 21 Oct 2020 23:16:15 +0300 Subject: [PATCH] Move to stand-alone sse --- dataforge-device-core/build.gradle.kts | 5 - .../dataforge/control/server/sseOnServer.kt | 26 ----- dataforge-device-tcp/build.gradle.kts | 8 +- .../hep/dataforge/control/sse/SseEvent.kt | 60 ----------- .../dataforge/control/ports/TcpPortTest.kt | 102 ------------------ .../hep/dataforge/control/sse/SseTest.kt | 76 ------------- dataforge-magix-client/build.gradle.kts | 7 +- .../dataforge/control/client/MagixClient.kt | 34 ++---- gradle/wrapper/gradle-wrapper.properties | 2 +- motors/build.gradle.kts | 5 + .../pimotionmaster/PiMotionMasterApp.kt | 2 +- settings.gradle.kts | 6 +- 12 files changed, 27 insertions(+), 306 deletions(-) delete mode 100644 dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt delete mode 100644 dataforge-device-tcp/src/commonMain/kotlin/hep/dataforge/control/sse/SseEvent.kt delete mode 100644 dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt delete mode 100644 dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/sse/SseTest.kt diff --git a/dataforge-device-core/build.gradle.kts b/dataforge-device-core/build.gradle.kts index 55e897b..6ad2390 100644 --- a/dataforge-device-core/build.gradle.kts +++ b/dataforge-device-core/build.gradle.kts @@ -18,10 +18,5 @@ kotlin { api("hep.dataforge:dataforge-io:$dataforgeVersion") } } - jvmTest{ - dependencies{ - api("io.ktor:ktor-network:$ktorVersion") - } - } } } \ No newline at end of file diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt deleted file mode 100644 index 0fb30ae..0000000 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/sseOnServer.kt +++ /dev/null @@ -1,26 +0,0 @@ -package hep.dataforge.control.server - -import hep.dataforge.control.sse.SseEvent -import hep.dataforge.control.sse.writeSseFlow -import io.ktor.application.ApplicationCall -import io.ktor.http.CacheControl -import io.ktor.http.ContentType -import io.ktor.response.cacheControl -import io.ktor.response.respondBytesWriter -import io.ktor.util.KtorExperimentalAPI -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.flow.Flow - -/** - * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel] - * and serializing them in a way that is compatible with the Server-Sent Events specification. - * - * You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/ - */ -@OptIn(KtorExperimentalAPI::class) -public suspend fun ApplicationCall.respondSse(events: Flow) { - response.cacheControl(CacheControl.NoCache(null)) - respondBytesWriter(contentType = ContentType.Text.EventStream) { - writeSseFlow(events) - } -} \ No newline at end of file diff --git a/dataforge-device-tcp/build.gradle.kts b/dataforge-device-tcp/build.gradle.kts index 0312ed2..b1e74ec 100644 --- a/dataforge-device-tcp/build.gradle.kts +++ b/dataforge-device-tcp/build.gradle.kts @@ -17,12 +17,6 @@ kotlin { api("io.ktor:ktor-network:$ktorVersion") } } - jvmTest{ - dependencies{ - implementation("io.ktor:ktor-server-cio:$ktorVersion") - implementation("io.ktor:ktor-client-cio:$ktorVersion") - implementation("ch.qos.logback:logback-classic:1.2.3") - } - } + } } \ No newline at end of file diff --git a/dataforge-device-tcp/src/commonMain/kotlin/hep/dataforge/control/sse/SseEvent.kt b/dataforge-device-tcp/src/commonMain/kotlin/hep/dataforge/control/sse/SseEvent.kt deleted file mode 100644 index 3dd9f79..0000000 --- a/dataforge-device-tcp/src/commonMain/kotlin/hep/dataforge/control/sse/SseEvent.kt +++ /dev/null @@ -1,60 +0,0 @@ -package hep.dataforge.control.sse - -import io.ktor.utils.io.* -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.isActive - - -/** - * The data class representing a SSE Event that will be sent to the client. - */ -public data class SseEvent(val data: String, val event: String? = null, val id: String? = null) - -public suspend fun ByteWriteChannel.writeSseFlow(events: Flow): Unit = events.collect { event -> - if (event.id != null) { - writeStringUtf8("id: ${event.id}\n") - } - if (event.event != null) { - writeStringUtf8("event: ${event.event}\n") - } - for (dataLine in event.data.lines()) { - writeStringUtf8("data: $dataLine\n") - } - writeStringUtf8("\n") - flush() -} - -@OptIn(ExperimentalCoroutinesApi::class) -public suspend fun ByteReadChannel.readSseFlow(): Flow = channelFlow { - while (isActive) { - //val lines = ArrayList() - val builder = StringBuilder() - var id: String? = null - var event: String? = null - //read lines until blank line or the end of stream - - do{ - val line = readUTF8Line() - if (line != null && line.isNotBlank()) { - val key = line.substringBefore(":") - val value = line.substringAfter(": ") - when (key) { - "id" -> id = value - "event" -> event = value - "data" -> builder.append(value) - else -> error("Unrecognized event-stream key $key") - } - } - } while (line?.isBlank() != true) - if(builder.isNotBlank()) { - send(SseEvent(builder.toString(), event, id)) - } - } - awaitClose { - this@readSseFlow.cancel() - } -} diff --git a/dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt b/dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt deleted file mode 100644 index ed1621d..0000000 --- a/dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt +++ /dev/null @@ -1,102 +0,0 @@ -package hep.dataforge.control.ports - -import hep.dataforge.context.Global -import io.ktor.network.selector.ActorSelectorManager -import io.ktor.network.sockets.aSocket -import io.ktor.network.sockets.openReadChannel -import io.ktor.network.sockets.openWriteChannel -import io.ktor.util.KtorExperimentalAPI -import io.ktor.util.cio.write -import io.ktor.utils.io.readUTF8Line -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.collect -import org.junit.jupiter.api.Test -import java.net.InetSocketAddress - -@OptIn(KtorExperimentalAPI::class) -fun CoroutineScope.launchEchoServer(port: Int): Job = launch { - val server = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().bind(InetSocketAddress("localhost", port)) - println("Started echo telnet server at ${server.localAddress}") - - while (isActive) { - val socket = try { - server.accept() - } catch (ex: Exception) { - server.close() - return@launch - } - - launch { - println("Socket accepted: ${socket.remoteAddress}") - - try { - val input = socket.openReadChannel() - val output = socket.openWriteChannel(autoFlush = true) - - - while (isActive) { - val line = input.readUTF8Line() - - //println("${socket.remoteAddress}: $line") - output.write("[response] $line") - } - } catch (ex: Exception) { - cancel() - } finally { - socket.close() - } - } - } - -} - -class TcpPortTest { - @Test - fun testWithEchoServer() { - try { - runBlocking { - val server = launchEchoServer(22188) - val port = TcpPort.open(Global, "localhost", 22188) - - val logJob = launch { - port.receiving().collect { - println("Flow: ${it.decodeToString()}") - } - } - port.startJob.join() - port.send("aaa\n") - port.send("ddd\n") - - delay(200) - - cancel() - } - } catch (ex: Exception) { - if (ex !is CancellationException) throw ex - } - } - - @Test - fun testKtorWithEchoServer() { - try { - runBlocking { - val server = launchEchoServer(22188) - val port = KtorTcpPort.open(Global,"localhost", 22188) - - val logJob = launch { - port.receiving().collect { - println("Flow: ${it.decodeToString()}") - } - } - port.send("aaa\n") - port.send("ddd\n") - - delay(200) - - cancel() - } - } catch (ex: Exception) { - if (ex !is CancellationException) throw ex - } - } -} \ No newline at end of file diff --git a/dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/sse/SseTest.kt b/dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/sse/SseTest.kt deleted file mode 100644 index 86d6db0..0000000 --- a/dataforge-device-tcp/src/jvmTest/kotlin/hep/dataforge/control/sse/SseTest.kt +++ /dev/null @@ -1,76 +0,0 @@ -package hep.dataforge.control.sse - -import io.ktor.application.ApplicationCall -import io.ktor.application.call -import io.ktor.client.HttpClient -import io.ktor.client.call.receive -import io.ktor.client.request.get -import io.ktor.client.statement.HttpResponse -import io.ktor.client.statement.HttpStatement -import io.ktor.http.CacheControl -import io.ktor.http.ContentType -import io.ktor.response.cacheControl -import io.ktor.response.respondBytesWriter -import io.ktor.routing.get -import io.ktor.routing.routing -import io.ktor.server.cio.CIO -import io.ktor.server.engine.embeddedServer -import io.ktor.util.KtorExperimentalAPI -import io.ktor.utils.io.ByteReadChannel -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import org.junit.jupiter.api.Test - -@OptIn(KtorExperimentalAPI::class) -suspend fun ApplicationCall.respondSse(events: Flow) { - response.cacheControl(CacheControl.NoCache(null)) - respondBytesWriter(contentType = ContentType.Text.EventStream) { - writeSseFlow(events) - } -} - -suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Job = launch { - get(address).execute { response: HttpResponse -> - // Response is not downloaded here. - val channel = response.receive() - val flow = channel.readSseFlow() - flow.collect(block) - } -} - -class SseTest { - @OptIn(KtorExperimentalAPI::class) - @Test - fun testSseIntegration() { - runBlocking(Dispatchers.Default) { - val server = embeddedServer(CIO, 12080) { - routing { - get("/") { - val flow = flow { - repeat(5) { - delay(300) - emit(it) - } - }.map { - SseEvent(data = it.toString(), id = it.toString()) - } - call.respondSse(flow) - } - } - } - server.start(wait = false) - delay(1000) - val client = HttpClient(io.ktor.client.engine.cio.CIO) - client.readSse("http://localhost:12080") { - println(it) - } - delay(2000) - println("Closing the client after waiting") - client.close() - server.stop(1000, 1000) - } - } -} \ No newline at end of file diff --git a/dataforge-magix-client/build.gradle.kts b/dataforge-magix-client/build.gradle.kts index 679b062..1518204 100644 --- a/dataforge-magix-client/build.gradle.kts +++ b/dataforge-magix-client/build.gradle.kts @@ -5,6 +5,10 @@ plugins { val ktorVersion: String by rootProject.extra +repositories{ + maven("https://maven.pkg.github.com/altavir/ktor-client-sse") +} + kotlin { sourceSets { commonMain { @@ -12,11 +16,12 @@ kotlin { implementation(project(":dataforge-device-core")) implementation(project(":dataforge-device-tcp")) implementation("io.ktor:ktor-client-core:$ktorVersion") + implementation("ru.mipt.npm:ktor-client-sse:0.1.0") } } jvmMain { dependencies { - implementation("io.ktor:ktor-client-cio:$ktorVersion") + } } jsMain { diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt index 0a8af4a..18ecf36 100644 --- a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt @@ -3,38 +3,23 @@ package hep.dataforge.control.client import hep.dataforge.control.controllers.DeviceManager import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.respondMessage -import hep.dataforge.control.sse.SseEvent -import hep.dataforge.control.sse.readSseFlow import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta import hep.dataforge.meta.wrap import io.ktor.client.HttpClient -import io.ktor.client.call.receive -import io.ktor.client.request.get import io.ktor.client.request.post -import io.ktor.client.statement.HttpResponse -import io.ktor.client.statement.HttpStatement import io.ktor.http.ContentType import io.ktor.http.Url import io.ktor.http.contentType -import io.ktor.utils.io.ByteReadChannel import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import kotlinx.serialization.json.* +import ru.mipt.npm.ktor.sse.readSse import kotlin.coroutines.CoroutineContext -private suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent) -> Unit): Job = launch { - get(address).execute { response: HttpResponse -> - // Response is not downloaded here. - val channel = response.receive() - val flow = channel.readSseFlow() - flow.collect(block) - } -} - /* { "id":"string|number[optional, but desired]", @@ -53,17 +38,18 @@ private suspend fun HttpClient.readSse(address: String, block: suspend (SseEvent public class MagixClient( private val manager: DeviceManager, private val postUrl: Url, - private val sseUrl: Url + private val sseUrl: Url, //private val inbox: Flow -): CoroutineScope { +) : CoroutineScope { - override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job]) + override val coroutineContext: CoroutineContext = + manager.context.coroutineContext + Job(manager.context.coroutineContext[Job]) private val client = HttpClient() - protected fun generateId(message: DeviceMessage, requestId: String?): String = if(requestId != null){ + protected fun generateId(message: DeviceMessage, requestId: String?): String = if (requestId != null) { "$requestId.response" - } else{ + } else { "df[${message.hashCode()}" } @@ -95,18 +81,18 @@ public class MagixClient( } private val respondJob = launch { - client.readSse(sseUrl.toString()){ + client.readSse(sseUrl.toString()) { val json = Json.parseToJsonElement(it.data) as JsonObject val requestId = json["id"]?.jsonPrimitive?.content val payload = json["payload"]?.jsonObject //TODO analyze action - if(payload != null){ + if (payload != null) { val meta = payload.toMeta() val request = DeviceMessage.wrap(meta) val response = manager.respondMessage(request) - send(wrapMessage(response,requestId)) + send(wrapMessage(response, requestId)) } else { TODO("process heartbeat and other system messages") } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 12d38de..be52383 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/motors/build.gradle.kts b/motors/build.gradle.kts index eef45a2..cd2bb4b 100644 --- a/motors/build.gradle.kts +++ b/motors/build.gradle.kts @@ -3,10 +3,15 @@ import ru.mipt.npm.gradle.useFx plugins { id("ru.mipt.npm.jvm") id("ru.mipt.npm.publish") + application } //TODO to be moved to a separate project +application{ + mainClassName = "ru.mipt.npm.devices.pimotionmaster.PiMotionMasterAppKt" +} + kotlin{ explicitApi = null useFx(ru.mipt.npm.gradle.FXModule.CONTROLS, configuration = ru.mipt.npm.gradle.DependencyConfiguration.IMPLEMENTATION) diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt index 9eda700..4763ac1 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterApp.kt @@ -111,7 +111,7 @@ class PiMotionMasterView : View() { action { if (!debugServerStarted.get()) { debugServerJobProperty.value = - controller.context.launchPiDebugServer(port.get(), listOf("1", "2")) + controller.context.launchPiDebugServer(port.get(), listOf("1", "2", "3", "4")) } else { debugServerJobProperty.get().cancel() debugServerJobProperty.value = null diff --git a/settings.gradle.kts b/settings.gradle.kts index ad4e337..62c2f90 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -3,10 +3,10 @@ pluginManagement { val toolsVersion = "0.6.3-dev-1.4.20-M1" repositories { - //mavenLocal() + mavenLocal() jcenter() gradlePluginPortal() - maven("https://kotlin.bintray.com/kotlinx") + //maven("https://kotlin.bintray.com/kotlinx") maven("https://dl.bintray.com/kotlin/kotlin-eap") maven("https://dl.bintray.com/mipt-npm/dataforge") maven("https://dl.bintray.com/mipt-npm/kscience") @@ -32,7 +32,7 @@ include( ":dataforge-device-serial", ":dataforge-device-server", ":dataforge-magix-client", - ":demo", +// ":demo", ":motors" )