Update to coroutines

This commit is contained in:
Alexander Nozik 2024-08-20 12:13:06 +03:00
parent 13874bde53
commit 3e1280efed
4 changed files with 69 additions and 32 deletions

View File

@ -1,5 +1,2 @@
kotlin.code.style=official
ktor_version=2.3.12
kotlin_version=2.0.10
logback_version=1.4.14
kotlinx_html_version=0.10.1

View File

@ -7,10 +7,7 @@ import io.ktor.http.HttpStatusCode
import io.ktor.serialization.deserialize
import io.ktor.serialization.kotlinx.KotlinxWebsocketSerializationConverter
import io.ktor.serialization.kotlinx.json.json
import io.ktor.server.application.Application
import io.ktor.server.application.ApplicationCall
import io.ktor.server.application.call
import io.ktor.server.application.install
import io.ktor.server.application.*
import io.ktor.server.cio.CIO
import io.ktor.server.engine.embeddedServer
import io.ktor.server.html.respondHtml
@ -18,10 +15,15 @@ import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.plugins.cors.routing.CORS
import io.ktor.server.plugins.statuspages.StatusPages
import io.ktor.server.response.respondText
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import io.ktor.server.websocket.*
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@ -36,10 +38,10 @@ import java.time.Duration
fun main(args: Array<String>) {
embeddedServer(CIO, port = 8080) {
module()
}
}.start(true)
}
suspend inline fun ApplicationCall.respondCss(builder: CSSBuilder.() -> Unit) {
private suspend inline fun ApplicationCall.respondCss(builder: CSSBuilder.() -> Unit) {
this.respondText(CSSBuilder().apply(builder).toString(), ContentType.Text.CSS)
}
@ -115,37 +117,54 @@ fun Application.module() {
}
}
}
businessLogic()
}
}
val mutex = Mutex()
val data = mutableMapOf<String, JsonElement>()
val outBox = MutableSharedFlow<ServerEvent>()
val runner: TaskRunner = TaskRunner.ECHO
fun Route.businessLogic() {
webSocket("/ws") { // websocketSession
val converter = converter ?: error("Converter is null")
val mutex = Mutex()
val data = mutableMapOf<String, JsonElement>()
outBox.onEach {
sendSerialized(it)
}.launchIn(this)
val outBox = MutableSharedFlow<ServerEvent>(100)
for (frame in incoming) {
when (val event: UserEvent = converter.deserialize(frame)) {
is UserDataEvent -> mutex.withLock {
data[event.key] = event.value
val runner: TaskRunner = TaskRunner.GROUP_ARGS
webSocket("/ws") { // websocketSession
val converter = converter ?: error("Converter is null")
//launch sending from outbox in a separate coroutine
val sendJob = outBox.onEach {
sendSerialized(it)
}.catch {
application.log.error("Error on send", it)
}.launchIn(this)
for (frame in incoming) {
when (val event: UserEvent = converter.deserialize(frame)) {
is UserDataEvent -> mutex.withLock {
data[event.key] = event.value
}
is UserStartTaskEvent -> {
val snapshot = mutex.withLock {
// val snapshot = LinkedHashMap(data)
// data.clear()
// snapshot
HashMap(data).also { data.clear() }
}
is UserStartTaskEvent -> launch { //FIXME incorrect scope
mutex.withLock {
val snapshot = HashMap(data)
data.clear()
val taskDef = TaskDefinition(event.taskId, snapshot)
application.launch(Dispatchers.Default) {
val taskDef = TaskDefinition(event.taskId, snapshot)
try {
val result = runner.runTask(taskDef)
val resultEvent = ServerTaskCompleteEvent(taskDef, result)
outBox.emit(resultEvent)
} catch (e: Exception) {
outBox.emit(ServerTaskErrorEvent(taskDef, e.message ?: ""))
}
}
}
}
}

View File

@ -1,8 +1,12 @@
package center.sciprog.ktor.sample
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.buildJsonObject
import kotlin.time.Duration.Companion.milliseconds
@Serializable
data class TaskDefinition(val id: String, val arguments: Map<String, JsonElement>)
@ -12,12 +16,24 @@ fun interface TaskRunner {
suspend fun runTask(definition: TaskDefinition): JsonElement
companion object {
val ECHO = TaskRunner { definition: TaskDefinition ->
val GROUP_ARGS = TaskRunner { definition: TaskDefinition ->
delay(100.milliseconds)
buildJsonObject {
definition.arguments.forEach {
put(it.key, it.value)
}
}
}
fun scopedRunner(scope: CoroutineScope) = TaskRunner { definition ->
scope.async {
delay(100.milliseconds)
buildJsonObject {
definition.arguments.forEach {
put(it.key, it.value)
}
}
}.await()
}
}
}

View File

@ -12,8 +12,13 @@ data class UserDataEvent(val key: String, val value: JsonElement) : UserEvent
@Serializable
data class UserStartTaskEvent(val taskId: String) : UserEvent
@Serializable
sealed interface ServerEvent
@Serializable
data class ServerTaskCompleteEvent(val taskDefinition: TaskDefinition, val result: JsonElement) : ServerEvent
data class ServerTaskCompleteEvent(val taskDefinition: TaskDefinition, val result: JsonElement) : ServerEvent
@Serializable
data class ServerTaskErrorEvent(val taskDefinition: TaskDefinition, val message: String) : ServerEvent