From 598e1c931c4745f2360e7af8a54e3cb293d05fa3 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Thu, 12 Dec 2024 20:50:35 +0300 Subject: [PATCH] add examples for services --- build.gradle.kts | 49 +++++++++--- gradle/libs.versions.toml | 30 +++++++ src/main/kotlin/aggregatorClient.kt | 54 +++++++++++++ src/main/kotlin/examples/exceptionHandling.kt | 34 ++++++++ .../kotlin/{ => examples}/javaHttpClient.kt | 12 ++- src/main/kotlin/examples/jobExample.kt | 32 ++++++++ src/main/kotlin/{ => examples}/ktorClient.kt | 6 +- src/main/kotlin/microservice/asyncResponse.kt | 45 +++++++++++ .../kotlin/microservice/pricingService.kt | 69 ++++++++++++++++ src/main/kotlin/serverMain.kt | 78 +++++++++++++++++++ src/main/kotlin/streamingModule.kt | 39 ++++++++++ 11 files changed, 431 insertions(+), 17 deletions(-) create mode 100644 gradle/libs.versions.toml create mode 100644 src/main/kotlin/aggregatorClient.kt create mode 100644 src/main/kotlin/examples/exceptionHandling.kt rename src/main/kotlin/{ => examples}/javaHttpClient.kt (78%) create mode 100644 src/main/kotlin/examples/jobExample.kt rename src/main/kotlin/{ => examples}/ktorClient.kt (96%) create mode 100644 src/main/kotlin/microservice/asyncResponse.kt create mode 100644 src/main/kotlin/microservice/pricingService.kt create mode 100644 src/main/kotlin/serverMain.kt create mode 100644 src/main/kotlin/streamingModule.kt diff --git a/build.gradle.kts b/build.gradle.kts index 8406961..1125edb 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,20 +1,47 @@ plugins{ - kotlin("jvm") version "2.0.20" + alias(libs.plugins.kotlin.jvm) + alias(libs.plugins.ktor) } -repositories{ +group = "center.sciprog.demo" +version = "0.0.1" + +application { + mainClass.set("serverMainKt") + + val isDevelopment: Boolean = project.ext.has("development") + applicationDefaultJvmArgs = listOf("-Dio.ktor.development=$isDevelopment") +} + +repositories { mavenCentral() } -val ktorVersion = "3.0.1" - dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0") + implementation(libs.kotlinx.html) + implementation(libs.kotlin.css) + implementation(libs.logback.classic) + implementation(libs.kotlinx.datetime) - implementation("io.ktor:ktor-client-core:$ktorVersion") - implementation("io.ktor:ktor-client-cio:$ktorVersion") - implementation("io.ktor:ktor-client-core-jvm:3.0.1") - implementation("io.ktor:ktor-client-apache:3.0.1") - implementation("ch.qos.logback:logback-classic:1.5.12") -} \ No newline at end of file + implementation(libs.ktor.server.core) + implementation(libs.ktor.server.websockets) + implementation(libs.ktor.server.html.builder) + implementation(libs.ktor.server.call.logging) + implementation(libs.ktor.server.cors) + implementation(libs.ktor.server.host.common) + implementation(libs.ktor.server.cio) + + implementation(libs.ktor.client.core) + implementation(libs.ktor.client.cio) + implementation(libs.ktor.client.websockets) + +// implementation("io.ktor:ktor-client-core:$ktorVersion") +// implementation("io.ktor:ktor-client-cio:$ktorVersion") +// implementation("io.ktor:ktor-client-core-jvm:3.0.1") +// implementation("io.ktor:ktor-client-apache:3.0.1") + + testImplementation(libs.ktor.server.test.host) + testImplementation(libs.kotlin.test.junit) +} + diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml new file mode 100644 index 0000000..50cb2c0 --- /dev/null +++ b/gradle/libs.versions.toml @@ -0,0 +1,30 @@ +[versions] +kotlin-version = "2.0.21" +kotlinx-html-version = "0.11.0" +ktor-version = "3.0.1" +logback-version = "1.4.14" + +[libraries] +ktor-server-core = { module = "io.ktor:ktor-server-core-jvm", version.ref = "ktor-version" } +ktor-server-websockets = { module = "io.ktor:ktor-server-websockets-jvm", version.ref = "ktor-version" } +ktor-server-html-builder = { module = "io.ktor:ktor-server-html-builder-jvm", version.ref = "ktor-version" } +kotlinx-html = { module = "org.jetbrains.kotlinx:kotlinx-html-jvm", version.ref = "kotlinx-html-version" } +kotlin-css = { module = "org.jetbrains:kotlin-css-jvm", version = "1.0.0-pre.129-kotlin-1.4.20" } +ktor-server-call-logging = { module = "io.ktor:ktor-server-call-logging-jvm", version.ref = "ktor-version" } +ktor-server-cors = { module = "io.ktor:ktor-server-cors-jvm", version.ref = "ktor-version" } +ktor-server-host-common = { module = "io.ktor:ktor-server-host-common-jvm", version.ref = "ktor-version" } +ktor-server-cio = { module = "io.ktor:ktor-server-cio-jvm", version.ref = "ktor-version" } +logback-classic = { module = "ch.qos.logback:logback-classic", version.ref = "logback-version" } +ktor-server-test-host = { module = "io.ktor:ktor-server-test-host-jvm", version.ref = "ktor-version" } +kotlin-test-junit = { module = "org.jetbrains.kotlin:kotlin-test-junit", version.ref = "kotlin-version" } +kotlinx-datetime = "org.jetbrains.kotlinx:kotlinx-datetime:0.6.1" + +ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor-version" } +ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor-version" } +ktor-client-apache = { module = "io.ktor:ktor-client-apache", version.ref = "ktor-version" } +ktor-client-websockets = { module = "io.ktor:ktor-client-websockets", version.ref = "ktor-version" } + + +[plugins] +kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin-version" } +ktor = { id = "io.ktor.plugin", version.ref = "ktor-version" } diff --git a/src/main/kotlin/aggregatorClient.kt b/src/main/kotlin/aggregatorClient.kt new file mode 100644 index 0000000..5b92517 --- /dev/null +++ b/src/main/kotlin/aggregatorClient.kt @@ -0,0 +1,54 @@ +import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.websocket.webSocket +import io.ktor.websocket.Frame +import io.ktor.websocket.readText +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import java.time.Instant +import io.ktor.client.engine.cio.CIO as ClientCIO + +suspend fun CoroutineScope.aggregateFromService(url: String): List { + val client = HttpClient(ClientCIO) { + install(WebSockets) + } + + val result = CompletableDeferred>() + + launch { + client.webSocket(url) { + val res = incoming.receiveAsFlow() + .filterIsInstance() + .take(3) + .map { Instant.parse(it.readText()) } + .toList() + + result.complete(res) + } + } + + return result.await() +} + +//suspend fun aggregateFromServiceAsync(url: String): Deferred> { +// val client = HttpClient(ClientCIO) { +// install(WebSockets) +// } +// +// val result = CompletableDeferred>() +// +// client.webSocket(url) { +// val res = incoming.consumeAsFlow() +// .filterIsInstance() +// .take(3) +// .map { Instant.parse(it.readText()) } +// .toList() +// +// result.complete(res) +// } +// +// return result +//} \ No newline at end of file diff --git a/src/main/kotlin/examples/exceptionHandling.kt b/src/main/kotlin/examples/exceptionHandling.kt new file mode 100644 index 0000000..d15bf55 --- /dev/null +++ b/src/main/kotlin/examples/exceptionHandling.kt @@ -0,0 +1,34 @@ +package examples + +import kotlinx.coroutines.* + + +fun main(): Unit = runBlocking{ + val masterJob = launch( + CoroutineExceptionHandler { coroutineContext, throwable -> + println(throwable) + } + ){ + supervisorScope { + val subJob = launch { +// println(coroutineContext[Job]) + delay(100) + println("Interrupting") + error("BOOM!") + } + + val subDeferred = async { + delay(200) + println("Task completed") + "Completed" + } + +// subJob.await() + println(subDeferred.await()) + } + } +// delay(50) +// masterJob.cancel() + masterJob.join() + println("Master job joined") +} \ No newline at end of file diff --git a/src/main/kotlin/javaHttpClient.kt b/src/main/kotlin/examples/javaHttpClient.kt similarity index 78% rename from src/main/kotlin/javaHttpClient.kt rename to src/main/kotlin/examples/javaHttpClient.kt index f1b5446..d8c3fa4 100644 --- a/src/main/kotlin/javaHttpClient.kt +++ b/src/main/kotlin/examples/javaHttpClient.kt @@ -1,3 +1,5 @@ +package examples + import java.net.* import java.net.http.* import java.net.http.HttpResponse.* @@ -13,11 +15,11 @@ fun main() { client.sendAsync(request, BodyHandlers.ofString()) .thenApply{ it.body() } - .thenApply{ - val resources = regex.findAll(it).map{it.groupValues[1]} + .thenApply{ body -> + val resources = regex.findAll(body).map{it.groupValues[1]} - resources.map { resourceName-> - println("Resource processing for $resourceName started") + resources.toList().map { resourceName-> + println("Resource processing for $resourceName") val resourceRequest : HttpRequest = HttpRequest.newBuilder() .uri(URI.create("https://sciprog.center$resourceName")) .GET().build() @@ -26,6 +28,8 @@ fun main() { //do something with the body println("The resource with name $resourceName has ${bodyBytes.size} bytes") } + }.forEach { + it.join() } resources } diff --git a/src/main/kotlin/examples/jobExample.kt b/src/main/kotlin/examples/jobExample.kt new file mode 100644 index 0000000..e33e50f --- /dev/null +++ b/src/main/kotlin/examples/jobExample.kt @@ -0,0 +1,32 @@ +package examples + +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + + +fun main() = runBlocking { + + val job1 = launch { + delay(200) + println("Job1 completed") + } + + val deferred1 = async { + delay(200) + return@async "Complete" + } + + val job2 = launch { + delay(100) + job1.cancel() + println("Job1 canceled") + deferred1.cancel() + println("Deferred1 canceled") + } + + job1.join() + println("Job1 joined") + println(deferred1.await()) +} \ No newline at end of file diff --git a/src/main/kotlin/ktorClient.kt b/src/main/kotlin/examples/ktorClient.kt similarity index 96% rename from src/main/kotlin/ktorClient.kt rename to src/main/kotlin/examples/ktorClient.kt index 8eeaf4b..ecb2bd8 100644 --- a/src/main/kotlin/ktorClient.kt +++ b/src/main/kotlin/examples/ktorClient.kt @@ -1,5 +1,6 @@ +package examples + import io.ktor.client.HttpClient -import io.ktor.client.call.body import io.ktor.client.engine.cio.CIO import io.ktor.client.request.get import io.ktor.client.statement.bodyAsBytes @@ -23,4 +24,5 @@ suspend fun main() = coroutineScope { println("The resource with name $resourceName has ${bodyBytes.size} bytes") } } -} \ No newline at end of file +} + diff --git a/src/main/kotlin/microservice/asyncResponse.kt b/src/main/kotlin/microservice/asyncResponse.kt new file mode 100644 index 0000000..a9e8295 --- /dev/null +++ b/src/main/kotlin/microservice/asyncResponse.kt @@ -0,0 +1,45 @@ +package microservice + +typealias RequestArgs = Map +typealias ComputationData = Map +typealias Response = Map + +data class ServerRequest( + val user: String, + val token: String, + val action: String, + val arguments: RequestArgs +) + +interface ValidationService { + suspend fun isValid(user: String, token: String, action: String): Boolean +} + +interface DataBaseService { + suspend fun provide(arguments: RequestArgs): ComputationData +} + +interface ComputationService { + suspend fun compute(action: String, arguments: ComputationData): Response +} + +class ComputationContext( + val validationService: ValidationService, + val dataBaseServiceA: DataBaseService, + val dataBaseServiceB: DataBaseService, + val dataBaseServiceC: DataBaseService, + val computationService: ComputationService +) + +suspend fun ComputationContext.respond(request: ServerRequest): Response { + val isValid = validationService.isValid(request.user, request.token, request.action) + if (isValid) { + val dataA = dataBaseServiceA.provide(request.arguments) + val dataB = dataBaseServiceB.provide(request.arguments) + val dataC = dataBaseServiceC.provide(request.arguments) + val result = computationService.compute(request.action, dataA + dataB + dataC) + return result + } else { + error("Illegal access") + } +} \ No newline at end of file diff --git a/src/main/kotlin/microservice/pricingService.kt b/src/main/kotlin/microservice/pricingService.kt new file mode 100644 index 0000000..1944cb6 --- /dev/null +++ b/src/main/kotlin/microservice/pricingService.kt @@ -0,0 +1,69 @@ +package microservice + +import kotlinx.coroutines.* +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import java.math.BigDecimal +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds + +interface PriceReader { + suspend fun readPrice(): BigDecimal +} + +interface PriceWriter { + suspend fun writePrice(price: BigDecimal) +} + +interface PriceDB { + suspend fun storePrice(time: Instant, price: BigDecimal) + + suspend fun restorePrice(timeRange: ClosedRange): List> +} + + +private suspend fun computePrice(history: List>): BigDecimal = TODO() + +private val readDelay = 5.seconds +private val writeDelay = 10.seconds + +fun CoroutineScope.launchPriceUpdate( + reader: PriceReader, + writer: PriceWriter, + db: PriceDB, + duration: Duration +): Job = launch { + val cache = ArrayDeque>() + + + //read job + launch { + while (isActive) { + delay(readDelay) + val now = Clock.System.now() + val price = reader.readPrice() + db.storePrice(now, price) + cache.addFirst(now to price) + } + } + + //write job + launch { + while (isActive) { + delay(writeDelay) + val now = Clock.System.now() + val computedPrice = computePrice(cache.filter { it.first > (now - duration) }) + writer.writePrice(computedPrice) + } + } + + //cache cleanup job + launch { + while (isActive) { + delay(1.minutes) + val now = Clock.System.now() + cache.removeIf { now - it.first > duration } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/serverMain.kt b/src/main/kotlin/serverMain.kt new file mode 100644 index 0000000..a60650b --- /dev/null +++ b/src/main/kotlin/serverMain.kt @@ -0,0 +1,78 @@ +import io.ktor.http.HttpStatusCode +import io.ktor.server.application.install +import io.ktor.server.cio.CIO +import io.ktor.server.engine.embeddedServer +import io.ktor.server.request.receiveText +import io.ktor.server.response.respond +import io.ktor.server.response.respondText +import io.ktor.server.routing.Route +import io.ktor.server.routing.get +import io.ktor.server.routing.post +import io.ktor.server.routing.route +import io.ktor.server.routing.routing +import io.ktor.server.websocket.WebSockets +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.newFixedThreadPoolContext +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit +import kotlinx.coroutines.withContext +import kotlin.coroutines.CoroutineContext + +fun Route.subroute(){ + var content = "" + + get("get") { + call.respondText("Content: ${content}") + } + + post("set"){ + content = call.receiveText() + call.respond(HttpStatusCode.OK) + } +} + +suspend fun executeQuery(queryName: String, ioContext: CoroutineContext = Dispatchers.IO): String{ + withContext(ioContext){ + //some blocking logic + } + return queryName +} + + +fun main() { + embeddedServer(CIO, port = 8080, host = "localhost") { + install(WebSockets) + + val ioContext = newFixedThreadPoolContext(12, "DB") //Dispatchers.IO + + routing { + get("/") { + val callerName = call.queryParameters["name"] ?: "World" + call.respondText("Hello $callerName!") + } + get("/query/{queryName}"){ + val queryName = call.parameters["queryName"] ?: "query" + val queryResult = executeQuery(queryName, ioContext) + call.respondText("$queryName successful: $queryResult") + } + route("subroute"){ + subroute() + } + route("subroute1"){ + subroute() + } + + route("producer"){ + streamingModule() + } + + get("aggregated"){ + val result = aggregateFromService("ws://localhost:8080/producer") + call.respondText(result.toString()) + } + + } + }.start(wait = true) +} \ No newline at end of file diff --git a/src/main/kotlin/streamingModule.kt b/src/main/kotlin/streamingModule.kt new file mode 100644 index 0000000..fa5b117 --- /dev/null +++ b/src/main/kotlin/streamingModule.kt @@ -0,0 +1,39 @@ +import io.ktor.server.routing.Route +import io.ktor.server.routing.application +import io.ktor.server.websocket.webSocket +import io.ktor.websocket.Frame +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import java.time.Instant +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + + +fun Route.streamingModule() { + val channel = Channel() + + application.launch { + while (isActive) { + delay(0.1.seconds) + channel.send(Instant.now()) + } + } + + webSocket { + repeat(3){ + delay(100.milliseconds) + outgoing.send(Frame.Text(Instant.now().toString())) + + } + +// launch { +// while (isActive) { +// outgoing.send(Frame.Text(channel.receive().toString())) +// } +// } + } +} +