Concurrent map for flows
This commit is contained in:
parent
6f4f658030
commit
f79a9e86a1
@ -89,8 +89,8 @@ suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, action: suspend (value: T
|
|||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <T, R> Flow<T>.map(
|
fun <T, R> Flow<T>.map(
|
||||||
concurrencyLevel: Int,
|
dispatcher: CoroutineDispatcher,
|
||||||
dispatcher: CoroutineDispatcher = Dispatchers.Default,
|
concurrencyLevel: Int = 16,
|
||||||
bufferSize: Int = concurrencyLevel,
|
bufferSize: Int = concurrencyLevel,
|
||||||
transform: suspend (T) -> R
|
transform: suspend (T) -> R
|
||||||
): Flow<R> {
|
): Flow<R> {
|
||||||
|
@ -6,6 +6,7 @@ import org.junit.Test
|
|||||||
import scientifik.kmath.async
|
import scientifik.kmath.async
|
||||||
import scientifik.kmath.collect
|
import scientifik.kmath.collect
|
||||||
import scientifik.kmath.map
|
import scientifik.kmath.map
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
|
||||||
|
|
||||||
@ExperimentalCoroutinesApi
|
@ExperimentalCoroutinesApi
|
||||||
@ -13,30 +14,32 @@ import scientifik.kmath.map
|
|||||||
@FlowPreview
|
@FlowPreview
|
||||||
class BufferFlowTest {
|
class BufferFlowTest {
|
||||||
|
|
||||||
|
val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
|
||||||
|
|
||||||
@Test(timeout = 2000)
|
@Test(timeout = 2000)
|
||||||
fun concurrentMap() {
|
fun map() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
(1..20).asFlow().map(4) {
|
(1..20).asFlow().map( dispatcher) {
|
||||||
println("Started $it")
|
println("Started $it on ${Thread.currentThread().name}")
|
||||||
@Suppress("BlockingMethodInNonBlockingContext")
|
@Suppress("BlockingMethodInNonBlockingContext")
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
it
|
it
|
||||||
}.collect {
|
}.collect {
|
||||||
println("Completed $it")
|
println("Completed $it on ${Thread.currentThread().name}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 2000)
|
@Test(timeout = 2000)
|
||||||
fun mapParallel() {
|
fun async() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
(1..20).asFlow().async {
|
(1..20).asFlow().async(dispatcher) {
|
||||||
println("Started $it")
|
println("Started $it on ${Thread.currentThread().name}")
|
||||||
@Suppress("BlockingMethodInNonBlockingContext")
|
@Suppress("BlockingMethodInNonBlockingContext")
|
||||||
Thread.sleep(200)
|
Thread.sleep(200)
|
||||||
it
|
it
|
||||||
}.collect(4) {
|
}.collect(4) {
|
||||||
println("Completed $it")
|
println("Completed $it on ${Thread.currentThread().name}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user