From 28695148e92c6e41fd92ed187fe66c42f7b41ba8 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 6 Feb 2019 22:24:03 +0300 Subject: [PATCH] Hidden channels in streaming blocks. --- .../sequential/SpecializedStreamingBlocks.kt | 38 +++++-- .../kmath/sequential/StreamingBlocks.kt | 102 +++++++++++------- 2 files changed, 96 insertions(+), 44 deletions(-) diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt index 43752af70..a4f26113f 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt @@ -1,16 +1,17 @@ package scientifik.kmath.sequential +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.update import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.channels.toChannel +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.launch interface DoubleProducer : Producer { - val arrayOutput: ReceiveChannel + suspend fun receiveArray(): DoubleArray } interface DoubleConsumer : Consumer { - val arrayInput: SendChannel + suspend fun sendArray(): DoubleArray } @@ -19,7 +20,7 @@ abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer< if (consumer is DoubleConsumer) { arrayOutput.toChannel(consumer.arrayInput) } else { - super.connectOutput(consumer) + connectOutput(super, consumer) } } } @@ -41,7 +42,7 @@ abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcesso if (consumer is DoubleConsumer) { arrayOutput.toChannel(consumer.arrayInput) } else { - super.connectOutput(consumer) + connectOutput(super, consumer) } } @@ -52,4 +53,27 @@ abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcesso super.connectInput(producer) } } +} + +class DoubleReducer( + scope: CoroutineScope, + initialState: S, + fold: suspend (S, DoubleArray) -> S +) : AbstractDoubleConsumer(scope) { + private val state = atomic(initialState) + + val value: S = state.value + + override val arrayInput: SendChannel by lazy { + //create a channel and start process of reading all elements into aggregator + Channel(capacity = Channel.RENDEZVOUS).also { + launch { + it.consumeEach { value -> state.update { fold(it, value) } } + } + } + } + + override val input: SendChannel = object :Abstr + + } \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt index 852a0d682..2cc412a9a 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt @@ -5,6 +5,7 @@ import kotlinx.atomicfu.update import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.* +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -20,24 +21,30 @@ import kotlin.coroutines.CoroutineContext * Manually putting elements to connected block could lead to undetermined behavior and must be avoided. */ interface Producer : CoroutineScope { - val output: ReceiveChannel fun connect(consumer: Consumer) + suspend fun receive(): T + val consumer: Consumer? val outputIsConnected: Boolean get() = consumer != null + + //fun close() } /** * Terminal chain block. Could consume an element sequence and be connected to signle [Producer] */ interface Consumer : CoroutineScope { - val input: SendChannel fun connect(producer: Producer) + suspend fun send(value: T) + val producer: Producer? val inputIsConnected: Boolean get() = producer != null + + //fun close() } interface Processor : Consumer, Producer @@ -56,17 +63,19 @@ abstract class AbstractProducer(scope: CoroutineScope) : Producer { this.consumer = consumer if (consumer.producer != null) { //No need to save the job, it will be canceled on scope cancel - launch { - connectOutput(consumer) - } + connectOutput(consumer) // connect back, consumer is already set so no circular reference consumer.connect(this) } else error("Unreachable statement") } } - protected open suspend fun connectOutput(consumer: Consumer) { - output.toChannel(consumer.input) + protected open fun connectOutput(consumer: Consumer) { + launch { + while (this.isActive) { + consumer.send(receive()) + } + } } } @@ -84,17 +93,19 @@ abstract class AbstractConsumer(scope: CoroutineScope) : Consumer { this.producer = producer //No need to save the job, it will be canceled on scope cancel if (producer.consumer != null) { - launch { - connectInput(producer) - } + connectInput(producer) // connect back producer.connect(this) } else error("Unreachable statement") } } - protected open suspend fun connectInput(producer: Producer) { - producer.output.toChannel(input) + protected open fun connectInput(producer: Producer) { + launch { + while (isActive) { + send(producer.receive()) + } + } } } @@ -111,17 +122,19 @@ abstract class AbstractProcessor(scope: CoroutineScope) : Processor, this.producer = producer //No need to save the job, it will be canceled on scope cancel if (producer.consumer != null) { - launch { - connectInput(producer) - } + connectInput(producer) // connect back producer.connect(this) } else error("Unreachable statement") } } - protected open suspend fun connectInput(producer: Producer) { - producer.output.toChannel(input) + protected open fun connectInput(producer: Producer) { + launch { + while (isActive) { + send(producer.receive()) + } + } } } @@ -133,8 +146,10 @@ class GenericProducer( capacity: Int = Channel.UNLIMITED, block: suspend ProducerScope.() -> Unit ) : AbstractProducer(scope) { - //The generation begins on first request to output - override val output: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } + + private val channel: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } + + override suspend fun receive(): T = channel.receive() } /** @@ -146,9 +161,14 @@ class PipeProcessor( process: suspend (T) -> R ) : AbstractProcessor(scope) { - private val _input = Channel(capacity) - override val input: SendChannel get() = _input - override val output: ReceiveChannel = _input.map(coroutineContext, process) + private val input = Channel(capacity) + private val output: ReceiveChannel = input.map(coroutineContext, process) + + override suspend fun receive(): R = output.receive() + + override suspend fun send(value: T) { + input.send(value) + } } /** @@ -160,41 +180,45 @@ class ChunkProcessor( process: suspend (List) -> R ) : AbstractProcessor(scope) { - private val _input = Channel(chunkSize) - - override val input: SendChannel get() = _input + private val input = Channel(chunkSize) private val chunked = produce>(coroutineContext) { val list = ArrayList(chunkSize) repeat(chunkSize) { - list.add(_input.receive()) + list.add(input.receive()) } send(list) } - override val output: ReceiveChannel = chunked.map(coroutineContext, process) + private val output: ReceiveChannel = chunked.map(coroutineContext, process) + + override suspend fun receive(): R = output.receive() + + override suspend fun send(value: T) { + input.send(value) + } } /** - * A moving window [Processor] + * A moving window [Processor] with circular buffer */ -class WindowProcessor( +class WindowedProcessor( scope: CoroutineScope, window: Int, process: suspend (List) -> R ) : AbstractProcessor(scope) { + override suspend fun receive(): R { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } - override val output: ReceiveChannel - get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates. - override val input: SendChannel - get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates. + override suspend fun send(value: T) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } } -//TODO add circular buffer processor - /** * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure * to incorporate them into state on-arrival. @@ -212,7 +236,7 @@ class Reducer( val value: S = state.value - override val input: SendChannel by lazy { + private val input: SendChannel by lazy { //create a channel and start process of reading all elements into aggregator Channel(capacity = Channel.RENDEZVOUS).also { launch { @@ -220,6 +244,8 @@ class Reducer( } } } + + override suspend fun send(value: T) = input.send(value) } /** @@ -231,7 +257,7 @@ class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { private val mutex = Mutex() val list: List get() = _list - override val input: SendChannel by lazy { + private val input: SendChannel by lazy { //create a channel and start process of reading all elements into aggregator Channel(capacity = Channel.RENDEZVOUS).also { launch { @@ -243,6 +269,8 @@ class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { } } } + + override suspend fun send(value: T) = input.send(value) } /**