From f79a9e86a1806637e7b6f2f0704276e55dd4b7a8 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 29 Apr 2019 22:14:04 +0300 Subject: [PATCH] Concurrent map for flows --- .../scientifik/kmath/CoroutinesExtra.kt | 4 ++-- .../kmath/streaming/BufferFlowTest.kt | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt index 1290e6ea6..51cc07511 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt @@ -89,8 +89,8 @@ suspend fun AsyncFlow.collect(concurrency: Int, action: suspend (value: T @FlowPreview fun Flow.map( - concurrencyLevel: Int, - dispatcher: CoroutineDispatcher = Dispatchers.Default, + dispatcher: CoroutineDispatcher, + concurrencyLevel: Int = 16, bufferSize: Int = concurrencyLevel, transform: suspend (T) -> R ): Flow { diff --git a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt index 5b04886d7..63b716c01 100644 --- a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt +++ b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt @@ -6,6 +6,7 @@ import org.junit.Test import scientifik.kmath.async import scientifik.kmath.collect import scientifik.kmath.map +import java.util.concurrent.Executors @ExperimentalCoroutinesApi @@ -13,30 +14,32 @@ import scientifik.kmath.map @FlowPreview class BufferFlowTest { + val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher() + @Test(timeout = 2000) - fun concurrentMap() { + fun map() { runBlocking { - (1..20).asFlow().map(4) { - println("Started $it") + (1..20).asFlow().map( dispatcher) { + println("Started $it on ${Thread.currentThread().name}") @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(200) it }.collect { - println("Completed $it") + println("Completed $it on ${Thread.currentThread().name}") } } } @Test(timeout = 2000) - fun mapParallel() { + fun async() { runBlocking { - (1..20).asFlow().async { - println("Started $it") + (1..20).asFlow().async(dispatcher) { + println("Started $it on ${Thread.currentThread().name}") @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(200) it }.collect(4) { - println("Completed $it") + println("Completed $it on ${Thread.currentThread().name}") } } }