diff --git a/build.gradle.kts b/build.gradle.kts index 1125edb..44b5006 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -19,7 +19,7 @@ repositories { dependencies { implementation(libs.kotlinx.html) - implementation(libs.kotlin.css) +// implementation(libs.kotlin.css) implementation(libs.logback.classic) implementation(libs.kotlinx.datetime) @@ -44,4 +44,3 @@ dependencies { testImplementation(libs.ktor.server.test.host) testImplementation(libs.kotlin.test.junit) } - diff --git a/src/main/kotlin/aggregatorClient.kt b/src/main/kotlin/aggregatorClient.kt index 5b92517..74b1350 100644 --- a/src/main/kotlin/aggregatorClient.kt +++ b/src/main/kotlin/aggregatorClient.kt @@ -20,6 +20,7 @@ suspend fun CoroutineScope.aggregateFromService(url: String): List { launch { client.webSocket(url) { + outgoing.send(Frame.Text("Connected")) val res = incoming.receiveAsFlow() .filterIsInstance() .take(3) diff --git a/src/main/kotlin/examples/ApplicaionWithScope.kt b/src/main/kotlin/examples/ApplicaionWithScope.kt new file mode 100644 index 0000000..c1e2d79 --- /dev/null +++ b/src/main/kotlin/examples/ApplicaionWithScope.kt @@ -0,0 +1,26 @@ +package examples + +import kotlinx.coroutines.* +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import kotlin.coroutines.CoroutineContext + +class ApplicationWithScope( + parentContext: CoroutineContext +) : CoroutineScope, AutoCloseable { + override val coroutineContext: CoroutineContext = parentContext + + Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + SupervisorJob(parentContext[Job]) + + CoroutineExceptionHandler { _, exception -> } + + CoroutineName("ApplicationWithScope") + + override fun close() { + cancel() + } +} + +fun ApplicationWithScope.launchJob() = launch { } + +fun ApplicationWithScope(){ + CoroutineScope(Dispatchers.Default) +} \ No newline at end of file diff --git a/src/main/kotlin/examples/badRunBlocking.kt b/src/main/kotlin/examples/badRunBlocking.kt new file mode 100644 index 0000000..b03d5bb --- /dev/null +++ b/src/main/kotlin/examples/badRunBlocking.kt @@ -0,0 +1,20 @@ +package examples + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext + +private fun doSomethingBlocking(){ + runBlocking { + println("Blocking") + } +} + + +fun main() { + runBlocking { + withContext(Dispatchers.Default) { + doSomethingBlocking() + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/examples/coroutineContextDemo.kt b/src/main/kotlin/examples/coroutineContextDemo.kt new file mode 100644 index 0000000..c4d5d82 --- /dev/null +++ b/src/main/kotlin/examples/coroutineContextDemo.kt @@ -0,0 +1,33 @@ +package examples + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext + +class ContextLogger(val name: String) : CoroutineContext.Element { + override val key: CoroutineContext.Key<*> get() = Key + + fun log(level: String, messageBuilder: () -> String) { + println("[$name] $level: ${messageBuilder()}") + } + + companion object Key : CoroutineContext.Key +} + +suspend fun log(level: String = "INFO", messageBuilder: () -> String) { + coroutineContext[ContextLogger]?.log(level, messageBuilder) +} + + +suspend fun main(): Unit = coroutineScope { + withContext(ContextLogger("TEST") + Dispatchers.Default) { + launch(ContextLogger("TEST2")) { + log { "Hello, World!" } + } + log { "Bye, World!" } + coroutineContext[ContextLogger] + } +} \ No newline at end of file diff --git a/src/main/kotlin/examples/exceptionHandling.kt b/src/main/kotlin/examples/exceptionHandling.kt index d15bf55..51fcc65 100644 --- a/src/main/kotlin/examples/exceptionHandling.kt +++ b/src/main/kotlin/examples/exceptionHandling.kt @@ -3,13 +3,13 @@ package examples import kotlinx.coroutines.* -fun main(): Unit = runBlocking{ +suspend fun main(): Unit = coroutineScope { val masterJob = launch( - CoroutineExceptionHandler { coroutineContext, throwable -> - println(throwable) - } - ){ - supervisorScope { +// CoroutineExceptionHandler { coroutineContext, throwable -> +// println(throwable) +// } + ) { + coroutineScope { val subJob = launch { // println(coroutineContext[Job]) delay(100) diff --git a/src/main/kotlin/examples/globalScopeError.kt b/src/main/kotlin/examples/globalScopeError.kt new file mode 100644 index 0000000..466a32c --- /dev/null +++ b/src/main/kotlin/examples/globalScopeError.kt @@ -0,0 +1,24 @@ +package examples + +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking + +fun main(): Unit = runBlocking { + + + val job1 = GlobalScope.launch { + delay(100) + error("BOOM!") + } + + val job2 = GlobalScope.launch { + delay(200) + println("complete") + } + + job1.join() + job2.join() + +} \ No newline at end of file diff --git a/src/main/kotlin/examples/jobExample.kt b/src/main/kotlin/examples/jobExample.kt index e33e50f..2f8c00a 100644 --- a/src/main/kotlin/examples/jobExample.kt +++ b/src/main/kotlin/examples/jobExample.kt @@ -1,19 +1,17 @@ package examples -import kotlinx.coroutines.async -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.* -fun main() = runBlocking { +suspend fun main() = withContext(Dispatchers.Unconfined) { + println(currentCoroutineContext()) - val job1 = launch { + val job1: Job = launch { delay(200) println("Job1 completed") } - val deferred1 = async { + val deferred1: Deferred = async { delay(200) return@async "Complete" } diff --git a/src/main/kotlin/microservice/asyncResponse.kt b/src/main/kotlin/microservice/asyncResponse.kt index a9e8295..66c1495 100644 --- a/src/main/kotlin/microservice/asyncResponse.kt +++ b/src/main/kotlin/microservice/asyncResponse.kt @@ -1,5 +1,9 @@ package microservice +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async +import kotlinx.coroutines.cancel + typealias RequestArgs = Map typealias ComputationData = Map typealias Response = Map @@ -24,20 +28,24 @@ interface ComputationService { } class ComputationContext( + val coroutineScope: CoroutineScope, val validationService: ValidationService, val dataBaseServiceA: DataBaseService, val dataBaseServiceB: DataBaseService, val dataBaseServiceC: DataBaseService, val computationService: ComputationService -) +) : CoroutineScope by coroutineScope suspend fun ComputationContext.respond(request: ServerRequest): Response { val isValid = validationService.isValid(request.user, request.token, request.action) if (isValid) { - val dataA = dataBaseServiceA.provide(request.arguments) - val dataB = dataBaseServiceB.provide(request.arguments) - val dataC = dataBaseServiceC.provide(request.arguments) - val result = computationService.compute(request.action, dataA + dataB + dataC) + val dataA = async { dataBaseServiceA.provide(request.arguments) } + val dataB = async { dataBaseServiceB.provide(request.arguments) } + val dataC = async { dataBaseServiceC.provide(request.arguments) } + val result = computationService.compute( + request.action, + dataA.await() + dataB.await() + dataC.await() + ) return result } else { error("Illegal access") diff --git a/src/main/kotlin/microservice/pricingFlow.kt b/src/main/kotlin/microservice/pricingFlow.kt new file mode 100644 index 0000000..c635262 --- /dev/null +++ b/src/main/kotlin/microservice/pricingFlow.kt @@ -0,0 +1,64 @@ +package microservice + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.* +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import java.math.BigDecimal +import kotlin.time.Duration + +fun CoroutineScope.launchPriceUpdateFlow( + reader: PriceReader, + writer: PriceWriter, + db: PriceDB, + duration: Duration +): Job { + val dbChannel = Channel>() + + dbChannel.consumeAsFlow().buffer(1000).onEach { (now, price) -> + db.storePrice(now, price) + }.catch { + + }.launchIn(this) + +// val pricesFlow = MutableSharedFlow>() +// +// pricesFlow.onEach { +// println(it) +// }.launchIn(this) +// +// pricesFlow.buffer(1000, onBufferOverflow = BufferOverflow.DROP_LATEST).onEach { (now, price) -> +// db.storePrice(now, price) +// }.launchIn(this) + + + return flow> { + while (true) { + delay(readDelay) + val now = Clock.System.now() + val price = reader.readPrice() + emit(now to price) + } + }.flowOn(Dispatchers.IO).onEach { pair -> + //pricesFlow.emit(pair) + dbChannel.send(pair) + //db.storePrice(now, price) + }.runningFold(ArrayDeque>()) { deque, item -> + deque.addFirst(item) + //TODO add cleanup + deque + }.sample(writeDelay).onEach { deque -> + val now = Clock.System.now() + val computedPrice = computePrice(deque.filter { it.first > (now - duration) }) + writer.writePrice(computedPrice) + }.catch { + println(it.message) + }.launchIn(this) +} + + diff --git a/src/main/kotlin/microservice/pricingService.kt b/src/main/kotlin/microservice/pricingService.kt index a0a8f20..96f35ed 100644 --- a/src/main/kotlin/microservice/pricingService.kt +++ b/src/main/kotlin/microservice/pricingService.kt @@ -1,6 +1,8 @@ package microservice import kotlinx.coroutines.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Clock import kotlinx.datetime.Instant import java.math.BigDecimal @@ -23,10 +25,10 @@ interface PriceDB { } -private suspend fun computePrice(history: List>): BigDecimal = TODO() +suspend fun computePrice(history: List>): BigDecimal = TODO() -private val readDelay = 5.seconds -private val writeDelay = 10.seconds +val readDelay = 5.seconds +val writeDelay = 10.seconds fun CoroutineScope.launchPriceUpdate( reader: PriceReader, @@ -35,6 +37,7 @@ fun CoroutineScope.launchPriceUpdate( duration: Duration ): Job = launch { val cache = ArrayDeque>() + val mutex = Mutex() //restore from db val now = Clock.System.now() @@ -44,10 +47,20 @@ fun CoroutineScope.launchPriceUpdate( launch { while (isActive) { delay(readDelay) - val now = Clock.System.now() - val price = reader.readPrice() - db.storePrice(now, price) - cache.addFirst(now to price) + try { + + val now = Clock.System.now() + val price = reader.readPrice() + + launch(Dispatchers.IO) { + db.storePrice(now, price) + } + mutex.withLock { + cache.addFirst(now to price) + } + } catch (e: Exception) { + e.printStackTrace() + } } } @@ -66,7 +79,9 @@ fun CoroutineScope.launchPriceUpdate( while (isActive) { delay(10.minutes) val now = Clock.System.now() - cache.removeIf { now - it.first > duration } + mutex.withLock { + cache.removeIf { now - it.first > duration } + } } } } \ No newline at end of file diff --git a/src/main/kotlin/serverMain.kt b/src/main/kotlin/serverMain.kt index a60650b..f1bf0bc 100644 --- a/src/main/kotlin/serverMain.kt +++ b/src/main/kotlin/serverMain.kt @@ -2,6 +2,7 @@ import io.ktor.http.HttpStatusCode import io.ktor.server.application.install import io.ktor.server.cio.CIO import io.ktor.server.engine.embeddedServer +import io.ktor.server.html.respondHtml import io.ktor.server.request.receiveText import io.ktor.server.response.respond import io.ktor.server.response.respondText @@ -14,17 +15,20 @@ import io.ktor.server.websocket.WebSockets import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.withContext +import kotlinx.html.body +import kotlinx.html.h1 import kotlin.coroutines.CoroutineContext -fun Route.subroute(){ +fun Route.subroute(key: String){ var content = "" get("get") { - call.respondText("Content: ${content}") + call.respondText("Content[$key]: ${content}") } post("set"){ @@ -35,13 +39,14 @@ fun Route.subroute(){ suspend fun executeQuery(queryName: String, ioContext: CoroutineContext = Dispatchers.IO): String{ withContext(ioContext){ + Thread.sleep(1000) //some blocking logic } return queryName } -fun main() { +suspend fun main():Unit = coroutineScope{ embeddedServer(CIO, port = 8080, host = "localhost") { install(WebSockets) @@ -50,18 +55,26 @@ fun main() { routing { get("/") { val callerName = call.queryParameters["name"] ?: "World" +// call.respondHtml { +// body{ +// h1("Hello $callerName!") +// } +// } call.respondText("Hello $callerName!") } + get("/query/{queryName}"){ val queryName = call.parameters["queryName"] ?: "query" val queryResult = executeQuery(queryName, ioContext) call.respondText("$queryName successful: $queryResult") } + route("subroute"){ - subroute() + subroute("0") } + route("subroute1"){ - subroute() + subroute("1") } route("producer"){ @@ -74,5 +87,5 @@ fun main() { } } - }.start(wait = true) + }.start() } \ No newline at end of file diff --git a/src/main/kotlin/streamingModule.kt b/src/main/kotlin/streamingModule.kt index fa5b117..484ee2f 100644 --- a/src/main/kotlin/streamingModule.kt +++ b/src/main/kotlin/streamingModule.kt @@ -3,6 +3,7 @@ import io.ktor.server.routing.application import io.ktor.server.websocket.webSocket import io.ktor.websocket.Frame import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -23,10 +24,14 @@ fun Route.streamingModule() { } webSocket { + launch { + incoming.consumeEach { frame -> + println(frame) + } + } repeat(3){ delay(100.milliseconds) outgoing.send(Frame.Text(Instant.now().toString())) - } // launch {