diff --git a/gradle.properties b/gradle.properties index 3f32a52..a3293d9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,2 @@ kotlin.code.style=official -ktor_version=2.3.12 -kotlin_version=2.0.10 -logback_version=1.4.14 -kotlinx_html_version=0.10.1 + diff --git a/src/main/kotlin/Application.kt b/src/main/kotlin/Application.kt index a062f94..2e98582 100644 --- a/src/main/kotlin/Application.kt +++ b/src/main/kotlin/Application.kt @@ -7,10 +7,7 @@ import io.ktor.http.HttpStatusCode import io.ktor.serialization.deserialize import io.ktor.serialization.kotlinx.KotlinxWebsocketSerializationConverter import io.ktor.serialization.kotlinx.json.json -import io.ktor.server.application.Application -import io.ktor.server.application.ApplicationCall -import io.ktor.server.application.call -import io.ktor.server.application.install +import io.ktor.server.application.* import io.ktor.server.cio.CIO import io.ktor.server.engine.embeddedServer import io.ktor.server.html.respondHtml @@ -18,10 +15,15 @@ import io.ktor.server.plugins.contentnegotiation.ContentNegotiation import io.ktor.server.plugins.cors.routing.CORS import io.ktor.server.plugins.statuspages.StatusPages import io.ktor.server.response.respondText +import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.routing import io.ktor.server.websocket.* +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch @@ -36,10 +38,10 @@ import java.time.Duration fun main(args: Array) { embeddedServer(CIO, port = 8080) { module() - } + }.start(true) } -suspend inline fun ApplicationCall.respondCss(builder: CSSBuilder.() -> Unit) { +private suspend inline fun ApplicationCall.respondCss(builder: CSSBuilder.() -> Unit) { this.respondText(CSSBuilder().apply(builder).toString(), ContentType.Text.CSS) } @@ -115,37 +117,54 @@ fun Application.module() { } } } + businessLogic() + } +} - val mutex = Mutex() - val data = mutableMapOf() - val outBox = MutableSharedFlow() - val runner: TaskRunner = TaskRunner.ECHO +fun Route.businessLogic() { - webSocket("/ws") { // websocketSession - val converter = converter ?: error("Converter is null") + val mutex = Mutex() + val data = mutableMapOf() - outBox.onEach { - sendSerialized(it) - }.launchIn(this) + val outBox = MutableSharedFlow(100) - for (frame in incoming) { - when (val event: UserEvent = converter.deserialize(frame)) { - is UserDataEvent -> mutex.withLock { - data[event.key] = event.value + val runner: TaskRunner = TaskRunner.GROUP_ARGS + + webSocket("/ws") { // websocketSession + val converter = converter ?: error("Converter is null") + + //launch sending from outbox in a separate coroutine + val sendJob = outBox.onEach { + sendSerialized(it) + }.catch { + application.log.error("Error on send", it) + }.launchIn(this) + + for (frame in incoming) { + when (val event: UserEvent = converter.deserialize(frame)) { + is UserDataEvent -> mutex.withLock { + data[event.key] = event.value + } + + is UserStartTaskEvent -> { + + val snapshot = mutex.withLock { + // val snapshot = LinkedHashMap(data) + // data.clear() + // snapshot + HashMap(data).also { data.clear() } } - is UserStartTaskEvent -> launch { //FIXME incorrect scope - mutex.withLock { - val snapshot = HashMap(data) - data.clear() - val taskDef = TaskDefinition(event.taskId, snapshot) - + application.launch(Dispatchers.Default) { + val taskDef = TaskDefinition(event.taskId, snapshot) + try { val result = runner.runTask(taskDef) val resultEvent = ServerTaskCompleteEvent(taskDef, result) outBox.emit(resultEvent) + } catch (e: Exception) { + outBox.emit(ServerTaskErrorEvent(taskDef, e.message ?: "")) } } - } } } diff --git a/src/main/kotlin/TaskRunner.kt b/src/main/kotlin/TaskRunner.kt index 6c52a89..0ffca54 100644 --- a/src/main/kotlin/TaskRunner.kt +++ b/src/main/kotlin/TaskRunner.kt @@ -1,8 +1,12 @@ package center.sciprog.ktor.sample +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async +import kotlinx.coroutines.delay import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.buildJsonObject +import kotlin.time.Duration.Companion.milliseconds @Serializable data class TaskDefinition(val id: String, val arguments: Map) @@ -12,12 +16,24 @@ fun interface TaskRunner { suspend fun runTask(definition: TaskDefinition): JsonElement companion object { - val ECHO = TaskRunner { definition: TaskDefinition -> + val GROUP_ARGS = TaskRunner { definition: TaskDefinition -> + delay(100.milliseconds) buildJsonObject { definition.arguments.forEach { put(it.key, it.value) } } } + + fun scopedRunner(scope: CoroutineScope) = TaskRunner { definition -> + scope.async { + delay(100.milliseconds) + buildJsonObject { + definition.arguments.forEach { + put(it.key, it.value) + } + } + }.await() + } } } \ No newline at end of file diff --git a/src/main/kotlin/events.kt b/src/main/kotlin/events.kt index e634508..38f5391 100644 --- a/src/main/kotlin/events.kt +++ b/src/main/kotlin/events.kt @@ -12,8 +12,13 @@ data class UserDataEvent(val key: String, val value: JsonElement) : UserEvent @Serializable data class UserStartTaskEvent(val taskId: String) : UserEvent + + @Serializable sealed interface ServerEvent @Serializable -data class ServerTaskCompleteEvent(val taskDefinition: TaskDefinition, val result: JsonElement) : ServerEvent \ No newline at end of file +data class ServerTaskCompleteEvent(val taskDefinition: TaskDefinition, val result: JsonElement) : ServerEvent + +@Serializable +data class ServerTaskErrorEvent(val taskDefinition: TaskDefinition, val message: String) : ServerEvent \ No newline at end of file