diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt new file mode 100644 index 000000000..43752af70 --- /dev/null +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt @@ -0,0 +1,55 @@ +package scientifik.kmath.sequential + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.toChannel + +interface DoubleProducer : Producer { + val arrayOutput: ReceiveChannel +} + +interface DoubleConsumer : Consumer { + val arrayInput: SendChannel +} + + +abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer(scope), DoubleProducer { + override suspend fun connectOutput(consumer: Consumer) { + if (consumer is DoubleConsumer) { + arrayOutput.toChannel(consumer.arrayInput) + } else { + super.connectOutput(consumer) + } + } +} + +abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer(scope), DoubleConsumer { + override suspend fun connectInput(producer: Producer) { + if (producer is DoubleProducer) { + producer.arrayOutput.toChannel(arrayInput) + } else { + super.connectInput(producer) + } + } +} + +abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor(scope), + DoubleProducer, DoubleConsumer { + + override suspend fun connectOutput(consumer: Consumer) { + if (consumer is DoubleConsumer) { + arrayOutput.toChannel(consumer.arrayInput) + } else { + super.connectOutput(consumer) + } + } + + override suspend fun connectInput(producer: Producer) { + if (producer is DoubleProducer) { + producer.arrayOutput.toChannel(arrayInput) + } else { + super.connectInput(producer) + } + } +} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt index baacead46..852a0d682 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt @@ -3,10 +3,12 @@ package scientifik.kmath.sequential import kotlinx.atomicfu.atomic import kotlinx.atomicfu.update import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.* import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlin.coroutines.CoroutineContext /** * Initial chain block. Could produce an element sequence and be connected to single [Consumer] @@ -17,7 +19,7 @@ import kotlinx.coroutines.sync.withLock * Also connections are not reversible. Once connected block stays faithful until it finishes processing. * Manually putting elements to connected block could lead to undetermined behavior and must be avoided. */ -interface Producer { +interface Producer : CoroutineScope { val output: ReceiveChannel fun connect(consumer: Consumer) @@ -29,7 +31,7 @@ interface Producer { /** * Terminal chain block. Could consume an element sequence and be connected to signle [Producer] */ -interface Consumer { +interface Consumer : CoroutineScope { val input: SendChannel fun connect(producer: Producer) @@ -40,7 +42,9 @@ interface Consumer { interface Processor : Consumer, Producer -abstract class AbstractProducer(protected val scope: CoroutineScope) : Producer { +abstract class AbstractProducer(scope: CoroutineScope) : Producer { + override val coroutineContext: CoroutineContext = scope.coroutineContext + override var consumer: Consumer? = null protected set @@ -52,17 +56,23 @@ abstract class AbstractProducer(protected val scope: CoroutineScope) : Produc this.consumer = consumer if (consumer.producer != null) { //No need to save the job, it will be canceled on scope cancel - scope.launch { - output.toChannel(consumer.input) + launch { + connectOutput(consumer) } // connect back, consumer is already set so no circular reference consumer.connect(this) } else error("Unreachable statement") } } + + protected open suspend fun connectOutput(consumer: Consumer) { + output.toChannel(consumer.input) + } } -abstract class AbstractConsumer(protected val scope: CoroutineScope) : Consumer { +abstract class AbstractConsumer(scope: CoroutineScope) : Consumer { + override val coroutineContext: CoroutineContext = scope.coroutineContext + override var producer: Producer? = null protected set @@ -74,17 +84,21 @@ abstract class AbstractConsumer(protected val scope: CoroutineScope) : Consum this.producer = producer //No need to save the job, it will be canceled on scope cancel if (producer.consumer != null) { - scope.launch { - producer.output.toChannel(input) + launch { + connectInput(producer) } // connect back producer.connect(this) } else error("Unreachable statement") } } + + protected open suspend fun connectInput(producer: Producer) { + producer.output.toChannel(input) + } } -abstract class AbstracProcessor(scope: CoroutineScope) : Processor, AbstractProducer(scope) { +abstract class AbstractProcessor(scope: CoroutineScope) : Processor, AbstractProducer(scope) { override var producer: Producer? = null protected set @@ -97,14 +111,18 @@ abstract class AbstracProcessor(scope: CoroutineScope) : Processor, this.producer = producer //No need to save the job, it will be canceled on scope cancel if (producer.consumer != null) { - scope.launch { - producer.output.toChannel(input) + launch { + connectInput(producer) } // connect back producer.connect(this) } else error("Unreachable statement") } } + + protected open suspend fun connectInput(producer: Producer) { + producer.output.toChannel(input) + } } /** @@ -116,9 +134,67 @@ class GenericProducer( block: suspend ProducerScope.() -> Unit ) : AbstractProducer(scope) { //The generation begins on first request to output - override val output: ReceiveChannel by lazy { scope.produce(capacity = capacity, block = block) } + override val output: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } } +/** + * A simple pipeline [Processor] block + */ +class PipeProcessor( + scope: CoroutineScope, + capacity: Int = Channel.RENDEZVOUS, + process: suspend (T) -> R +) : AbstractProcessor(scope) { + + private val _input = Channel(capacity) + override val input: SendChannel get() = _input + override val output: ReceiveChannel = _input.map(coroutineContext, process) +} + +/** + * A [Processor] that splits the input in fixed chunk size and transforms each chunk + */ +class ChunkProcessor( + scope: CoroutineScope, + chunkSize: Int, + process: suspend (List) -> R +) : AbstractProcessor(scope) { + + private val _input = Channel(chunkSize) + + override val input: SendChannel get() = _input + + private val chunked = produce>(coroutineContext) { + val list = ArrayList(chunkSize) + repeat(chunkSize) { + list.add(_input.receive()) + } + send(list) + } + + override val output: ReceiveChannel = chunked.map(coroutineContext, process) +} + +/** + * A moving window [Processor] + */ +class WindowProcessor( + scope: CoroutineScope, + window: Int, + process: suspend (List) -> R +) : AbstractProcessor(scope) { + + + override val output: ReceiveChannel + get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates. + override val input: SendChannel + get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates. + + +} + +//TODO add circular buffer processor + /** * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure * to incorporate them into state on-arrival. @@ -139,7 +215,7 @@ class Reducer( override val input: SendChannel by lazy { //create a channel and start process of reading all elements into aggregator Channel(capacity = Channel.RENDEZVOUS).also { - scope.launch { + launch { it.consumeEach { value -> state.update { fold(it, value) } } } } @@ -158,7 +234,7 @@ class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { override val input: SendChannel by lazy { //create a channel and start process of reading all elements into aggregator Channel(capacity = Channel.RENDEZVOUS).also { - scope.launch { + launch { it.consumeEach { value -> mutex.withLock { _list.add(value) @@ -167,4 +243,34 @@ class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { } } } -} \ No newline at end of file +} + +/** + * Convert a sequence to [Producer] + */ +fun Sequence.produce(scope: CoroutineScope = GlobalScope) = + GenericProducer(scope) { forEach { send(it) } } + +/** + * Convert a [ReceiveChannel] to [Producer] + */ +fun ReceiveChannel.produce(scope: CoroutineScope = GlobalScope) = + GenericProducer(scope) { for (e in this@produce) send(e) } + +/** + * Create a reducer and connect this producer to reducer + */ +fun Producer.reduce(initialState: S, fold: suspend (S, T) -> S) = + Reducer(this, initialState, fold).also { connect(it) } + +/** + * Create a [Collector] and attach it to this [Producer] + */ +fun Producer.collect() = + Collector(this).also { connect(it) } + +fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = + PipeProcessor(this, capacity, process) + +fun Producer.chunk(chunkSize: Int, process: suspend (List) -> R) = + ChunkProcessor(this, chunkSize, process) \ No newline at end of file