diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt new file mode 100644 index 000000000..9ab23fbb4 --- /dev/null +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt @@ -0,0 +1,178 @@ +package scientifik.kmath.sequential + +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import scientifik.kmath.structures.Buffer +import scientifik.kmath.structures.asBuffer +import scientifik.kmath.structures.asSequence + +fun Buffer.asChannel(scope: CoroutineScope): ReceiveChannel = scope.produce { + for (i in (0 until size)) { + send(get(i)) + } +} + + +interface BufferProducer : Producer { + suspend fun receiveBuffer(): Buffer +} + +interface BufferConsumer : Consumer { + suspend fun sendBuffer(buffer: Buffer) +} + +abstract class AbstractBufferProducer(scope: CoroutineScope) : AbstractProducer(scope), BufferProducer { + + override fun connectOutput(consumer: Consumer) { + if (consumer is BufferConsumer) { + launch { + while (this.isActive) { + consumer.sendBuffer(receiveBuffer()) + } + } + } else { + super.connectOutput(consumer) + } + } +} + +abstract class AbstractBufferConsumer(scope: CoroutineScope) : AbstractConsumer(scope), BufferConsumer { + override fun connectInput(producer: Producer) { + if (producer is BufferProducer) { + launch { + while (isActive) { + sendBuffer(producer.receiveBuffer()) + } + } + } else { + super.connectInput(producer) + } + } +} + +abstract class AbstractBufferProcessor(scope: CoroutineScope) : + AbstractProcessor(scope), + BufferProducer, + BufferConsumer { + + override fun connectOutput(consumer: Consumer) { + if (consumer is BufferConsumer) { + launch { + while (this.isActive) { + consumer.sendBuffer(receiveBuffer()) + } + } + } else { + super.connectOutput(consumer) + } + } + + override fun connectInput(producer: Producer) { + if (producer is BufferProducer) { + launch { + while (isActive) { + sendBuffer(producer.receiveBuffer()) + } + } + } else { + super.connectInput(producer) + } + } +} + +/** + * The basic generic buffer producer supporting both arrays and element-by-element simultaneously + */ +class BasicBufferProducer( + scope: CoroutineScope, + capacity: Int = Channel.UNLIMITED, + block: suspend ProducerScope>.() -> Unit +) : AbstractBufferProducer(scope) { + + + private val currentArray = atomic?>(null) + private val channel: ReceiveChannel> by lazy { produce(capacity = capacity, block = block) } + private val cachingChannel by lazy { + channel.map { + it.also { buffer -> currentArray.lazySet(buffer.asChannel(this)) } + } + } + + private fun DoubleArray.asChannel() = produce { + for (value in this@asChannel) { + send(value) + } + } + + override suspend fun receiveBuffer(): Buffer = cachingChannel.receive() + + override suspend fun receive(): T = (currentArray.value ?: cachingChannel.receive().asChannel(this)).receive() +} + + +class BufferReducer( + scope: CoroutineScope, + initialState: S, + val fold: suspend (S, Buffer) -> S +) : AbstractBufferConsumer(scope) { + + var state: S = initialState + private set + + override suspend fun sendBuffer(buffer: Buffer) { + state = fold(state, buffer) + } + + override suspend fun send(value: T) = sendBuffer(arrayOf(value).asBuffer()) +} + +/** + * Convert a [Buffer] to single element producer, splitting it in chunks if necessary + */ +fun Buffer.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) = + if (size < chunkSize) { + BasicBufferProducer(scope) { send(this@produce) } + } else { + BasicBufferProducer(scope) { + //TODO optimize this! + asSequence().chunked(chunkSize).forEach { + send(it.asBuffer()) + } + } + } + + +/** + * A buffer processor that works with buffers but could accumulate at lest [accumulate] elements from single input before processing. + * + * This class combines functions from [ChunkProcessor] and single buffer processor + */ +class AccumulatingBufferProcessor( + scope: CoroutineScope, + val accumulate: Int, + val process: suspend (Buffer) -> Buffer +) : + AbstractBufferProcessor(scope) { + + private val inputChannel = Channel>() + private val outputChannel = inputChannel.map { process(it) } + + override suspend fun receive(): R { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override suspend fun send(value: T) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override suspend fun receiveBuffer(): Buffer = outputChannel.receive() + + override suspend fun sendBuffer(buffer: Buffer) { + inputChannel.send(buffer) + } + +} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleProcessors.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleProcessors.kt deleted file mode 100644 index bdeb2d319..000000000 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleProcessors.kt +++ /dev/null @@ -1,7 +0,0 @@ -package scientifik.kmath.sequential - -import kotlinx.coroutines.CoroutineScope - -//class FFTProcessor(scope: CoroutineScope): AbstractDoubleProcessor(scope){ -// -//} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleStreaming.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleStreaming.kt deleted file mode 100644 index c2f048c34..000000000 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleStreaming.kt +++ /dev/null @@ -1,144 +0,0 @@ -package scientifik.kmath.sequential - -import kotlinx.atomicfu.atomic -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import scientifik.kmath.structures.Buffer -import scientifik.kmath.structures.asBuffer -import scientifik.kmath.structures.asSequence - -fun Buffer.asChannel(scope: CoroutineScope): ReceiveChannel = scope.produce { - for (i in (0 until size)) { - send(get(i)) - } -} - -interface DoubleProducer : Producer { - suspend fun receiveArray(): Buffer -} - -interface DoubleConsumer : Consumer { - suspend fun sendArray(array: Buffer) -} - -abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer(scope), DoubleProducer { - - override fun connectOutput(consumer: Consumer) { - if (consumer is DoubleConsumer) { - launch { - while (this.isActive) { - consumer.sendArray(receiveArray()) - } - } - } else { - super.connectOutput(consumer) - } - } -} - -abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer(scope), DoubleConsumer { - override fun connectInput(producer: Producer) { - if (producer is DoubleProducer) { - launch { - while (isActive) { - sendArray(producer.receiveArray()) - } - } - } else { - super.connectInput(producer) - } - } -} - -abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor(scope), - DoubleProducer, DoubleConsumer { - - override fun connectOutput(consumer: Consumer) { - if (consumer is DoubleConsumer) { - launch { - while (this.isActive) { - consumer.sendArray(receiveArray()) - } - } - } else { - super.connectOutput(consumer) - } - } - - override fun connectInput(producer: Producer) { - if (producer is DoubleProducer) { - launch { - while (isActive) { - sendArray(producer.receiveArray()) - } - } - } else { - super.connectInput(producer) - } - } -} - -/** - * The basic [Double] producer supporting both arrays and element-by-element simultaneously - */ -class BasicDoubleProducer( - scope: CoroutineScope, - capacity: Int = Channel.UNLIMITED, - block: suspend ProducerScope>.() -> Unit -) : AbstractDoubleProducer(scope) { - - - private val currentArray = atomic?>(null) - private val channel: ReceiveChannel> by lazy { produce(capacity = capacity, block = block) } - private val cachingChannel by lazy { - channel.map { - it.also { doubles -> currentArray.lazySet(doubles.asChannel(this)) } - } - } - - private fun DoubleArray.asChannel() = produce { - for (value in this@asChannel) { - send(value) - } - } - - override suspend fun receiveArray(): Buffer = cachingChannel.receive() - - override suspend fun receive(): Double = (currentArray.value ?: cachingChannel.receive().asChannel(this)).receive() -} - - -class DoubleReducer( - scope: CoroutineScope, - initialState: S, - val fold: suspend (S, Buffer) -> S -) : AbstractDoubleConsumer(scope) { - - var state: S = initialState - private set - - override suspend fun sendArray(array: Buffer) { - state = fold(state, array) - } - - override suspend fun send(value: Double) = sendArray(doubleArrayOf(value).asBuffer()) -} - -/** - * Convert an array to single element producer, splitting it in chunks if necessary - */ -fun Buffer.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) = - if (size < chunkSize) { - BasicDoubleProducer(scope) { send(this@produce) } - } else { - BasicDoubleProducer(scope) { - //TODO optimize this! - asSequence().chunked(chunkSize).forEach { - send(it.asBuffer()) - } - } - } \ No newline at end of file