Hidden channels in streaming blocks.

This commit is contained in:
Alexander Nozik 2019-02-06 22:24:03 +03:00
parent c0bdacecb3
commit 28695148e9
2 changed files with 96 additions and 44 deletions

View File

@ -1,16 +1,17 @@
package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.toChannel
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.launch
interface DoubleProducer : Producer<Double> {
val arrayOutput: ReceiveChannel<DoubleArray>
suspend fun receiveArray(): DoubleArray
}
interface DoubleConsumer : Consumer<Double> {
val arrayInput: SendChannel<DoubleArray>
suspend fun sendArray(): DoubleArray
}
@ -19,7 +20,7 @@ abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer<
if (consumer is DoubleConsumer) {
arrayOutput.toChannel(consumer.arrayInput)
} else {
super.connectOutput(consumer)
connectOutput(super, consumer)
}
}
}
@ -41,7 +42,7 @@ abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcesso
if (consumer is DoubleConsumer) {
arrayOutput.toChannel(consumer.arrayInput)
} else {
super.connectOutput(consumer)
connectOutput(super, consumer)
}
}
@ -53,3 +54,26 @@ abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcesso
}
}
}
class DoubleReducer<S>(
scope: CoroutineScope,
initialState: S,
fold: suspend (S, DoubleArray) -> S
) : AbstractDoubleConsumer(scope) {
private val state = atomic(initialState)
val value: S = state.value
override val arrayInput: SendChannel<DoubleArray> by lazy {
//create a channel and start process of reading all elements into aggregator
Channel<DoubleArray>(capacity = Channel.RENDEZVOUS).also {
launch {
it.consumeEach { value -> state.update { fold(it, value) } }
}
}
}
override val input: SendChannel<DoubleArray> = object :Abstr
}

View File

@ -5,6 +5,7 @@ import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -20,24 +21,30 @@ import kotlin.coroutines.CoroutineContext
* Manually putting elements to connected block could lead to undetermined behavior and must be avoided.
*/
interface Producer<T> : CoroutineScope {
val output: ReceiveChannel<T>
fun connect(consumer: Consumer<T>)
suspend fun receive(): T
val consumer: Consumer<T>?
val outputIsConnected: Boolean get() = consumer != null
//fun close()
}
/**
* Terminal chain block. Could consume an element sequence and be connected to signle [Producer]
*/
interface Consumer<T> : CoroutineScope {
val input: SendChannel<T>
fun connect(producer: Producer<T>)
suspend fun send(value: T)
val producer: Producer<T>?
val inputIsConnected: Boolean get() = producer != null
//fun close()
}
interface Processor<T, R> : Consumer<T>, Producer<R>
@ -56,17 +63,19 @@ abstract class AbstractProducer<T>(scope: CoroutineScope) : Producer<T> {
this.consumer = consumer
if (consumer.producer != null) {
//No need to save the job, it will be canceled on scope cancel
launch {
connectOutput(consumer)
}
// connect back, consumer is already set so no circular reference
consumer.connect(this)
} else error("Unreachable statement")
}
}
protected open suspend fun connectOutput(consumer: Consumer<T>) {
output.toChannel(consumer.input)
protected open fun connectOutput(consumer: Consumer<T>) {
launch {
while (this.isActive) {
consumer.send(receive())
}
}
}
}
@ -84,17 +93,19 @@ abstract class AbstractConsumer<T>(scope: CoroutineScope) : Consumer<T> {
this.producer = producer
//No need to save the job, it will be canceled on scope cancel
if (producer.consumer != null) {
launch {
connectInput(producer)
}
// connect back
producer.connect(this)
} else error("Unreachable statement")
}
}
protected open suspend fun connectInput(producer: Producer<T>) {
producer.output.toChannel(input)
protected open fun connectInput(producer: Producer<T>) {
launch {
while (isActive) {
send(producer.receive())
}
}
}
}
@ -111,17 +122,19 @@ abstract class AbstractProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>,
this.producer = producer
//No need to save the job, it will be canceled on scope cancel
if (producer.consumer != null) {
launch {
connectInput(producer)
}
// connect back
producer.connect(this)
} else error("Unreachable statement")
}
}
protected open suspend fun connectInput(producer: Producer<T>) {
producer.output.toChannel(input)
protected open fun connectInput(producer: Producer<T>) {
launch {
while (isActive) {
send(producer.receive())
}
}
}
}
@ -133,8 +146,10 @@ class GenericProducer<T>(
capacity: Int = Channel.UNLIMITED,
block: suspend ProducerScope<T>.() -> Unit
) : AbstractProducer<T>(scope) {
//The generation begins on first request to output
override val output: ReceiveChannel<T> by lazy { produce(capacity = capacity, block = block) }
private val channel: ReceiveChannel<T> by lazy { produce(capacity = capacity, block = block) }
override suspend fun receive(): T = channel.receive()
}
/**
@ -146,9 +161,14 @@ class PipeProcessor<T, R>(
process: suspend (T) -> R
) : AbstractProcessor<T, R>(scope) {
private val _input = Channel<T>(capacity)
override val input: SendChannel<T> get() = _input
override val output: ReceiveChannel<R> = _input.map(coroutineContext, process)
private val input = Channel<T>(capacity)
private val output: ReceiveChannel<R> = input.map(coroutineContext, process)
override suspend fun receive(): R = output.receive()
override suspend fun send(value: T) {
input.send(value)
}
}
/**
@ -160,41 +180,45 @@ class ChunkProcessor<T, R>(
process: suspend (List<T>) -> R
) : AbstractProcessor<T, R>(scope) {
private val _input = Channel<T>(chunkSize)
override val input: SendChannel<T> get() = _input
private val input = Channel<T>(chunkSize)
private val chunked = produce<List<T>>(coroutineContext) {
val list = ArrayList<T>(chunkSize)
repeat(chunkSize) {
list.add(_input.receive())
list.add(input.receive())
}
send(list)
}
override val output: ReceiveChannel<R> = chunked.map(coroutineContext, process)
private val output: ReceiveChannel<R> = chunked.map(coroutineContext, process)
override suspend fun receive(): R = output.receive()
override suspend fun send(value: T) {
input.send(value)
}
}
/**
* A moving window [Processor]
* A moving window [Processor] with circular buffer
*/
class WindowProcessor<T, R>(
class WindowedProcessor<T, R>(
scope: CoroutineScope,
window: Int,
process: suspend (List<T>) -> R
) : AbstractProcessor<T, R>(scope) {
override suspend fun receive(): R {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override val output: ReceiveChannel<R>
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
override val input: SendChannel<T>
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
override suspend fun send(value: T) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
}
//TODO add circular buffer processor
/**
* Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure
* to incorporate them into state on-arrival.
@ -212,7 +236,7 @@ class Reducer<T, S>(
val value: S = state.value
override val input: SendChannel<T> by lazy {
private val input: SendChannel<T> by lazy {
//create a channel and start process of reading all elements into aggregator
Channel<T>(capacity = Channel.RENDEZVOUS).also {
launch {
@ -220,6 +244,8 @@ class Reducer<T, S>(
}
}
}
override suspend fun send(value: T) = input.send(value)
}
/**
@ -231,7 +257,7 @@ class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
private val mutex = Mutex()
val list: List<T> get() = _list
override val input: SendChannel<T> by lazy {
private val input: SendChannel<T> by lazy {
//create a channel and start process of reading all elements into aggregator
Channel<T>(capacity = Channel.RENDEZVOUS).also {
launch {
@ -243,6 +269,8 @@ class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
}
}
}
override suspend fun send(value: T) = input.send(value)
}
/**