Migrating streaming to Flow

This commit is contained in:
Alexander Nozik 2019-04-27 22:15:21 +03:00
parent 68ccc0b3fc
commit 08e14b15c5
7 changed files with 81 additions and 530 deletions

View File

@ -1,9 +1,73 @@
package scientifik.kmath
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.*
/**
 * Extension property exposing a `Math` dispatcher on [Dispatchers].
 *
 * Currently simply returns [Dispatchers.Default].
 */
val Dispatchers.Math: CoroutineDispatcher get() = Dispatchers.Default
/**
 * A [Flow] view over a flow of [Deferred] values: each deferred is awaited and its result is emitted downstream.
 *
 * @param deferredFlow the underlying flow of deferred computations.
 */
@FlowPreview
inline class AsyncFlow<T>(val deferredFlow: Flow<Deferred<T>>) : Flow<T> {
    /**
     * Collects [deferredFlow] element by element, awaiting every deferred before handing its result to [collector].
     */
    override suspend fun collect(collector: FlowCollector<T>) {
        deferredFlow.collect { deferred ->
            val result = deferred.await()
            collector.emit(result)
        }
    }
}
/**
 * Wrap every element of this flow into a lazily started [Deferred] that computes [block] on [dispatcher].
 *
 * The computation of an element does not begin until its deferred is started or awaited.
 *
 * @param dispatcher the dispatcher used to run [block].
 * @param block the transformation applied to each element.
 * @return an [AsyncFlow] over the created deferred values.
 */
@FlowPreview
fun <T, R> Flow<T>.async(
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
    block: suspend (T) -> R
): AsyncFlow<R> {
    val flow = map { value ->
        // Do not create the deferred inside `coroutineScope { ... }`: that scope returns only after all of its
        // children complete, and a LAZY child that has not been started yet never completes, so the first
        // element would suspend forever. The deferred is instead attached to the collecting coroutine,
        // which starts (or awaits) it later.
        CoroutineScope(kotlin.coroutines.coroutineContext).async(dispatcher, start = CoroutineStart.LAZY) {
            block(value)
        }
    }
    return AsyncFlow(flow)
}
/**
 * Chain a (non-suspending) [action] after each deferred computation of this [AsyncFlow].
 *
 * For every upstream deferred a new lazily started [Deferred] is created which awaits the upstream value and
 * applies [action] to it.
 *
 * NOTE(review): the result is a plain `Flow<Deferred<R>>`, not an [AsyncFlow]; callers presumably have to wrap it
 * themselves — confirm whether that is intended.
 *
 * @param action the transformation applied to each computed value.
 */
@FlowPreview
fun <T, R> AsyncFlow<T>.map(action: (T) -> R): Flow<Deferred<R>> = deferredFlow.map { input ->
    // Creating the LAZY deferred inside `coroutineScope { ... }` would suspend forever, because the scope waits
    // for its not-yet-started child. Attach the deferred to the collecting coroutine instead.
    CoroutineScope(kotlin.coroutines.coroutineContext).async(start = CoroutineStart.LAZY) {
        action(input.await())
    }
}
/**
 * Collect this [AsyncFlow], starting deferred computations ahead of the point where their results are consumed.
 *
 * A producer coroutine starts every deferred and puts it into a channel of capacity [concurrency]; this function
 * awaits the deferreds in their original order and emits the results to [collector].
 *
 * @param concurrency capacity of the intermediate channel; must be non-negative (0 is accepted).
 * @param collector receiver of the computed values.
 * @throws IllegalArgumentException if [concurrency] is negative.
 */
@ExperimentalCoroutinesApi
@FlowPreview
suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, collector: FlowCollector<T>) {
    // Zero is allowed by the check, so the message must say "non-negative" rather than "positive"
    require(concurrency >= 0) { "Concurrency should be non-negative, but was $concurrency" }
    coroutineScope {
        //Starting up to N deferred coroutines ahead of time
        val channel = produce(capacity = concurrency) {
            deferredFlow.collect { value ->
                value.start()
                send(value)
            }
        }

        // Propagate a plain (cause-less) cancellation of the producer to the enclosing scope
        (channel as Job).invokeOnCompletion {
            if (it is CancellationException && it.cause == null) cancel()
        }

        for (element in channel) {
            collector.emit(element.await())
        }

        val producer = channel as Job
        if (producer.isCancelled) {
            producer.join()
            //throw producer.getCancellationException()
        }
    }
}
/**
 * Lambda-based variant of the concurrent collect: [action] is invoked for every computed value.
 *
 * @param concurrency forwarded unchanged to the [FlowCollector]-based overload.
 * @param action callback receiving each value.
 */
@ExperimentalCoroutinesApi
@FlowPreview
suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, action: suspend (value: T) -> Unit): Unit {
    val adapter = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            action(value)
        }
    }
    collect(concurrency, adapter)
}

View File

@ -59,37 +59,4 @@ fun Flow<Double>.chunked(bufferSize: Int) = flow {
emit(buffer)
}
}
}
/**
 * Perform parallel mapping of flow elements.
 *
 * Every element is passed to [transform] inside an `async` coroutine running on [dispatcher]; the resulting
 * deferreds travel through a channel of capacity [bufferSize] and are awaited in the original order.
 *
 * @param dispatcher the dispatcher used for [transform].
 * @param bufferSize capacity of the intermediate channel; must be non-negative (0 is accepted).
 * @param transform the mapping applied to each element.
 * @throws IllegalArgumentException if [bufferSize] is negative.
 */
@InternalCoroutinesApi
@ExperimentalCoroutinesApi
@FlowPreview
fun <T, R> Flow<T>.mapParallel(
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
    bufferSize: Int = 16,
    transform: suspend (T) -> R
): Flow<R> {
    // Zero is allowed by the check, so the message must say "non-negative" rather than "positive"
    require(bufferSize >= 0) {
        "Buffer size should be non-negative, but was $bufferSize"
    }
    return flow {
        coroutineScope {
            val channel: ReceiveChannel<Deferred<R>> = produce(capacity = bufferSize) {
                collect { value ->
                    send(async(dispatcher) { transform(value) })
                }
            }

            // TODO semantics doesn't play well here and we pay for that with additional object
            (channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
            for (element in channel) {
                emit(element.await())
            }

            val producer = channel as Job
            if (producer.isCancelled) {
                producer.join()
                throw producer.getCancellationException()
            }
        }
    }
}

View File

@ -1,24 +1,26 @@
package scientifik.kmath.streaming
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import org.junit.Test
import scientifik.kmath.async
import scientifik.kmath.collect
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
@FlowPreview
class BufferFlowTest {
@Test(timeout = 2000)
@Test
fun mapParallel() {
runBlocking {
(1..20).asFlow().mapParallel {
(1..20).asFlow().async(Dispatchers.IO) {
println("Started $it")
@Suppress("BlockingMethodInNonBlockingContext")
Thread.sleep(200)
it
}.collect {
}.collect(4) {
println("Completed $it")
}
}

View File

@ -1,36 +0,0 @@
// Build script of a multiplatform module (JVM + JS targets) that uses the atomicfu plugin.
plugins {
// Project-provided convention plugin
`multiplatform-config`
id("kotlinx-atomicfu") version Versions.atomicfuVersion
}
kotlin {
// Declared targets
jvm ()
js()
sourceSets {
val commonMain by getting {
dependencies {
// Both project modules are exposed transitively to consumers of this module
api(project(":kmath-core"))
api(project(":kmath-coroutines"))
// atomicfu artifacts are only needed at compile time
compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Versions.atomicfuVersion}")
}
}
val jvmMain by getting {
dependencies {
compileOnly("org.jetbrains.kotlinx:atomicfu:${Versions.atomicfuVersion}")
}
}
val jsMain by getting {
dependencies {
compileOnly("org.jetbrains.kotlinx:atomicfu-js:${Versions.atomicfuVersion}")
}
}
}
}
// atomicfu plugin configuration
atomicfu {
// NOTE(review): "VH" presumably selects the VarHandle-based JVM transformation — confirm against atomicfu docs
variant = "VH"
}

View File

@ -1,82 +0,0 @@
package scientifik.kmath.streaming
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import scientifik.kmath.structures.Buffer
import scientifik.kmath.structures.BufferFactory
/**
 * A processor that groups incoming elements into buffers of a fixed size.
 *
 * @param scope the scope providing the coroutine context of this processor.
 * @param bufferSize number of elements in every produced buffer; also the capacity of the input channel.
 * @param bufferFactory factory used to build each output buffer.
 */
@ExperimentalCoroutinesApi
class JoinProcessor<T>(
    scope: CoroutineScope,
    bufferSize: Int,
    bufferFactory: BufferFactory<T> = Buffer.Companion::boxing
) : AbstractProcessor<T, Buffer<T>>(scope) {

    // Incoming elements are queued here until a complete chunk can be assembled
    private val input = Channel<T>(bufferSize)

    // Repeatedly gathers `bufferSize` elements from the input and sends them downstream as one buffer
    private val output = produce(coroutineContext) {
        val accumulator = ArrayList<T>(bufferSize)
        while (isActive) {
            accumulator.clear()
            for (i in 0 until bufferSize) {
                accumulator.add(input.receive())
            }
            val chunk = bufferFactory(bufferSize) { index -> accumulator[index] }
            send(chunk)
        }
    }

    override suspend fun send(value: T) {
        input.send(value)
    }

    override suspend fun receive(): Buffer<T> {
        return output.receive()
    }
}
/**
 * A processor that takes buffers as input and hands out their elements one by one.
 *
 * @param scope the scope providing the coroutine context of this processor.
 */
class SplitProcessor<T>(scope: CoroutineScope) : AbstractProcessor<Buffer<T>, T>(scope) {

    private val input = Channel<Buffer<T>>()

    // Guards the read cursor (`currentBuffer` + `pos`)
    private val mutex = Mutex()

    private var currentBuffer: Buffer<T>? = null

    private var pos = 0

    override suspend fun send(value: Buffer<T>) {
        input.send(value)
    }

    override suspend fun receive(): T = mutex.withLock {
        var active = currentBuffer
        // Fetch the next buffer when none was read yet or the current one is exhausted
        while (active == null || pos == active.size) {
            active = input.receive()
            currentBuffer = active
            pos = 0
        }
        active[pos++]
    }
}
/**
 * Create a [JoinProcessor] producing chunks of [chunkSize] elements built with [bufferFactory]
 * and connect this producer to it.
 */
@ExperimentalCoroutinesApi
fun <T> Producer<T>.chunked(chunkSize: Int, bufferFactory: BufferFactory<T>): JoinProcessor<T> {
    val joiner = JoinProcessor<T>(this, chunkSize, bufferFactory)
    connect(joiner)
    return joiner
}
/**
 * Create a [JoinProcessor] producing chunks of [chunkSize] elements, using `Buffer.auto` as the buffer factory,
 * and connect this producer to it.
 */
@ExperimentalCoroutinesApi
inline fun <reified T : Any> Producer<T>.chunked(chunkSize: Int): JoinProcessor<T> {
    val joiner = JoinProcessor<T>(this, chunkSize, Buffer.Companion::auto)
    connect(joiner)
    return joiner
}

View File

@ -1,89 +0,0 @@
package scientifik.kmath.streaming
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import scientifik.kmath.structures.Buffer
import scientifik.kmath.structures.MutableBuffer
import scientifik.kmath.structures.VirtualBuffer
/**
 * Ring (circular) buffer backed by a fixed-size [MutableBuffer].
 *
 * Mutation ([push]) and [snapshot] are serialized by a mutex; [get] and [iterator] are not.
 *
 * @param buffer backing storage; its size is the capacity of the ring.
 * @param startIndex index in [buffer] of the oldest element.
 * @param size number of elements currently stored.
 */
internal class RingBuffer<T>(
    private val buffer: MutableBuffer<T>,
    private var startIndex: Int = 0,
    size: Int = 0
) : Buffer<T> {

    private val mutex = Mutex()

    override var size: Int = size
        private set

    /**
     * Get the element at logical position [index], counted from the oldest stored element.
     *
     * @throws IllegalArgumentException if [index] is negative or not less than [size].
     */
    override fun get(index: Int): T {
        require(index >= 0) { "Index must be non-negative" }
        require(index < size) { "Index $index is out of circular buffer size $size" }
        return buffer[startIndex.forward(index)]
    }

    /** `true` when the number of stored elements equals the capacity of the backing buffer. */
    fun isFull() = size == buffer.size

    /**
     * Iterator could provide wrong results if buffer is changed in initialization (iteration is safe)
     */
    override fun iterator(): Iterator<T> = object : AbstractIterator<T>() {
        private var count = size
        // Normalized so the first read is always inside the backing buffer
        private var index = startIndex.forward(0)
        val copy = buffer.copy()

        override fun computeNext() {
            if (count == 0) {
                done()
            } else {
                setNext(copy[index])
                index = index.forward(1)
                count--
            }
        }
    }

    /**
     * A safe snapshot operation
     */
    suspend fun snapshot(): Buffer<T> {
        mutex.withLock {
            val copy = buffer.copy()
            // Capture the start index now: the lambda may be evaluated after later pushes moved `startIndex`
            val start = startIndex
            return VirtualBuffer(size) { i -> copy[start.forward(i)] }
        }
    }

    /**
     * Append [element]; when the buffer is full the oldest element is overwritten.
     */
    suspend fun push(element: T) {
        mutex.withLock {
            buffer[startIndex.forward(size)] = element
            if (isFull()) {
                // Wrap around instead of incrementing without bound: an unbounded start index breaks the
                // direct `copy[index]` read in the iterator and would eventually overflow Int.
                startIndex = startIndex.forward(1)
            } else {
                size++
            }
        }
    }

    // Advance an index by n positions, wrapping at the capacity of the backing buffer
    @Suppress("NOTHING_TO_INLINE")
    private inline fun Int.forward(n: Int): Int = (this + n) % (buffer.size)

    companion object {
        /** Create an empty ring buffer of capacity [size], pre-filling the storage with [empty]. */
        inline fun <reified T : Any> build(size: Int, empty: T): RingBuffer<T> {
            val buffer = MutableBuffer.auto(size) { empty }
            return RingBuffer(buffer)
        }

        /**
         * Slow yet universal buffer
         */
        fun <T> boxing(size: Int): RingBuffer<T?> {
            val buffer: MutableBuffer<T?> = MutableBuffer.boxing(size) { null }
            return RingBuffer(buffer)
        }
    }
}

View File

@ -1,275 +0,0 @@
package scientifik.kmath.streaming
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import scientifik.kmath.structures.Buffer
import kotlin.coroutines.CoroutineContext
/**
 * Initial chain block. Can produce a sequence of elements and be connected to a single [Consumer].
 *
 * The general rule is that the channel is created on first call. Each element is responsible for its own
 * connection, so while connections are symmetric, the scope used for making the connection is responsible
 * for cancellation.
 *
 * Connections are not reversible: once connected, a block stays faithful until it finishes processing.
 * Manually putting elements into a connected block could lead to undetermined behavior and must be avoided.
 */
interface Producer<T> : CoroutineScope {
    /** The consumer attached to the output of this producer, or `null` if there is none. */
    val consumer: Consumer<T>?

    /** `true` when a [consumer] is attached. */
    val outputIsConnected: Boolean
        get() = consumer != null

    /** Attach [consumer] to the output of this producer. */
    fun connect(consumer: Consumer<T>)

    /** Receive the next produced element. */
    suspend fun receive(): T

    //fun close()
}
/**
 * Terminal chain block. Can consume a sequence of elements and be connected to a single [Producer].
 */
interface Consumer<T> : CoroutineScope {
    /** The producer attached to the input of this consumer, or `null` if there is none. */
    val producer: Producer<T>?

    /** `true` when a [producer] is attached. */
    val inputIsConnected: Boolean
        get() = producer != null

    /** Attach [producer] to the input of this consumer. */
    fun connect(producer: Producer<T>)

    /** Pass [value] to this consumer. */
    suspend fun send(value: T)

    //fun close()
}
/**
 * Intermediate chain block: a [Consumer] of [T] that is at the same time a [Producer] of [R].
 */
interface Processor<T, R> : Consumer<T>, Producer<R>
/**
 * Base [Producer] implementation that stores the attached [consumer] and takes its coroutine context
 * from the supplied scope.
 *
 * @param scope the scope whose context becomes the context of this producer.
 */
abstract class AbstractProducer<T>(scope: CoroutineScope) : Producer<T> {
override val coroutineContext: CoroutineContext = scope.coroutineContext
override var consumer: Consumer<T>? = null
protected set
/**
 * Attach [consumer] to this producer. Does nothing when [consumer] is already the attached one;
 * fails with [IllegalStateException] when either slot is already occupied.
 *
 * NOTE(review): with the default `Consumer.inputIsConnected` (`producer != null`) the check above has already
 * thrown whenever `consumer.producer` is non-null, so the `consumer.producer != null` branch looks unreachable
 * and the "Unreachable statement" error looks like the only possible outcome — confirm the intended handshake.
 */
override fun connect(consumer: Consumer<T>) {
//Ignore if already connected to specific consumer
if (consumer != this.consumer) {
if (outputIsConnected) error("The output slot of producer is occupied")
if (consumer.inputIsConnected) error("The input slot of consumer is occupied")
this.consumer = consumer
if (consumer.producer != null) {
//No need to save the job, it will be canceled on scope cancel
connectOutput(consumer)
// connect back, consumer is already set so no circular reference
consumer.connect(this)
} else error("Unreachable statement")
}
}
/**
 * Launch a coroutine in this producer's scope that forwards every received element to [consumer]
 * for as long as the coroutine is active.
 */
protected open fun connectOutput(consumer: Consumer<T>) {
launch {
while (this.isActive) {
consumer.send(receive())
}
}
}
}
/**
 * Base [Consumer] implementation that stores the attached [producer] and takes its coroutine context
 * from the supplied scope.
 *
 * @param scope the scope whose context becomes the context of this consumer.
 */
abstract class AbstractConsumer<T>(scope: CoroutineScope) : Consumer<T> {
override val coroutineContext: CoroutineContext = scope.coroutineContext
override var producer: Producer<T>? = null
protected set
/**
 * Attach [producer] to this consumer. Does nothing when [producer] is already the attached one;
 * fails with [IllegalStateException] when either slot is already occupied.
 *
 * NOTE(review): the second check tests the producer's *output* slot although its message says "input slot".
 * NOTE(review): with the default `Producer.outputIsConnected` (`consumer != null`) that check has already thrown
 * whenever `producer.consumer` is non-null, so the `producer.consumer != null` branch looks unreachable —
 * confirm the intended handshake.
 */
override fun connect(producer: Producer<T>) {
//Ignore if already connected to specific producer
if (producer != this.producer) {
if (inputIsConnected) error("The input slot of consumer is occupied")
if (producer.outputIsConnected) error("The input slot of producer is occupied")
this.producer = producer
//No need to save the job, it will be canceled on scope cancel
if (producer.consumer != null) {
connectInput(producer)
// connect back
producer.connect(this)
} else error("Unreachable statement")
}
}
/**
 * Launch a coroutine in this consumer's scope that passes every element received from [producer]
 * to [send] for as long as the coroutine is active.
 */
protected open fun connectInput(producer: Producer<T>) {
launch {
while (isActive) {
send(producer.receive())
}
}
}
}
/**
 * Base [Processor] implementation: inherits the producer side from [AbstractProducer] and implements
 * the consumer side (storage of the attached [producer] and input connection) itself.
 *
 * @param scope the scope whose context becomes the context of this processor.
 */
abstract class AbstractProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>, AbstractProducer<R>(scope) {
override var producer: Producer<T>? = null
protected set
/**
 * Attach [producer] to the input of this processor. Does nothing when [producer] is already the attached one;
 * fails with [IllegalStateException] when either slot is already occupied.
 *
 * NOTE(review): the second check tests the producer's *output* slot although its message says "input slot".
 * NOTE(review): with the default `Producer.outputIsConnected` (`consumer != null`) that check has already thrown
 * whenever `producer.consumer` is non-null, so the `producer.consumer != null` branch looks unreachable —
 * confirm the intended handshake.
 */
override fun connect(producer: Producer<T>) {
//Ignore if already connected to specific producer
if (producer != this.producer) {
if (inputIsConnected) error("The input slot of consumer is occupied")
if (producer.outputIsConnected) error("The input slot of producer is occupied")
this.producer = producer
//No need to save the job, it will be canceled on scope cancel
if (producer.consumer != null) {
connectInput(producer)
// connect back
producer.connect(this)
} else error("Unreachable statement")
}
}
/**
 * Launch a coroutine in this processor's scope that passes every element received from [producer]
 * to [send] for as long as the coroutine is active.
 */
protected open fun connectInput(producer: Producer<T>) {
launch {
while (isActive) {
send(producer.receive())
}
}
}
}
/**
 * A simple producer backed by a `produce` coroutine.
 *
 * @param scope the scope providing the coroutine context of this producer.
 * @param capacity capacity of the produced channel.
 * @param block the producing code.
 */
class GenericProducer<T>(
    scope: CoroutineScope,
    capacity: Int = Channel.UNLIMITED,
    block: suspend ProducerScope<T>.() -> Unit
) : AbstractProducer<T>(scope) {

    // The channel is created on first access
    private val channel: ReceiveChannel<T> by lazy {
        produce(capacity = capacity, block = block)
    }

    override suspend fun receive(): T {
        return channel.receive()
    }
}
/**
 * A simple pipeline [Processor] block: every sent value is transformed by `process` and offered for receiving.
 *
 * @param scope the scope providing the coroutine context of this processor.
 * @param capacity capacity of the input channel.
 * @param process the transformation applied to each element.
 */
class PipeProcessor<T, R>(
    scope: CoroutineScope,
    capacity: Int = Channel.RENDEZVOUS,
    process: suspend (T) -> R
) : AbstractProcessor<T, R>(scope) {

    private val input = Channel<T>(capacity)

    // Mapped view of the input channel
    private val output: ReceiveChannel<R> = input.map(coroutineContext, process)

    override suspend fun send(value: T) = input.send(value)

    override suspend fun receive(): R {
        return output.receive()
    }
}
/**
 * A moving window [Processor] backed by a circular buffer: each sent value is pushed into the window
 * and the result of [process] over a snapshot of the window is offered for receiving.
 *
 * @param scope the scope providing the coroutine context of this processor.
 * @param window capacity of the circular buffer.
 * @param process function computing an output value from the window contents.
 */
class WindowedProcessor<T, R>(
    scope: CoroutineScope,
    window: Int,
    val process: suspend (Buffer<T?>) -> R
) : AbstractProcessor<T, R>(scope) {

    private val ringBuffer = RingBuffer.boxing<T>(window)

    private val channel = Channel<R>(Channel.RENDEZVOUS)

    override suspend fun send(value: T) {
        ringBuffer.push(value)
        val snapshot = ringBuffer.snapshot()
        val result = process(snapshot)
        channel.send(result)
    }

    override suspend fun receive(): R = channel.receive()
}
/**
 * Aggregator of values from input. It does not store incoming values; each one is folded into [state]
 * on arrival, with updates serialized by a mutex.
 * The current aggregated state can be read through [state].
 *
 * @param T the type of the input element
 * @param S the type of the aggregated state
 * @param scope the scope providing the coroutine context of this consumer.
 * @param initialState the state before any value has arrived.
 * @param fold the function combining the current state with an incoming value.
 */
class Reducer<T, S>(
    scope: CoroutineScope,
    initialState: S,
    val fold: suspend (S, T) -> S
) : AbstractConsumer<T>(scope) {

    private val mutex = Mutex()

    var state: S = initialState
        private set

    override suspend fun send(value: T) {
        mutex.withLock {
            val updated = fold(state, value)
            state = updated
        }
    }
}
/**
 * Consumer that accumulates every received value in a list. The list is readable from a non-suspending
 * environment through [list].
 *
 * @param scope the scope providing the coroutine context of this consumer.
 */
class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {

    // Serializes additions to the backing list
    private val mutex = Mutex()

    private val _list = ArrayList<T>()

    val list: List<T>
        get() = _list

    override suspend fun send(value: T) {
        mutex.withLock {
            _list.add(value)
        }
    }
}
/**
 * Convert a sequence to a [Producer] that sends the sequence elements one by one.
 *
 * @param scope the scope of the created producer.
 */
fun <T> Sequence<T>.produce(scope: CoroutineScope = GlobalScope): GenericProducer<T> {
    return GenericProducer<T>(scope) {
        for (item in this@produce) {
            send(item)
        }
    }
}
/**
 * Convert a [ReceiveChannel] to a [Producer] that re-sends every element received from the channel.
 *
 * @param scope the scope of the created producer.
 */
fun <T> ReceiveChannel<T>.produce(scope: CoroutineScope = GlobalScope): GenericProducer<T> {
    return GenericProducer<T>(scope) {
        for (item in this@produce) {
            send(item)
        }
    }
}
/**
 * Create a consumer with [consumerFactory], connect this producer to it and return the consumer.
 */
fun <T, C : Consumer<T>> Producer<T>.consumer(consumerFactory: () -> C): C {
    val created = consumerFactory()
    connect(created)
    return created
}
/**
 * Create a [PipeProcessor] applying [process] to each element and connect this producer to it.
 *
 * @param capacity capacity of the processor's input channel.
 */
fun <T, R> Producer<T>.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R): PipeProcessor<T, R> {
    val pipe = PipeProcessor(this, capacity, process)
    connect(pipe)
    return pipe
}
/**
 * Create a [Reducer] starting from [initialState] and using [fold], then connect this producer to it.
 */
fun <T, S> Producer<T>.reduce(initialState: S, fold: suspend (S, T) -> S): Reducer<T, S> {
    val reducer = Reducer(this, initialState, fold)
    connect(reducer)
    return reducer
}
/**
 * Create a [Collector] and attach it to this [Producer].
 */
fun <T> Producer<T>.collect(): Collector<T> {
    val collector = Collector<T>(this)
    connect(collector)
    return collector
}
/**
 * Create a processor with [processorBuilder], connect this producer to it and return the processor.
 */
fun <T, R, P : Processor<T, R>> Producer<T>.process(processorBuilder: () -> P): P {
    val processor = processorBuilder()
    connect(processor)
    return processor
}
/**
 * Create a [PipeProcessor] applying [process] to each element and connect this producer to it.
 *
 * @param capacity capacity of the processor's input channel.
 */
fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R): PipeProcessor<T, R> {
    val pipe = PipeProcessor<T, R>(this, capacity, process)
    connect(pipe)
    return pipe
}
/**
 * Create a [WindowedProcessor] with the given [window] size and [process] function
 * and connect this producer to it.
 */
fun <T, R> Producer<T>.windowed(window: Int, process: suspend (Buffer<T?>) -> R): WindowedProcessor<T, R> {
    val windowedProcessor = WindowedProcessor(this, window, process)
    connect(windowedProcessor)
    return windowedProcessor
}