Concurrent map for flows

This commit is contained in:
Alexander Nozik 2019-04-29 21:48:05 +03:00
parent 3ddff86e24
commit 6f4f658030
2 changed files with 33 additions and 36 deletions

View File

@ -1,12 +1,8 @@
package scientifik.kmath
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.*
val Dispatchers.Math: CoroutineDispatcher get() = Dispatchers.Default
@ -17,7 +13,7 @@ internal class LazyDeferred<T>(val dispatcher: CoroutineDispatcher, val block: s
private var deferred: Deferred<T>? = null
internal fun start(scope: CoroutineScope) {
if(deferred==null) {
if (deferred == null) {
deferred = scope.async(dispatcher, block = block)
}
}
@ -60,7 +56,7 @@ suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, collector: FlowCollector<
require(concurrency >= 1) { "Buffer size should be more than 1, but was $concurrency" }
coroutineScope {
//Starting up to N deferred coroutines ahead of time
val channel = produce(capacity = concurrency-1) {
val channel = produce(capacity = concurrency - 1) {
deferredFlow.collect { value ->
value.start(this@coroutineScope)
send(value)
@ -91,31 +87,16 @@ suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, action: suspend (value: T
})
}
//suspend fun <T> Flow<T>.collect(concurrency: Int, dispatcher: CoroutineDispatcher, collector: FlowCollector<T>){
// require(concurrency >= 1) { "Buffer size should be more than 1, but was $concurrency" }
// coroutineScope {
// //Starting up to N deferred coroutines ahead of time
// val channel = produce(capacity = concurrency-1) {
// this@collect.
// deferredFlow.collect { value ->
// value.start(this@coroutineScope)
// send(value)
// }
// }
//
// (channel as Job).invokeOnCompletion {
// if (it is CancellationException && it.cause == null) cancel()
// }
//
// for (element in channel) {
// collector.emit(element.await())
// }
//
// val producer = channel as Job
// if (producer.isCancelled) {
// producer.join()
// //throw producer.getCancellationException()
// }
// }
//}
@FlowPreview
fun <T, R> Flow<T>.map(
concurrencyLevel: Int,
dispatcher: CoroutineDispatcher = Dispatchers.Default,
bufferSize: Int = concurrencyLevel,
transform: suspend (T) -> R
): Flow<R> {
return flatMapMerge(concurrencyLevel, bufferSize) { value ->
flow { emit(transform(value)) }
}.flowOn(dispatcher)
}

View File

@ -1,20 +1,36 @@
package scientifik.kmath.streaming
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.*
import org.junit.Test
import scientifik.kmath.async
import scientifik.kmath.collect
import scientifik.kmath.map
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
@FlowPreview
class BufferFlowTest {
@Test(timeout = 2000)
fun concurrentMap() {
runBlocking {
(1..20).asFlow().map(4) {
println("Started $it")
@Suppress("BlockingMethodInNonBlockingContext")
Thread.sleep(200)
it
}.collect {
println("Completed $it")
}
}
}
@Test(timeout = 2000)
fun mapParallel() {
runBlocking {
(1..20).asFlow().async(Dispatchers.Default) {
(1..20).asFlow().async {
println("Started $it")
@Suppress("BlockingMethodInNonBlockingContext")
Thread.sleep(200)