From 08e14b15c59f1bd7234dabb3be69af05c18a3754 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 27 Apr 2019 22:15:21 +0300 Subject: [PATCH] Migrating streaming to Flow --- .../scientifik/kmath/CoroutinesExtra.kt | 76 ++++- .../scientifik/kmath/streaming/BufferFlow.kt | 33 --- .../kmath/streaming/BufferFlowTest.kt | 20 +- kmath-streaming/build.gradle.kts | 36 --- .../kmath/streaming/BufferStreaming.kt | 82 ------ .../scientifik/kmath/streaming/RingBuffer.kt | 89 ------ .../scientifik/kmath/streaming/Streaming.kt | 275 ------------------ 7 files changed, 81 insertions(+), 530 deletions(-) delete mode 100644 kmath-streaming/build.gradle.kts delete mode 100644 kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt delete mode 100644 kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt delete mode 100644 kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt index 7580ef6ef..a0e11390d 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt @@ -1,9 +1,73 @@ package scientifik.kmath -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.flow.* -val Dispatchers.Math: CoroutineDispatcher get() = Dispatchers.Default \ No newline at end of file +val Dispatchers.Math: CoroutineDispatcher get() = Dispatchers.Default + +@FlowPreview +inline class AsyncFlow(val deferredFlow: Flow>) : Flow { + override suspend fun collect(collector: FlowCollector) { + deferredFlow.collect { + collector.emit((it.await())) + } + } +} + +@FlowPreview +fun Flow.async( + dispatcher: CoroutineDispatcher = Dispatchers.Default, + block: suspend (T) -> R +): AsyncFlow { + val flow = map { + coroutineScope { + async(dispatcher, start = CoroutineStart.LAZY) { block(it) } + } + } + return AsyncFlow(flow) +} + +@FlowPreview +fun AsyncFlow.map(action: (T) -> R) = deferredFlow.map { input -> + coroutineScope { + async(start = CoroutineStart.LAZY) { action(input.await()) } + } +} + +@ExperimentalCoroutinesApi +@FlowPreview +suspend fun AsyncFlow.collect(concurrency: Int, collector: FlowCollector){ + require(concurrency >= 0) { "Buffer size should be positive, but was $concurrency" } + coroutineScope { + //Starting up to N deferred coroutines ahead of time + val channel = produce(capacity = concurrency) { + deferredFlow.collect { value -> + value.start() + send(value) + } + } + + (channel as Job).invokeOnCompletion { + if (it is CancellationException && it.cause == null) cancel() + } + + for (element in channel) { + collector.emit(element.await()) + } + + val producer = channel as Job + if (producer.isCancelled) { + producer.join() + //throw producer.getCancellationException() + } + } +} + +@ExperimentalCoroutinesApi +@FlowPreview +suspend fun AsyncFlow.collect(concurrency: Int, action: suspend (value: T) -> Unit): Unit{ + collect(concurrency, object : FlowCollector { + override suspend fun emit(value: T) = action(value) + }) +} diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt index f6c38e153..67b74d7de 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt @@ -59,37 +59,4 @@ fun Flow.chunked(bufferSize: Int) = flow { emit(buffer) } } -} - -/** - * Perform parallel mapping of flow elements - */ -@InternalCoroutinesApi -@ExperimentalCoroutinesApi -@FlowPreview -fun Flow.mapParallel(dispatcher: CoroutineDispatcher = Dispatchers.Default, bufferSize: Int = 16, transform: suspend (T) -> R) : Flow{ - require(bufferSize >= 0) { - "Buffer size should be positive, but was $bufferSize" - } - return flow { - coroutineScope { - val channel: ReceiveChannel> = produce(capacity = bufferSize) { - collect { value -> - send(async(dispatcher) { transform(value) }) - } - } - - // TODO semantics doesn't play well here and we pay for that with additional object - (channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() } - for (element in channel) { - emit(element.await()) - } - - val producer = channel as Job - if (producer.isCancelled) { - producer.join() - throw producer.getCancellationException() - } - } - } } \ No newline at end of file diff --git a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt index 4dd0700d1..e3a38ff43 100644 --- a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt +++ b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt @@ -1,24 +1,26 @@ package scientifik.kmath.streaming -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.runBlocking import org.junit.Test +import scientifik.kmath.async +import scientifik.kmath.collect +@ExperimentalCoroutinesApi @InternalCoroutinesApi +@FlowPreview class BufferFlowTest { - @Test(timeout = 2000) + + @Test fun mapParallel() { runBlocking { - (1..20).asFlow().mapParallel { + (1..20).asFlow().async(Dispatchers.IO) { + println("Started $it") + @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(200) it - }.collect { + }.collect(4) { println("Completed $it") } } diff --git a/kmath-streaming/build.gradle.kts b/kmath-streaming/build.gradle.kts deleted file mode 100644 index f49e71116..000000000 --- a/kmath-streaming/build.gradle.kts +++ /dev/null @@ -1,36 +0,0 @@ -plugins { - `multiplatform-config` - id("kotlinx-atomicfu") version Versions.atomicfuVersion -} - - - -kotlin { - jvm () - js() - - sourceSets { - val commonMain by getting { - dependencies { - api(project(":kmath-core")) - api(project(":kmath-coroutines")) - compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Versions.atomicfuVersion}") - } - } - val jvmMain by getting { - dependencies { - compileOnly("org.jetbrains.kotlinx:atomicfu:${Versions.atomicfuVersion}") - } - } - val jsMain by getting { - dependencies { - compileOnly("org.jetbrains.kotlinx:atomicfu-js:${Versions.atomicfuVersion}") - } - } - - } -} - -atomicfu { - variant = "VH" -} \ No newline at end of file diff --git a/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt b/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt deleted file mode 100644 index 023199452..000000000 --- a/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt +++ /dev/null @@ -1,82 +0,0 @@ -package scientifik.kmath.streaming - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.isActive -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import scientifik.kmath.structures.Buffer -import scientifik.kmath.structures.BufferFactory - -/** - * A processor that collects incoming elements into fixed size buffers - */ -@ExperimentalCoroutinesApi -class JoinProcessor( - scope: CoroutineScope, - bufferSize: Int, - bufferFactory: BufferFactory = Buffer.Companion::boxing -) : AbstractProcessor>(scope) { - - private val input = Channel(bufferSize) - - private val output = produce(coroutineContext) { - val list = ArrayList(bufferSize) - while (isActive) { - list.clear() - repeat(bufferSize) { - list.add(input.receive()) - } - val buffer = bufferFactory(bufferSize) { list[it] } - send(buffer) - } - } - - override suspend fun receive(): Buffer = output.receive() - - override suspend fun send(value: T) { - input.send(value) - } -} - -/** - * A processor that splits incoming buffers into individual elements - */ -class SplitProcessor(scope: CoroutineScope) : AbstractProcessor, T>(scope) { - - private val input = Channel>() - - private val mutex = Mutex() - - private var currentBuffer: Buffer? = null - - private var pos = 0 - - - override suspend fun receive(): T { - mutex.withLock { - while (currentBuffer == null || pos == currentBuffer!!.size) { - currentBuffer = input.receive() - pos = 0 - } - return currentBuffer!![pos].also { pos++ } - } - } - - override suspend fun send(value: Buffer) { - input.send(value) - } -} - -@ExperimentalCoroutinesApi -fun Producer.chunked(chunkSize: Int, bufferFactory: BufferFactory) = - JoinProcessor(this, chunkSize, bufferFactory).also { connect(it) } - -@ExperimentalCoroutinesApi -inline fun Producer.chunked(chunkSize: Int) = - JoinProcessor(this, chunkSize, Buffer.Companion::auto).also { connect(it) } - - - diff --git a/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt b/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt deleted file mode 100644 index ac64f921f..000000000 --- a/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt +++ /dev/null @@ -1,89 +0,0 @@ -package scientifik.kmath.streaming - -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import scientifik.kmath.structures.Buffer -import scientifik.kmath.structures.MutableBuffer -import scientifik.kmath.structures.VirtualBuffer - -/** - * Thread-safe ring buffer - */ -internal class RingBuffer( - private val buffer: MutableBuffer, - private var startIndex: Int = 0, - size: Int = 0 -) : Buffer { - - private val mutex = Mutex() - - override var size: Int = size - private set - - override fun get(index: Int): T { - require(index >= 0) { "Index must be positive" } - require(index < size) { "Index $index is out of circular buffer size $size" } - return buffer[startIndex.forward(index)] - } - - fun isFull() = size == buffer.size - - /** - * Iterator could provide wrong results if buffer is changed in initialization (iteration is safe) - */ - override fun iterator(): Iterator = object : AbstractIterator() { - private var count = size - private var index = startIndex - val copy = buffer.copy() - - override fun computeNext() { - if (count == 0) { - done() - } else { - setNext(copy[index]) - index = index.forward(1) - count-- - } - } - } - - /** - * A safe snapshot operation - */ - suspend fun snapshot(): Buffer { - mutex.withLock { - val copy = buffer.copy() - return VirtualBuffer(size) { i -> copy[startIndex.forward(i)] } - } - } - - suspend fun push(element: T) { - mutex.withLock { - buffer[startIndex.forward(size)] = element - if (isFull()) { - startIndex++ - } else { - size++ - } - } - } - - - @Suppress("NOTHING_TO_INLINE") - private inline fun Int.forward(n: Int): Int = (this + n) % (buffer.size) - - companion object { - inline fun build(size: Int, empty: T): RingBuffer { - val buffer = MutableBuffer.auto(size) { empty } - return RingBuffer(buffer) - } - - /** - * Slow yet universal buffer - */ - fun boxing(size: Int): RingBuffer { - val buffer: MutableBuffer = MutableBuffer.boxing(size) { null } - return RingBuffer(buffer) - } - } -} \ No newline at end of file diff --git a/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt b/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt deleted file mode 100644 index 3cb3bcfdd..000000000 --- a/kmath-streaming/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt +++ /dev/null @@ -1,275 +0,0 @@ -package scientifik.kmath.streaming - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import scientifik.kmath.structures.Buffer -import kotlin.coroutines.CoroutineContext - -/** - * Initial chain block. Could produce an element sequence and be connected to single [Consumer] - * - * The general rule is that channel is created on first call. Also each element is responsible for its connection so - * while the connections are symmetric, the scope, used for making the connection is responsible for cancelation. - * - * Also connections are not reversible. Once connected block stays faithful until it finishes processing. - * Manually putting elements to connected block could lead to undetermined behavior and must be avoided. - */ -interface Producer : CoroutineScope { - fun connect(consumer: Consumer) - - suspend fun receive(): T - - val consumer: Consumer? - - val outputIsConnected: Boolean get() = consumer != null - - //fun close() -} - -/** - * Terminal chain block. Could consume an element sequence and be connected to signle [Producer] - */ -interface Consumer : CoroutineScope { - fun connect(producer: Producer) - - suspend fun send(value: T) - - val producer: Producer? - - val inputIsConnected: Boolean get() = producer != null - - //fun close() -} - -interface Processor : Consumer, Producer - -abstract class AbstractProducer(scope: CoroutineScope) : Producer { - override val coroutineContext: CoroutineContext = scope.coroutineContext - - override var consumer: Consumer? = null - protected set - - override fun connect(consumer: Consumer) { - //Ignore if already connected to specific consumer - if (consumer != this.consumer) { - if (outputIsConnected) error("The output slot of producer is occupied") - if (consumer.inputIsConnected) error("The input slot of consumer is occupied") - this.consumer = consumer - if (consumer.producer != null) { - //No need to save the job, it will be canceled on scope cancel - connectOutput(consumer) - // connect back, consumer is already set so no circular reference - consumer.connect(this) - } else error("Unreachable statement") - } - } - - protected open fun connectOutput(consumer: Consumer) { - launch { - while (this.isActive) { - consumer.send(receive()) - } - } - } -} - -abstract class AbstractConsumer(scope: CoroutineScope) : Consumer { - override val coroutineContext: CoroutineContext = scope.coroutineContext - - override var producer: Producer? = null - protected set - - override fun connect(producer: Producer) { - //Ignore if already connected to specific consumer - if (producer != this.producer) { - if (inputIsConnected) error("The input slot of consumer is occupied") - if (producer.outputIsConnected) error("The input slot of producer is occupied") - this.producer = producer - //No need to save the job, it will be canceled on scope cancel - if (producer.consumer != null) { - connectInput(producer) - // connect back - producer.connect(this) - } else error("Unreachable statement") - } - } - - protected open fun connectInput(producer: Producer) { - launch { - while (isActive) { - send(producer.receive()) - } - } - } -} - -abstract class AbstractProcessor(scope: CoroutineScope) : Processor, AbstractProducer(scope) { - - override var producer: Producer? = null - protected set - - override fun connect(producer: Producer) { - //Ignore if already connected to specific consumer - if (producer != this.producer) { - if (inputIsConnected) error("The input slot of consumer is occupied") - if (producer.outputIsConnected) error("The input slot of producer is occupied") - this.producer = producer - //No need to save the job, it will be canceled on scope cancel - if (producer.consumer != null) { - connectInput(producer) - // connect back - producer.connect(this) - } else error("Unreachable statement") - } - } - - protected open fun connectInput(producer: Producer) { - launch { - while (isActive) { - send(producer.receive()) - } - } - } -} - -/** - * A simple [produce]-based producer - */ -class GenericProducer( - scope: CoroutineScope, - capacity: Int = Channel.UNLIMITED, - block: suspend ProducerScope.() -> Unit -) : AbstractProducer(scope) { - - private val channel: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } - - override suspend fun receive(): T = channel.receive() -} - -/** - * A simple pipeline [Processor] block - */ -class PipeProcessor( - scope: CoroutineScope, - capacity: Int = Channel.RENDEZVOUS, - process: suspend (T) -> R -) : AbstractProcessor(scope) { - - private val input = Channel(capacity) - private val output: ReceiveChannel = input.map(coroutineContext, process) - - override suspend fun receive(): R = output.receive() - - override suspend fun send(value: T) { - input.send(value) - } -} - - -/** - * A moving window [Processor] with circular buffer - */ -class WindowedProcessor( - scope: CoroutineScope, - window: Int, - val process: suspend (Buffer) -> R -) : AbstractProcessor(scope) { - - private val ringBuffer = RingBuffer.boxing(window) - - private val channel = Channel(Channel.RENDEZVOUS) - - override suspend fun receive(): R { - return channel.receive() - } - - override suspend fun send(value: T) { - ringBuffer.push(value) - channel.send(process(ringBuffer.snapshot())) - } -} - -/** - * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure - * to incorporate them into state on-arrival. - * The current aggregated state could be accessed by [state]. The input channel is inactive unless requested - * @param T - the type of the input element - * @param S - the type of the aggregator - */ -class Reducer( - scope: CoroutineScope, - initialState: S, - val fold: suspend (S, T) -> S -) : AbstractConsumer(scope) { - - var state: S = initialState - private set - - private val mutex = Mutex() - - override suspend fun send(value: T) = mutex.withLock { - state = fold(state, value) - } -} - -/** - * Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value. - */ -class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { - - private val _list = ArrayList() - private val mutex = Mutex() - val list: List get() = _list - - override suspend fun send(value: T) { - mutex.withLock { - _list.add(value) - } - } -} - -/** - * Convert a sequence to [Producer] - */ -fun Sequence.produce(scope: CoroutineScope = GlobalScope) = - GenericProducer(scope) { forEach { send(it) } } - -/** - * Convert a [ReceiveChannel] to [Producer] - */ -fun ReceiveChannel.produce(scope: CoroutineScope = GlobalScope) = - GenericProducer(scope) { for (e in this@produce) send(e) } - - -fun > Producer.consumer(consumerFactory: () -> C): C = - consumerFactory().also { connect(it) } - -fun Producer.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = - PipeProcessor(this, capacity, process).also { connect(it) } - -/** - * Create a reducer and connect this producer to reducer - */ -fun Producer.reduce(initialState: S, fold: suspend (S, T) -> S) = - Reducer(this, initialState, fold).also { connect(it) } - -/** - * Create a [Collector] and attach it to this [Producer] - */ -fun Producer.collect() = - Collector(this).also { connect(it) } - -fun > Producer.process(processorBuilder: () -> P): P = - processorBuilder().also { connect(it) } - -fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = - PipeProcessor(this, capacity, process).also { connect(it) } - - -fun Producer.windowed(window: Int, process: suspend (Buffer) -> R) = - WindowedProcessor(this, window, process).also { connect(it) } \ No newline at end of file