22 KiB
22 KiB
In [ ]:
%use coroutines
Три оси многозадачности¶
Синхронный х однопоточный х неконкурентный¶
In [ ]:
repeat(10){ println("Line number $it") }
In [ ]:
import java.util.stream.* IntStream.range(0, 100).sum()
Синхронный х однопоточный х конкурентный¶
In [1]:
val list = (0..100).toMutableList() list.forEach { if(it % 2 == 0) list.remove(it) }
null java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1095) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:1049) at Line_0_jupyter.<init>(Line_0.jupyter.kts:6) at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486) at kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.evalWithConfigAndOtherScriptsResults(BasicJvmScriptEvaluator.kt:122) at kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke$suspendImpl(BasicJvmScriptEvaluator.kt:48) at kotlin.script.experimental.jvm.BasicJvmScriptEvaluator.invoke(BasicJvmScriptEvaluator.kt) at kotlin.script.experimental.jvm.BasicJvmReplEvaluator.eval(BasicJvmReplEvaluator.kt:49) at org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl$eval$resultWithDiagnostics$1.invokeSuspend(InternalEvaluatorImpl.kt:127) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104) at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:277) at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:95) at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69) at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:48) at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source) at org.jetbrains.kotlinx.jupyter.repl.impl.InternalEvaluatorImpl.eval(InternalEvaluatorImpl.kt:127) at org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$1$result$1.invoke(CellExecutorImpl.kt:79) at org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl$execute$1$result$1.invoke(CellExecutorImpl.kt:77) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.withHost(ReplForJupyterImpl.kt:758) at org.jetbrains.kotlinx.jupyter.repl.impl.CellExecutorImpl.execute(CellExecutorImpl.kt:77) at org.jetbrains.kotlinx.jupyter.repl.execution.CellExecutor$DefaultImpls.execute$default(CellExecutor.kt:12) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.evaluateUserCode(ReplForJupyterImpl.kt:581) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.access$evaluateUserCode(ReplForJupyterImpl.kt:136) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl$evalEx$1.invoke(ReplForJupyterImpl.kt:439) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl$evalEx$1.invoke(ReplForJupyterImpl.kt:436) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.withEvalContext(ReplForJupyterImpl.kt:417) at org.jetbrains.kotlinx.jupyter.repl.impl.ReplForJupyterImpl.evalEx(ReplForJupyterImpl.kt:436) at org.jetbrains.kotlinx.jupyter.messaging.IdeCompatibleMessageRequestProcessor$processExecuteRequest$1$response$1$1.invoke(IdeCompatibleMessageRequestProcessor.kt:140) at org.jetbrains.kotlinx.jupyter.messaging.IdeCompatibleMessageRequestProcessor$processExecuteRequest$1$response$1$1.invoke(IdeCompatibleMessageRequestProcessor.kt:139) at org.jetbrains.kotlinx.jupyter.execution.JupyterExecutorImpl$Task.execute(JupyterExecutorImpl.kt:42) at org.jetbrains.kotlinx.jupyter.execution.JupyterExecutorImpl$executorThread$1.invoke(JupyterExecutorImpl.kt:82) at org.jetbrains.kotlinx.jupyter.execution.JupyterExecutorImpl$executorThread$1.invoke(JupyterExecutorImpl.kt:80) at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
In [ ]:
Синхронный х многопоточный х неконкурентный¶
In [ ]:
val sum = IntStream.range(0, 100).parallel().sum() println(sum)
In [ ]:
Асинхронный х однопоточный х неконкурентный¶
In [ ]:
fun runLongTask(result: Int = 8): Int { Thread.sleep(100) println("Task complete: $result") return result }
In [ ]:
import java.util.concurrent.* val executor = Executors.newSingleThreadExecutor() val future = executor.submit<Int> { val result = runLongTask() println(result) result } future.get()
Асинхронный х однопоточный х конкурентный¶
In [ ]:
interface Observable{ fun onChange(callback: (Int) -> Unit) } val observable: Observable by lazy{ TODO() }
In [ ]:
val list: MutableList<Int> = mutableListOf<Int>() observable.onChange { list.add(it) }
Асинхронный х многопоточный х неконкурентный¶
In [ ]:
In [ ]:
val executor: ExecutorService = Executors.newFixedThreadPool(4) repeat(8){ executor.submit{ runLongTask(it) } }
Синхронный х многопоточный х конкурентный¶
In [ ]:
import kotlin.io.path.Path import kotlin.io.path.writeText val file = Path("someFile.txt") val data: List<String> = emptyList() data.stream() .parallel() .forEach { file.writeText(it) }
Асинхронный х многопоточный х конкурентный¶
In [ ]:
val cache = mutableMapOf<Int, Int> val consumer: (Int) -> Unit = TODO() repeat(8){ i-> executor.submit{ val result = cache.getOrPut(i){ runLongTask(i) } consumer(result) } }
Инструменты¶
Thread¶
In [ ]:
import kotlin.concurrent.* val t = thread { runLongTask() } t.join()
In [ ]:
Future¶
In [ ]:
val future = executor.submit<Int>{ runLongTask() } future.get() future.cancel(true)
CompletableFuture¶
In [ ]:
import java.util.concurrent.* val cf = CompletableFuture.supplyAsync{ runLongTask() } val cf2 = cf.whenComplete{ res, _ -> runLongTask(res - 1) } val cf3 = cf.whenComplete{ res, _ -> runLongTask(res + 1) } cf2.join() cf3.join()
Lock¶
In [ ]:
import java.util.concurrent.locks.* val list = mutableListOf<Int>() val lock = ReentrantLock() val cf4 = CompletableFuture.supplyAsync{ val res = runLongTask(4) lock.withLock { list.add(res) } } val cf5 = CompletableFuture.supplyAsync{ val res = runLongTask(5) lock.withLock { list.add(res) } } cf4.join() cf5.join() list
Reactive streams
In [ ]:
class A(val value: Int) class B(val value: Int) val aFlow = flowOf<A>() val bFlow = flowOf<B>() val combined = aFlow.zip(bFlow){ a, b-> a.value + b.value }.debounce(2.seconds)
Actor
In [ ]:
sealed class Event { class Open(val transactionId: Int): Event() class Close(val tranactionId: Int): Event() } fun interface Actor { suspend fun receive(event: Event) } val actor = object : Actor { private var transaction: Int? = null override suspend fun receive(event: Event) { when (event) { is Event.Open -> if (transaction != null) { error("Transaction already open") } else { transaction = event.transactionId } is Event.Close ->if (transaction != event.tranactionId) { error("Wrong transaction id: ${event.tranactionId}") } else { transaction = null } } } }
Сорта асинхронности¶
Callback¶
In [ ]:
import java.net.* import java.net.http.* import java.net.http.HttpResponse.* val client: HttpClient = HttpClient.newHttpClient() val request : HttpRequest = HttpRequest.newBuilder() .uri(URI.create("https://sciprog.center")) .GET().build() val regex = """href=\"(.+\.png)\"""".toRegex() client.sendAsync(request, BodyHandlers.ofString()) .thenApply{ it.body() } .thenApply{ val resources = regex.findAll(it).map{it.groupValues[1]} resources.forEach { resourceName-> val resourceRequest : HttpRequest = HttpRequest.newBuilder() .uri(URI.create("https://sciprog.center$resourceName")) .GET().build() client.sendAsync(resourceRequest, BodyHandlers.ofByteArray()).thenAccept{ resourceResponse -> val bodyBytes = resourceResponse.body() //do something with the body } } resources } .thenAccept{ println(it.toList()) } .join()
Реактивные потоки¶
In [ ]: