Specialized streaming blocks for doubles

This commit is contained in:
Alexander Nozik 2019-02-06 18:13:30 +03:00
parent 1fe786c90f
commit f8f7aa2e44
2 changed files with 176 additions and 15 deletions

View File

@ -0,0 +1,55 @@
package scientifik.kmath.sequential
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.toChannel
interface DoubleProducer : Producer<Double> {
val arrayOutput: ReceiveChannel<DoubleArray>
}
interface DoubleConsumer : Consumer<Double> {
val arrayInput: SendChannel<DoubleArray>
}
abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer<Double>(scope), DoubleProducer {
override suspend fun connectOutput(consumer: Consumer<Double>) {
if (consumer is DoubleConsumer) {
arrayOutput.toChannel(consumer.arrayInput)
} else {
super.connectOutput(consumer)
}
}
}
abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer<Double>(scope), DoubleConsumer {
override suspend fun connectInput(producer: Producer<Double>) {
if (producer is DoubleProducer) {
producer.arrayOutput.toChannel(arrayInput)
} else {
super.connectInput(producer)
}
}
}
abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor<Double, Double>(scope),
DoubleProducer, DoubleConsumer {
override suspend fun connectOutput(consumer: Consumer<Double>) {
if (consumer is DoubleConsumer) {
arrayOutput.toChannel(consumer.arrayInput)
} else {
super.connectOutput(consumer)
}
}
override suspend fun connectInput(producer: Producer<Double>) {
if (producer is DoubleProducer) {
producer.arrayOutput.toChannel(arrayInput)
} else {
super.connectInput(producer)
}
}
}

View File

@ -3,10 +3,12 @@ package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.*
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.CoroutineContext
/** /**
* Initial chain block. Could produce an element sequence and be connected to single [Consumer] * Initial chain block. Could produce an element sequence and be connected to single [Consumer]
@ -17,7 +19,7 @@ import kotlinx.coroutines.sync.withLock
* Also connections are not reversible. Once connected block stays faithful until it finishes processing. * Also connections are not reversible. Once connected block stays faithful until it finishes processing.
* Manually putting elements to connected block could lead to undetermined behavior and must be avoided. * Manually putting elements to connected block could lead to undetermined behavior and must be avoided.
*/ */
interface Producer<T> { interface Producer<T> : CoroutineScope {
val output: ReceiveChannel<T> val output: ReceiveChannel<T>
fun connect(consumer: Consumer<T>) fun connect(consumer: Consumer<T>)
@ -29,7 +31,7 @@ interface Producer<T> {
/** /**
* Terminal chain block. Could consume an element sequence and be connected to signle [Producer] * Terminal chain block. Could consume an element sequence and be connected to signle [Producer]
*/ */
interface Consumer<T> { interface Consumer<T> : CoroutineScope {
val input: SendChannel<T> val input: SendChannel<T>
fun connect(producer: Producer<T>) fun connect(producer: Producer<T>)
@ -40,7 +42,9 @@ interface Consumer<T> {
interface Processor<T, R> : Consumer<T>, Producer<R> interface Processor<T, R> : Consumer<T>, Producer<R>
abstract class AbstractProducer<T>(protected val scope: CoroutineScope) : Producer<T> { abstract class AbstractProducer<T>(scope: CoroutineScope) : Producer<T> {
override val coroutineContext: CoroutineContext = scope.coroutineContext
override var consumer: Consumer<T>? = null override var consumer: Consumer<T>? = null
protected set protected set
@ -52,17 +56,23 @@ abstract class AbstractProducer<T>(protected val scope: CoroutineScope) : Produc
this.consumer = consumer this.consumer = consumer
if (consumer.producer != null) { if (consumer.producer != null) {
//No need to save the job, it will be canceled on scope cancel //No need to save the job, it will be canceled on scope cancel
scope.launch { launch {
output.toChannel(consumer.input) connectOutput(consumer)
} }
// connect back, consumer is already set so no circular reference // connect back, consumer is already set so no circular reference
consumer.connect(this) consumer.connect(this)
} else error("Unreachable statement") } else error("Unreachable statement")
} }
} }
protected open suspend fun connectOutput(consumer: Consumer<T>) {
output.toChannel(consumer.input)
}
} }
abstract class AbstractConsumer<T>(protected val scope: CoroutineScope) : Consumer<T> { abstract class AbstractConsumer<T>(scope: CoroutineScope) : Consumer<T> {
override val coroutineContext: CoroutineContext = scope.coroutineContext
override var producer: Producer<T>? = null override var producer: Producer<T>? = null
protected set protected set
@ -74,17 +84,21 @@ abstract class AbstractConsumer<T>(protected val scope: CoroutineScope) : Consum
this.producer = producer this.producer = producer
//No need to save the job, it will be canceled on scope cancel //No need to save the job, it will be canceled on scope cancel
if (producer.consumer != null) { if (producer.consumer != null) {
scope.launch { launch {
producer.output.toChannel(input) connectInput(producer)
} }
// connect back // connect back
producer.connect(this) producer.connect(this)
} else error("Unreachable statement") } else error("Unreachable statement")
} }
} }
protected open suspend fun connectInput(producer: Producer<T>) {
producer.output.toChannel(input)
}
} }
abstract class AbstracProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>, AbstractProducer<R>(scope) { abstract class AbstractProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>, AbstractProducer<R>(scope) {
override var producer: Producer<T>? = null override var producer: Producer<T>? = null
protected set protected set
@ -97,14 +111,18 @@ abstract class AbstracProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>,
this.producer = producer this.producer = producer
//No need to save the job, it will be canceled on scope cancel //No need to save the job, it will be canceled on scope cancel
if (producer.consumer != null) { if (producer.consumer != null) {
scope.launch { launch {
producer.output.toChannel(input) connectInput(producer)
} }
// connect back // connect back
producer.connect(this) producer.connect(this)
} else error("Unreachable statement") } else error("Unreachable statement")
} }
} }
protected open suspend fun connectInput(producer: Producer<T>) {
producer.output.toChannel(input)
}
} }
/** /**
@ -116,9 +134,67 @@ class GenericProducer<T>(
block: suspend ProducerScope<T>.() -> Unit block: suspend ProducerScope<T>.() -> Unit
) : AbstractProducer<T>(scope) { ) : AbstractProducer<T>(scope) {
//The generation begins on first request to output //The generation begins on first request to output
override val output: ReceiveChannel<T> by lazy { scope.produce(capacity = capacity, block = block) } override val output: ReceiveChannel<T> by lazy { produce(capacity = capacity, block = block) }
} }
/**
* A simple pipeline [Processor] block
*/
class PipeProcessor<T, R>(
scope: CoroutineScope,
capacity: Int = Channel.RENDEZVOUS,
process: suspend (T) -> R
) : AbstractProcessor<T, R>(scope) {
private val _input = Channel<T>(capacity)
override val input: SendChannel<T> get() = _input
override val output: ReceiveChannel<R> = _input.map(coroutineContext, process)
}
/**
* A [Processor] that splits the input in fixed chunk size and transforms each chunk
*/
class ChunkProcessor<T, R>(
scope: CoroutineScope,
chunkSize: Int,
process: suspend (List<T>) -> R
) : AbstractProcessor<T, R>(scope) {
private val _input = Channel<T>(chunkSize)
override val input: SendChannel<T> get() = _input
private val chunked = produce<List<T>>(coroutineContext) {
val list = ArrayList<T>(chunkSize)
repeat(chunkSize) {
list.add(_input.receive())
}
send(list)
}
override val output: ReceiveChannel<R> = chunked.map(coroutineContext, process)
}
/**
* A moving window [Processor]
*/
class WindowProcessor<T, R>(
scope: CoroutineScope,
window: Int,
process: suspend (List<T>) -> R
) : AbstractProcessor<T, R>(scope) {
override val output: ReceiveChannel<R>
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
override val input: SendChannel<T>
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
}
//TODO add circular buffer processor
/** /**
* Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure
* to incorporate them into state on-arrival. * to incorporate them into state on-arrival.
@ -139,7 +215,7 @@ class Reducer<T, S>(
override val input: SendChannel<T> by lazy { override val input: SendChannel<T> by lazy {
//create a channel and start process of reading all elements into aggregator //create a channel and start process of reading all elements into aggregator
Channel<T>(capacity = Channel.RENDEZVOUS).also { Channel<T>(capacity = Channel.RENDEZVOUS).also {
scope.launch { launch {
it.consumeEach { value -> state.update { fold(it, value) } } it.consumeEach { value -> state.update { fold(it, value) } }
} }
} }
@ -158,7 +234,7 @@ class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
override val input: SendChannel<T> by lazy { override val input: SendChannel<T> by lazy {
//create a channel and start process of reading all elements into aggregator //create a channel and start process of reading all elements into aggregator
Channel<T>(capacity = Channel.RENDEZVOUS).also { Channel<T>(capacity = Channel.RENDEZVOUS).also {
scope.launch { launch {
it.consumeEach { value -> it.consumeEach { value ->
mutex.withLock { mutex.withLock {
_list.add(value) _list.add(value)
@ -168,3 +244,33 @@ class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
} }
} }
} }
/**
* Convert a sequence to [Producer]
*/
fun <T> Sequence<T>.produce(scope: CoroutineScope = GlobalScope) =
GenericProducer<T>(scope) { forEach { send(it) } }
/**
* Convert a [ReceiveChannel] to [Producer]
*/
fun <T> ReceiveChannel<T>.produce(scope: CoroutineScope = GlobalScope) =
GenericProducer<T>(scope) { for (e in this@produce) send(e) }
/**
* Create a reducer and connect this producer to reducer
*/
fun <T, S> Producer<T>.reduce(initialState: S, fold: suspend (S, T) -> S) =
Reducer(this, initialState, fold).also { connect(it) }
/**
* Create a [Collector] and attach it to this [Producer]
*/
fun <T> Producer<T>.collect() =
Collector<T>(this).also { connect(it) }
fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
PipeProcessor(this, capacity, process)
fun <T, R> Producer<T>.chunk(chunkSize: Int, process: suspend (List<T>) -> R) =
ChunkProcessor(this, chunkSize, process)