idiomatic-kotlin-course/notebooks/Multitasking.ipynb

22 KiB
Raw Permalink Blame History

In [ ]:
%use coroutines

Три оси многозадачности

Синхронный х однопоточный х неконкурентный

In [ ]:
repeat(10){
    println("Line number $it")
}
In [ ]:
import java.util.stream.*

IntStream.range(0, 100).sum()

Синхронный х однопоточный х конкурентный

In [1]:
val list = (0..100).toMutableList()

list.forEach {
    if(it % 2 == 0) list.remove(it)
}
null
java.util.ConcurrentModificationException
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1095)
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1049)
	at Line_0_jupyter.<init>(Line_0.jupyter.kts:6)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
	at kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.evalWithConfigAndOtherScriptsResults(BasicJvmScriptEvaluator.kt:122)
	at kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke$suspendImpl(BasicJvmScriptEvaluator.kt:48)
	at kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke(BasicJvmScriptEvaluator.kt)
	at kotlin.script.experimental.jvm.BasicJvmReplEvaluator.eval(BasicJvmReplEvaluator.kt:49)
	at org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl$eval$resultWithDiagnostics$1.invokeSuspend(InternalEvaluatorImpl.kt:127)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl.eval(InternalEvaluatorImpl.kt:127)
	at org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$1$result$1.invoke(CellExecutorImpl.kt:79)
	at org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$1$result$1.invoke(CellExecutorImpl.kt:77)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.withHost(ReplForJupyterImpl.kt:758)
	at org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl.execute(CellExecutorImpl.kt:77)
	at org.jetbrains.kotlinx.jupyter.repl.execution.CellExecutor$DefaultImpls.execute$default(CellExecutor.kt:12)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.evaluateUserCode(ReplForJupyterImpl.kt:581)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.access$evaluateUserCode(ReplForJupyterImpl.kt:136)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl$evalEx$1.invoke(ReplForJupyterImpl.kt:439)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl$evalEx$1.invoke(ReplForJupyterImpl.kt:436)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.withEvalContext(ReplForJupyterImpl.kt:417)
	at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.evalEx(ReplForJupyterImpl.kt:436)
	at org.jetbrains.kotlinx.jupyter.messaging.IdeCompatibleMessageRequestProcessor$processExecuteRequest$1$response$1$1.invoke(IdeCompatibleMessageRequestProcessor.kt:140)
	at org.jetbrains.kotlinx.jupyter.messaging.IdeCompatibleMessageRequestProcessor$processExecuteRequest$1$response$1$1.invoke(IdeCompatibleMessageRequestProcessor.kt:139)
	at org.jetbrains.kotlinx.jupyter.execution.JupyterExecutorImpl$Task.execute(JupyterExecutorImpl.kt:42)
	at org.jetbrains.kotlinx.jupyter.execution.JupyterExecutorImpl$executorThread$1.invoke(JupyterExecutorImpl.kt:82)
	at org.jetbrains.kotlinx.jupyter.execution.JupyterExecutorImpl$executorThread$1.invoke(JupyterExecutorImpl.kt:80)
	at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
In [ ]:

Синхронный х многопоточный х неконкурентный

In [ ]:
val sum = IntStream.range(0, 100).parallel().sum()

println(sum)
In [ ]:

Асинхронный х однопоточный х неконкурентный

In [ ]:
fun runLongTask(result: Int = 8): Int {
  Thread.sleep(100)
  println("Task complete: $result")
  return result
}
In [ ]:
import java.util.concurrent.*

val executor = Executors.newSingleThreadExecutor()

val future = executor.submit<Int> {
    val result = runLongTask()
    println(result)
    result
}

future.get()

Асинхронный х однопоточный х конкурентный

In [ ]:
interface Observable{
    fun onChange(callback: (Int) -> Unit)
}

val observable: Observable by lazy{ TODO() }
In [ ]:
val list: MutableList<Int> = mutableListOf<Int>()

observable.onChange { 
    list.add(it)
}

Асинхронный х многопоточный х неконкурентный

In [ ]:

In [ ]:
val executor: ExecutorService = Executors.newFixedThreadPool(4)

repeat(8){
    executor.submit{
        runLongTask(it)
    }
}

Синхронный х многопоточный х конкурентный

In [ ]:
import kotlin.io.path.Path
import kotlin.io.path.writeText

val file = Path("someFile.txt")

val data: List<String> = emptyList()

data.stream()
    .parallel()
    .forEach {  
        file.writeText(it)
    }

Асинхронный х многопоточный х конкурентный

In [ ]:
val cache = mutableMapOf<Int, Int>

val consumer: (Int) -> Unit = TODO()


repeat(8){ i->
    executor.submit{
        val result = cache.getOrPut(i){
            runLongTask(i)
        }
        consumer(result)
    }
}

Инструменты

Thread

In [ ]:
import kotlin.concurrent.*

val t = thread {
    runLongTask()
}
t.join()
In [ ]:

Future

In [ ]:
val future = executor.submit<Int>{
    runLongTask()
}

future.get()
future.cancel(true)

CompletableFuture

In [ ]:
import java.util.concurrent.*

val cf = CompletableFuture.supplyAsync{
    runLongTask()
}

val cf2 = cf.whenComplete{ res, _ ->
    runLongTask(res - 1)
}

val cf3 = cf.whenComplete{ res, _ ->
    runLongTask(res + 1)
}

cf2.join()
cf3.join()

Lock

In [ ]:
import java.util.concurrent.locks.*

val list = mutableListOf<Int>()
val lock = ReentrantLock()

val cf4 = CompletableFuture.supplyAsync{
    val res = runLongTask(4)
    lock.withLock { 
        list.add(res)
    }
}

val cf5 = CompletableFuture.supplyAsync{
    val res = runLongTask(5)
    lock.withLock { 
        list.add(res)
    }
}

cf4.join()
cf5.join()

list

Reactive streams

In [ ]:
class A(val value: Int)
class B(val value: Int)

val aFlow = flowOf<A>()
val bFlow = flowOf<B>()

val combined = aFlow.zip(bFlow){ a, b->
    a.value + b.value
}.debounce(2.seconds)

Actor

In [ ]:
sealed class Event {
    class Open(val transactionId: Int): Event()
    class Close(val tranactionId: Int): Event()
}

fun interface Actor {
    suspend fun receive(event: Event)
}

val actor = object : Actor {
    private var transaction: Int? = null
    override suspend fun receive(event: Event) {
        when (event) {
            is Event.Open -> if (transaction != null) {
                error("Transaction already open")
            } else {
                transaction = event.transactionId
            }
            is Event.Close ->if (transaction != event.tranactionId) {
                error("Wrong transaction id: ${event.tranactionId}")
            } else {
                transaction = null
            }
        }
    }

}

Сорта асинхронности

Callback

In [ ]:
import java.net.*
import java.net.http.*
import java.net.http.HttpResponse.*

val client: HttpClient = HttpClient.newHttpClient()

val request :  HttpRequest = HttpRequest.newBuilder()
    .uri(URI.create("https://sciprog.center"))
    .GET().build()

val regex = """href=\"(.+\.png)\"""".toRegex()

client.sendAsync(request, BodyHandlers.ofString())
    .thenApply{ it.body() }
    .thenApply{ 
        val resources = regex.findAll(it).map{it.groupValues[1]}

        resources.forEach { resourceName->
            val resourceRequest :  HttpRequest = HttpRequest.newBuilder()
                .uri(URI.create("https://sciprog.center$resourceName"))
                .GET().build()
            client.sendAsync(resourceRequest, BodyHandlers.ofByteArray()).thenAccept{ resourceResponse ->
                val bodyBytes = resourceResponse.body()
                //do something with the body
            }
        }
        resources
    }
    .thenAccept{ println(it.toList()) }
    .join()

Реактивные потоки

In [ ]: