From 1fe786c90f7141bcadc657ccf0168e1fad582bed Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 5 Feb 2019 22:31:41 +0300 Subject: [PATCH] Basic streaming blocks --- .../kmath/sequential/Accumulators.kt | 57 ------ .../scientifik/kmath/sequential/Reducers.kt | 21 --- .../kmath/sequential/StreamingBlocks.kt | 170 ++++++++++++++++++ 3 files changed, 170 insertions(+), 78 deletions(-) delete mode 100644 kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt delete mode 100644 kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt create mode 100644 kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt deleted file mode 100644 index ef7950519..000000000 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt +++ /dev/null @@ -1,57 +0,0 @@ -package scientifik.kmath.sequential - -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.getAndUpdate -import kotlinx.coroutines.channels.ReceiveChannel -import scientifik.kmath.operations.Space - -/** - * An object with a state that accumulates incoming elements - */ -interface Accumulator { - /** - * Push a value to accumulator. 
Blocks if [Accumulator] can't access any more elements at that time - */ - fun push(value: T) - - /** - * Does the same as [push], but suspends instead of blocking if accumulator is full - */ - suspend fun send(value: T) = push(value) -} - -/** - * Push all elements to accumulator - */ -fun Accumulator.pushAll(values: Iterable) { - for (value in values) { - push(value) - } -} - -/** - * Offer all elements from channel to accumulator - */ -suspend fun Accumulator.offerAll(channel: ReceiveChannel) { - for (value in channel) { - send(value) - } -} - -/** - * Generic thread-safe average - */ -class GenericMean(val context: Space) : Accumulator { - //TODO add guard against overflow - private val counter = atomic(0) - val sum = atomic(context.zero) - - val value get() = with(context) { sum.value / counter.value } - - override fun push(value: T) { - with(context) { - counter.incrementAndGet() - sum.getAndUpdate { it + value } - } - } -} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt deleted file mode 100644 index cf0601135..000000000 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt +++ /dev/null @@ -1,21 +0,0 @@ -package scientifik.kmath.sequential - -import scientifik.kmath.operations.Space - - -typealias Reducer = (C, Iterable) -> R - -inline fun Iterable.reduce(context: C, crossinline reducer: Reducer) = - reducer(context, this@reduce) - -inline fun Sequence.reduce(context: C, crossinline reducer: Reducer) = - asIterable().reduce(context, reducer) - -inline fun Array.reduce(context: C, crossinline reducer: Reducer) = - asIterable().reduce(context, reducer) - -object Reducers { - fun mean(): Reducer, T> = { context, data -> - data.fold(GenericMean(context)) { sum, value -> sum.apply { push(value) } }.value - } -} \ No newline at end of file diff --git 
a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt
new file mode 100644
index 000000000..baacead46
--- /dev/null
+++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt
@@ -0,0 +1,226 @@
+package scientifik.kmath.sequential
+
+import kotlinx.atomicfu.atomic
+import kotlinx.atomicfu.update
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
+
+/**
+ * Initial chain block. Could produce an element sequence and be connected to a single [Consumer].
+ *
+ * The general rule is that the channel is created on first call. Also each element is responsible for its connection,
+ * so while the connections are symmetric, the scope used for making the connection is responsible for cancellation.
+ *
+ * Also connections are not reversible. Once connected, a block stays faithful until it finishes processing.
+ * Manually putting elements into a connected block could lead to undetermined behavior and must be avoided.
+ *
+ * @param T the type of the produced elements
+ */
+interface Producer<T> {
+    val output: ReceiveChannel<T>
+    fun connect(consumer: Consumer<T>)
+
+    val consumer: Consumer<T>?
+
+    val outputIsConnected: Boolean get() = consumer != null
+}
+
+/**
+ * Terminal chain block. Could consume an element sequence and be connected to a single [Producer].
+ *
+ * @param T the type of the consumed elements
+ */
+interface Consumer<T> {
+    val input: SendChannel<T>
+    fun connect(producer: Producer<T>)
+
+    val producer: Producer<T>?
+
+    val inputIsConnected: Boolean get() = producer != null
+}
+
+/**
+ * Intermediate chain block: a [Consumer] of [T] that is also a [Producer] of [R].
+ */
+interface Processor<T, R> : Consumer<T>, Producer<R>
+
+/**
+ * Base [Producer] implementing the connection handshake.
+ *
+ * @param scope the scope used to launch the copying coroutine when this block initiates a connection
+ */
+abstract class AbstractProducer<T>(protected val scope: CoroutineScope) : Producer<T> {
+    override var consumer: Consumer<T>? = null
+        protected set
+
+    /**
+     * Connects [consumer] to the output of this producer.
+     *
+     * The block on which `connect` is called first records its peer, asks the peer to connect back and launches the
+     * coroutine copying [output] into the consumer's input. The connect-back call only records the peer, so a
+     * connection is served by exactly one copying coroutine.
+     *
+     * @throws IllegalStateException if one of the slots is occupied by another block
+     */
+    override fun connect(consumer: Consumer<T>) {
+        //Ignore if already connected to this specific consumer
+        if (consumer == this.consumer) return
+        if (outputIsConnected) error("The output slot of producer is occupied")
+        val peer = consumer.producer
+        when {
+            peer == null -> {
+                this.consumer = consumer
+                // connect back, consumer is already set so no circular reference
+                consumer.connect(this)
+                //No need to save the job, it will be canceled on scope cancel
+                scope.launch {
+                    output.toChannel(consumer.input)
+                }
+            }
+            // connect-back call: the consumer initiated the connection and owns the copying coroutine
+            peer == this -> this.consumer = consumer
+            else -> error("The input slot of consumer is occupied")
+        }
+    }
+}
+
+/**
+ * Base [Consumer] implementing the connection handshake.
+ *
+ * @param scope the scope used to launch the copying coroutine when this block initiates a connection
+ */
+abstract class AbstractConsumer<T>(protected val scope: CoroutineScope) : Consumer<T> {
+    override var producer: Producer<T>? = null
+        protected set
+
+    /**
+     * Connects [producer] to the input of this consumer. Mirrors [AbstractProducer.connect]: the initiating side
+     * launches the copying coroutine, the connect-back call only records the peer.
+     *
+     * @throws IllegalStateException if one of the slots is occupied by another block
+     */
+    override fun connect(producer: Producer<T>) {
+        //Ignore if already connected to this specific producer
+        if (producer == this.producer) return
+        if (inputIsConnected) error("The input slot of consumer is occupied")
+        val peer = producer.consumer
+        when {
+            peer == null -> {
+                this.producer = producer
+                // connect back, producer is already set so no circular reference
+                producer.connect(this)
+                //No need to save the job, it will be canceled on scope cancel
+                scope.launch {
+                    producer.output.toChannel(input)
+                }
+            }
+            // connect-back call: the producer initiated the connection and owns the copying coroutine
+            peer == this -> this.producer = producer
+            else -> error("The output slot of producer is occupied")
+        }
+    }
+}
+
+/**
+ * Base [Processor]. The output side is inherited from [AbstractProducer]; the input side repeats the handshake of
+ * [AbstractConsumer] because a class cannot extend both.
+ */
+abstract class AbstracProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>, AbstractProducer<R>(scope) {
+
+    override var producer: Producer<T>? = null
+        protected set
+
+    /**
+     * Connects [producer] to the input of this processor.
+     *
+     * @throws IllegalStateException if one of the slots is occupied by another block
+     */
+    override fun connect(producer: Producer<T>) {
+        //Ignore if already connected to this specific producer
+        if (producer == this.producer) return
+        if (inputIsConnected) error("The input slot of consumer is occupied")
+        val peer = producer.consumer
+        when {
+            peer == null -> {
+                this.producer = producer
+                // connect back, producer is already set so no circular reference
+                producer.connect(this)
+                //No need to save the job, it will be canceled on scope cancel
+                scope.launch {
+                    producer.output.toChannel(input)
+                }
+            }
+            // connect-back call: the producer initiated the connection and owns the copying coroutine
+            peer == this -> this.producer = producer
+            else -> error("The output slot of producer is occupied")
+        }
+    }
+}
+
+/**
+ * A simple [produce]-based producer
+ */
+class GenericProducer<T>(
+    scope: CoroutineScope,
+    capacity: Int = Channel.UNLIMITED,
+    block: suspend ProducerScope<T>.() -> Unit
+) : AbstractProducer<T>(scope) {
+    //The generation begins on first request to output
+    override val output: ReceiveChannel<T> by lazy { scope.produce(capacity = capacity, block = block) }
+}
+
+/**
+ * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure
+ * to incorporate them into state on-arrival.
+ * The current aggregated state could be accessed by [value]. The input channel is inactive unless requested
+ * @param T - the type of the input element
+ * @param S - the type of the aggregator
+ */
+class Reducer<T, S>(
+    scope: CoroutineScope,
+    initialState: S,
+    fold: suspend (S, T) -> S
+) : AbstractConsumer<T>(scope) {
+
+    private val state = atomic(initialState)
+
+    /**
+     * The current aggregated state. Read from the atomic on every access, so it reflects all elements folded so far.
+     */
+    val value: S get() = state.value
+
+    override val input: SendChannel<T> by lazy {
+        //create a channel and start process of reading all elements into aggregator
+        Channel<T>(capacity = Channel.RENDEZVOUS).also { channel ->
+            scope.launch {
+                channel.consumeEach { element -> state.update { current -> fold(current, element) } }
+            }
+        }
+    }
+}
+
+/**
+ * Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value.
+ */
+class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
+
+    private val _list = ArrayList<T>()
+    private val mutex = Mutex()
+    val list: List<T> get() = _list
+
+    override val input: SendChannel<T> by lazy {
+        //create a channel and start process of reading all elements into aggregator
+        Channel<T>(capacity = Channel.RENDEZVOUS).also { channel ->
+            scope.launch {
+                channel.consumeEach { value ->
+                    mutex.withLock {
+                        _list.add(value)
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file