Examples after the course

This commit is contained in:
Alexander Nozik 2024-12-20 12:07:15 +03:00
parent b7279b55c2
commit 3fee214bc9
13 changed files with 241 additions and 35 deletions

View File

@ -19,7 +19,7 @@ repositories {
dependencies { dependencies {
implementation(libs.kotlinx.html) implementation(libs.kotlinx.html)
implementation(libs.kotlin.css) // implementation(libs.kotlin.css)
implementation(libs.logback.classic) implementation(libs.logback.classic)
implementation(libs.kotlinx.datetime) implementation(libs.kotlinx.datetime)
@ -44,4 +44,3 @@ dependencies {
testImplementation(libs.ktor.server.test.host) testImplementation(libs.ktor.server.test.host)
testImplementation(libs.kotlin.test.junit) testImplementation(libs.kotlin.test.junit)
} }

View File

@ -20,6 +20,7 @@ suspend fun CoroutineScope.aggregateFromService(url: String): List<Instant> {
launch { launch {
client.webSocket(url) { client.webSocket(url) {
outgoing.send(Frame.Text("Connected"))
val res = incoming.receiveAsFlow() val res = incoming.receiveAsFlow()
.filterIsInstance<Frame.Text>() .filterIsInstance<Frame.Text>()
.take(3) .take(3)

View File

@ -0,0 +1,26 @@
package examples
import kotlinx.coroutines.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
class ApplicationWithScope(
parentContext: CoroutineContext
) : CoroutineScope, AutoCloseable {
override val coroutineContext: CoroutineContext = parentContext +
Executors.newSingleThreadExecutor().asCoroutineDispatcher() +
SupervisorJob(parentContext[Job]) +
CoroutineExceptionHandler { _, exception -> } +
CoroutineName("ApplicationWithScope")
override fun close() {
cancel()
}
}
fun ApplicationWithScope.launchJob() = launch { }
fun ApplicationWithScope(){
CoroutineScope(Dispatchers.Default)
}

View File

@ -0,0 +1,20 @@
package examples
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
private fun doSomethingBlocking(){
runBlocking {
println("Blocking")
}
}
fun main() {
runBlocking {
withContext(Dispatchers.Default) {
doSomethingBlocking()
}
}
}

View File

@ -0,0 +1,33 @@
package examples
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
class ContextLogger(val name: String) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = Key
fun log(level: String, messageBuilder: () -> String) {
println("[$name] $level: ${messageBuilder()}")
}
companion object Key : CoroutineContext.Key<ContextLogger>
}
suspend fun log(level: String = "INFO", messageBuilder: () -> String) {
coroutineContext[ContextLogger]?.log(level, messageBuilder)
}
suspend fun main(): Unit = coroutineScope {
withContext(ContextLogger("TEST") + Dispatchers.Default) {
launch(ContextLogger("TEST2")) {
log { "Hello, World!" }
}
log { "Bye, World!" }
coroutineContext[ContextLogger]
}
}

View File

@ -3,13 +3,13 @@ package examples
import kotlinx.coroutines.* import kotlinx.coroutines.*
fun main(): Unit = runBlocking{ suspend fun main(): Unit = coroutineScope {
val masterJob = launch( val masterJob = launch(
CoroutineExceptionHandler { coroutineContext, throwable -> // CoroutineExceptionHandler { coroutineContext, throwable ->
println(throwable) // println(throwable)
} // }
) { ) {
supervisorScope { coroutineScope {
val subJob = launch { val subJob = launch {
// println(coroutineContext[Job]) // println(coroutineContext[Job])
delay(100) delay(100)

View File

@ -0,0 +1,24 @@
package examples
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main(): Unit = runBlocking {
val job1 = GlobalScope.launch {
delay(100)
error("BOOM!")
}
val job2 = GlobalScope.launch {
delay(200)
println("complete")
}
job1.join()
job2.join()
}

View File

@ -1,19 +1,17 @@
package examples package examples
import kotlinx.coroutines.async import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking { suspend fun main() = withContext(Dispatchers.Unconfined) {
println(currentCoroutineContext())
val job1 = launch { val job1: Job = launch {
delay(200) delay(200)
println("Job1 completed") println("Job1 completed")
} }
val deferred1 = async { val deferred1: Deferred<String> = async {
delay(200) delay(200)
return@async "Complete" return@async "Complete"
} }

View File

@ -1,5 +1,9 @@
package microservice package microservice
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
typealias RequestArgs = Map<String, String> typealias RequestArgs = Map<String, String>
typealias ComputationData = Map<String, String> typealias ComputationData = Map<String, String>
typealias Response = Map<String, String> typealias Response = Map<String, String>
@ -24,20 +28,24 @@ interface ComputationService {
} }
class ComputationContext( class ComputationContext(
val coroutineScope: CoroutineScope,
val validationService: ValidationService, val validationService: ValidationService,
val dataBaseServiceA: DataBaseService, val dataBaseServiceA: DataBaseService,
val dataBaseServiceB: DataBaseService, val dataBaseServiceB: DataBaseService,
val dataBaseServiceC: DataBaseService, val dataBaseServiceC: DataBaseService,
val computationService: ComputationService val computationService: ComputationService
) ) : CoroutineScope by coroutineScope
suspend fun ComputationContext.respond(request: ServerRequest): Response { suspend fun ComputationContext.respond(request: ServerRequest): Response {
val isValid = validationService.isValid(request.user, request.token, request.action) val isValid = validationService.isValid(request.user, request.token, request.action)
if (isValid) { if (isValid) {
val dataA = dataBaseServiceA.provide(request.arguments) val dataA = async { dataBaseServiceA.provide(request.arguments) }
val dataB = dataBaseServiceB.provide(request.arguments) val dataB = async { dataBaseServiceB.provide(request.arguments) }
val dataC = dataBaseServiceC.provide(request.arguments) val dataC = async { dataBaseServiceC.provide(request.arguments) }
val result = computationService.compute(request.action, dataA + dataB + dataC) val result = computationService.compute(
request.action,
dataA.await() + dataB.await() + dataC.await()
)
return result return result
} else { } else {
error("Illegal access") error("Illegal access")

View File

@ -0,0 +1,64 @@
package microservice
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import java.math.BigDecimal
import kotlin.time.Duration
fun CoroutineScope.launchPriceUpdateFlow(
reader: PriceReader,
writer: PriceWriter,
db: PriceDB,
duration: Duration
): Job {
val dbChannel = Channel<Pair<Instant, BigDecimal>>()
dbChannel.consumeAsFlow().buffer(1000).onEach { (now, price) ->
db.storePrice(now, price)
}.catch {
}.launchIn(this)
// val pricesFlow = MutableSharedFlow<Pair<Instant, BigDecimal>>()
//
// pricesFlow.onEach {
// println(it)
// }.launchIn(this)
//
// pricesFlow.buffer(1000, onBufferOverflow = BufferOverflow.DROP_LATEST).onEach { (now, price) ->
// db.storePrice(now, price)
// }.launchIn(this)
return flow<Pair<Instant, BigDecimal>> {
while (true) {
delay(readDelay)
val now = Clock.System.now()
val price = reader.readPrice()
emit(now to price)
}
}.flowOn(Dispatchers.IO).onEach { pair ->
//pricesFlow.emit(pair)
dbChannel.send(pair)
//db.storePrice(now, price)
}.runningFold(ArrayDeque<Pair<Instant, BigDecimal>>()) { deque, item ->
deque.addFirst(item)
//TODO add cleanup
deque
}.sample(writeDelay).onEach { deque ->
val now = Clock.System.now()
val computedPrice = computePrice(deque.filter { it.first > (now - duration) })
writer.writePrice(computedPrice)
}.catch {
println(it.message)
}.launchIn(this)
}

View File

@ -1,6 +1,8 @@
package microservice package microservice
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import java.math.BigDecimal import java.math.BigDecimal
@ -23,10 +25,10 @@ interface PriceDB {
} }
private suspend fun computePrice(history: List<Pair<Instant, BigDecimal>>): BigDecimal = TODO() suspend fun computePrice(history: List<Pair<Instant, BigDecimal>>): BigDecimal = TODO()
private val readDelay = 5.seconds val readDelay = 5.seconds
private val writeDelay = 10.seconds val writeDelay = 10.seconds
fun CoroutineScope.launchPriceUpdate( fun CoroutineScope.launchPriceUpdate(
reader: PriceReader, reader: PriceReader,
@ -35,6 +37,7 @@ fun CoroutineScope.launchPriceUpdate(
duration: Duration duration: Duration
): Job = launch { ): Job = launch {
val cache = ArrayDeque<Pair<Instant, BigDecimal>>() val cache = ArrayDeque<Pair<Instant, BigDecimal>>()
val mutex = Mutex()
//restore from db //restore from db
val now = Clock.System.now() val now = Clock.System.now()
@ -44,11 +47,21 @@ fun CoroutineScope.launchPriceUpdate(
launch { launch {
while (isActive) { while (isActive) {
delay(readDelay) delay(readDelay)
try {
val now = Clock.System.now() val now = Clock.System.now()
val price = reader.readPrice() val price = reader.readPrice()
launch(Dispatchers.IO) {
db.storePrice(now, price) db.storePrice(now, price)
}
mutex.withLock {
cache.addFirst(now to price) cache.addFirst(now to price)
} }
} catch (e: Exception) {
e.printStackTrace()
}
}
} }
//write job //write job
@ -66,7 +79,9 @@ fun CoroutineScope.launchPriceUpdate(
while (isActive) { while (isActive) {
delay(10.minutes) delay(10.minutes)
val now = Clock.System.now() val now = Clock.System.now()
mutex.withLock {
cache.removeIf { now - it.first > duration } cache.removeIf { now - it.first > duration }
} }
} }
} }
}

View File

@ -2,6 +2,7 @@ import io.ktor.http.HttpStatusCode
import io.ktor.server.application.install import io.ktor.server.application.install
import io.ktor.server.cio.CIO import io.ktor.server.cio.CIO
import io.ktor.server.engine.embeddedServer import io.ktor.server.engine.embeddedServer
import io.ktor.server.html.respondHtml
import io.ktor.server.request.receiveText import io.ktor.server.request.receiveText
import io.ktor.server.response.respond import io.ktor.server.response.respond
import io.ktor.server.response.respondText import io.ktor.server.response.respondText
@ -14,17 +15,20 @@ import io.ktor.server.websocket.WebSockets
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.html.body
import kotlinx.html.h1
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
fun Route.subroute(){ fun Route.subroute(key: String){
var content = "" var content = ""
get("get") { get("get") {
call.respondText("Content: ${content}") call.respondText("Content[$key]: ${content}")
} }
post("set"){ post("set"){
@ -35,13 +39,14 @@ fun Route.subroute(){
suspend fun executeQuery(queryName: String, ioContext: CoroutineContext = Dispatchers.IO): String{ suspend fun executeQuery(queryName: String, ioContext: CoroutineContext = Dispatchers.IO): String{
withContext(ioContext){ withContext(ioContext){
Thread.sleep(1000)
//some blocking logic //some blocking logic
} }
return queryName return queryName
} }
fun main() { suspend fun main():Unit = coroutineScope{
embeddedServer(CIO, port = 8080, host = "localhost") { embeddedServer(CIO, port = 8080, host = "localhost") {
install(WebSockets) install(WebSockets)
@ -50,18 +55,26 @@ fun main() {
routing { routing {
get("/") { get("/") {
val callerName = call.queryParameters["name"] ?: "World" val callerName = call.queryParameters["name"] ?: "World"
// call.respondHtml {
// body{
// h1("Hello $callerName!")
// }
// }
call.respondText("Hello $callerName!") call.respondText("Hello $callerName!")
} }
get("/query/{queryName}"){ get("/query/{queryName}"){
val queryName = call.parameters["queryName"] ?: "query" val queryName = call.parameters["queryName"] ?: "query"
val queryResult = executeQuery(queryName, ioContext) val queryResult = executeQuery(queryName, ioContext)
call.respondText("$queryName successful: $queryResult") call.respondText("$queryName successful: $queryResult")
} }
route("subroute"){ route("subroute"){
subroute() subroute("0")
} }
route("subroute1"){ route("subroute1"){
subroute() subroute("1")
} }
route("producer"){ route("producer"){
@ -74,5 +87,5 @@ fun main() {
} }
} }
}.start(wait = true) }.start()
} }

View File

@ -3,6 +3,7 @@ import io.ktor.server.routing.application
import io.ktor.server.websocket.webSocket import io.ktor.server.websocket.webSocket
import io.ktor.websocket.Frame import io.ktor.websocket.Frame
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -23,10 +24,14 @@ fun Route.streamingModule() {
} }
webSocket { webSocket {
launch {
incoming.consumeEach { frame ->
println(frame)
}
}
repeat(3){ repeat(3){
delay(100.milliseconds) delay(100.milliseconds)
outgoing.send(Frame.Text(Instant.now().toString())) outgoing.send(Frame.Text(Instant.now().toString()))
} }
// launch { // launch {