idiomatic-kotlin-course/notebooks/Multitasking.ipynb
2024-12-08 12:38:44 +03:00

18 KiB
Raw Blame History

In [ ]:
%use coroutines

Три оси многозадачности

Синхронный х однопоточный х неконкурентный

In [ ]:
repeat(10){
    println("Line number $it")
}
In [ ]:
import java.util.stream.*

IntStream.range(0, 100).sum()

Синхронный х однопоточный х конкурентный

In [ ]:
val list = (0..100).toMutableList()

list.forEach {
    if(it % 2 == 0) list.remove(it)
}
In [ ]:

Синхронный х многопоточный х неконкурентный

In [ ]:
val sum = IntStream.range(0, 100).parallel().sum()

println(sum)
In [ ]:

Асинхронный х однопоточный х неконкурентный

In [ ]:
fun runLongTask(result: Int = 8): Int {
  Thread.sleep(100)
  println("Task complete: $result")
  return result
}
In [ ]:
import java.util.concurrent.*

val executor = Executors.newSingleThreadExecutor()

val future = executor.submit<Int> {
    val result = runLongTask()
    println(result)
    result
}

future.get()

Асинхронный х однопоточный х конкурентный

In [ ]:
interface Observable{
    fun onChange(callback: (Int) -> Unit)
}

val observable: Observable by lazy{ TODO() }
In [ ]:
val list: MutableList<Int> = mutableListOf<Int>()

observable.onChange { 
    list.add(it)
}

Асинхронный х многопоточный х неконкурентный

In [ ]:

In [ ]:
val executor: ExecutorService = Executors.newFixedThreadPool(4)

repeat(8){
    executor.submit{
        runLongTask(it)
    }
}

Синхронный х многопоточный х конкурентный

In [ ]:
import kotlin.io.path.Path
import kotlin.io.path.writeText

val file = Path("someFile.txt")

val data: List<String> = emptyList()

data.stream()
    .parallel()
    .forEach {  
        file.writeText(it)
    }

Асинхронный х многопоточный х конкурентный

In [ ]:
val cache = mutableMapOf<Int, Int>

val consumer: (Int) -> Unit = TODO()


repeat(8){ i->
    executor.submit{
        val result = cache.getOrPut(i){
            runLongTask(i)
        }
        consumer(result)
    }
}

Инструменты

Thread

In [ ]:
import kotlin.concurrent.*

val t = thread {
    runLongTask()
}
t.join()
In [ ]:

Future

In [ ]:
val future = executor.submit<Int>{
    runLongTask()
}

future.get()
future.cancel(true)

CompletableFuture

In [ ]:
import java.util.concurrent.*

val cf = CompletableFuture.supplyAsync{
    runLongTask()
}

val cf2 = cf.whenComplete{ res, _ ->
    runLongTask(res - 1)
}

val cf3 = cf.whenComplete{ res, _ ->
    runLongTask(res + 1)
}

cf2.join()
cf3.join()

Lock

In [ ]:
import java.util.concurrent.locks.*

val list = mutableListOf<Int>()
val lock = ReentrantLock()

val cf4 = CompletableFuture.supplyAsync{
    val res = runLongTask(4)
    lock.withLock { 
        list.add(res)
    }
}

val cf5 = CompletableFuture.supplyAsync{
    val res = runLongTask(5)
    lock.withLock { 
        list.add(res)
    }
}

cf4.join()
cf5.join()

list

Reactive streams

In [ ]:
class A(val value: Int)
class B(val value: Int)

val aFlow = flowOf<A>()
val bFlow = flowOf<B>()

val combined = aFlow.zip(bFlow){ a, b->
    a.value + b.value
}.debounce(2.seconds)

Actor

In [ ]:
sealed class Event {
    class Open(val transactionId: Int): Event()
    class Close(val tranactionId: Int): Event()
}

fun interface Actor {
    suspend fun receive(event: Event)
}

val actor = object : Actor {
    private var transaction: Int? = null
    override suspend fun receive(event: Event) {
        when (event) {
            is Event.Open -> if (transaction != null) {
                error("Transaction already open")
            } else {
                transaction = event.transactionId
            }
            is Event.Close ->if (transaction != event.tranactionId) {
                error("Wrong transaction id: ${event.tranactionId}")
            } else {
                transaction = null
            }
        }
    }

}

Сорта асинхронности

Callback

In [ ]:
import java.net.*
import java.net.http.*
import java.net.http.HttpResponse.*

val client: HttpClient = HttpClient.newHttpClient()

val request :  HttpRequest = HttpRequest.newBuilder()
    .uri(URI.create("https://sciprog.center"))
    .GET().build()

val regex = """href=\"(.+\.png)\"""".toRegex()

client.sendAsync(request, BodyHandlers.ofString())
    .thenApply{ it.body() }
    .thenApply{ 
        val resources = regex.findAll(it).map{it.groupValues[1]}

        resources.forEach { resourceName->
            val resourceRequest :  HttpRequest = HttpRequest.newBuilder()
                .uri(URI.create("https://sciprog.center$resourceName"))
                .GET().build()
            client.sendAsync(resourceRequest, BodyHandlers.ofByteArray()).thenAccept{ resourceResponse ->
                val bodyBytes = resourceResponse.body()
                //do something with the body
            }
        }
        resources
    }
    .thenAccept{ println(it.toList()) }
    .join()

Реактивные потоки

In [ ]: