diff --git a/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt b/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt index a4db4b1bd..17907adbe 100644 --- a/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt +++ b/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt @@ -2,6 +2,9 @@ package scientifik.kmath.transform import org.apache.commons.math3.transform.* import scientifik.kmath.operations.Complex +import scientifik.kmath.sequential.Processor +import scientifik.kmath.sequential.Producer +import scientifik.kmath.sequential.map import scientifik.kmath.structures.* @@ -60,4 +63,24 @@ object Transformations { ): BufferTransform = { FastHadamardTransformer().transform(it.asArray(), direction).asBuffer() } +} + +/** + * Process given [Producer] with commons-math fft transformation + */ +fun Producer>.FFT( + normalization: DftNormalization = DftNormalization.STANDARD, + direction: TransformType = TransformType.FORWARD +): Processor, Buffer> { + val transform = Transformations.fourier(normalization, direction) + return map { transform(it) } +} + +@JvmName("realFFT") +fun Producer>.FFT( + normalization: DftNormalization = DftNormalization.STANDARD, + direction: TransformType = TransformType.FORWARD +): Processor, Buffer> { + val transform = Transformations.realFourier(normalization, direction) + return map { transform(it) } } \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt index 9ab23fbb4..ad9e4f259 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/BufferStreaming.kt @@ -1,178 +1,78 @@ package scientifik.kmath.sequential -import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.channels.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.produce import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import scientifik.kmath.structures.Buffer -import scientifik.kmath.structures.asBuffer -import scientifik.kmath.structures.asSequence - -fun Buffer.asChannel(scope: CoroutineScope): ReceiveChannel = scope.produce { - for (i in (0 until size)) { - send(get(i)) - } -} - - -interface BufferProducer : Producer { - suspend fun receiveBuffer(): Buffer -} - -interface BufferConsumer : Consumer { - suspend fun sendBuffer(buffer: Buffer) -} - -abstract class AbstractBufferProducer(scope: CoroutineScope) : AbstractProducer(scope), BufferProducer { - - override fun connectOutput(consumer: Consumer) { - if (consumer is BufferConsumer) { - launch { - while (this.isActive) { - consumer.sendBuffer(receiveBuffer()) - } - } - } else { - super.connectOutput(consumer) - } - } -} - -abstract class AbstractBufferConsumer(scope: CoroutineScope) : AbstractConsumer(scope), BufferConsumer { - override fun connectInput(producer: Producer) { - if (producer is BufferProducer) { - launch { - while (isActive) { - sendBuffer(producer.receiveBuffer()) - } - } - } else { - super.connectInput(producer) - } - } -} - -abstract class AbstractBufferProcessor(scope: CoroutineScope) : - AbstractProcessor(scope), - BufferProducer, - BufferConsumer { - - override fun connectOutput(consumer: Consumer) { - if (consumer is BufferConsumer) { - launch { - while (this.isActive) { - consumer.sendBuffer(receiveBuffer()) - } - } - } else { - super.connectOutput(consumer) - } - } - - override fun connectInput(producer: Producer) { - if (producer is BufferProducer) { - launch { - while (isActive) { - sendBuffer(producer.receiveBuffer()) - } - } - } else { - super.connectInput(producer) - } - } -} +import scientifik.kmath.structures.BufferFactory /** - * The basic generic buffer producer supporting both arrays and element-by-element simultaneously + * A processor that collects incoming elements into fixed size buffers */ -class BasicBufferProducer( +class JoinProcessor( scope: CoroutineScope, - capacity: Int = Channel.UNLIMITED, - block: suspend ProducerScope>.() -> Unit -) : AbstractBufferProducer(scope) { + bufferSize: Int, + bufferFactory: BufferFactory = Buffer.Companion::boxing +) : AbstractProcessor>(scope) { + private val input = Channel(bufferSize) - private val currentArray = atomic?>(null) - private val channel: ReceiveChannel> by lazy { produce(capacity = capacity, block = block) } - private val cachingChannel by lazy { - channel.map { - it.also { buffer -> currentArray.lazySet(buffer.asChannel(this)) } - } - } - - private fun DoubleArray.asChannel() = produce { - for (value in this@asChannel) { - send(value) - } - } - - override suspend fun receiveBuffer(): Buffer = cachingChannel.receive() - - override suspend fun receive(): T = (currentArray.value ?: cachingChannel.receive().asChannel(this)).receive() -} - - -class BufferReducer( - scope: CoroutineScope, - initialState: S, - val fold: suspend (S, Buffer) -> S -) : AbstractBufferConsumer(scope) { - - var state: S = initialState - private set - - override suspend fun sendBuffer(buffer: Buffer) { - state = fold(state, buffer) - } - - override suspend fun send(value: T) = sendBuffer(arrayOf(value).asBuffer()) -} - -/** - * Convert a [Buffer] to single element producer, splitting it in chunks if necessary - */ -fun Buffer.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) = - if (size < chunkSize) { - BasicBufferProducer(scope) { send(this@produce) } - } else { - BasicBufferProducer(scope) { - //TODO optimize this! - asSequence().chunked(chunkSize).forEach { - send(it.asBuffer()) + private val output = produce(coroutineContext) { + val list = ArrayList(bufferSize) + while (isActive) { + list.clear() + repeat(bufferSize) { + list.add(input.receive()) } + val buffer = bufferFactory(bufferSize) { list[it] } + send(buffer) } } - -/** - * A buffer processor that works with buffers but could accumulate at lest [accumulate] elements from single input before processing. - * - * This class combines functions from [ChunkProcessor] and single buffer processor - */ -class AccumulatingBufferProcessor( - scope: CoroutineScope, - val accumulate: Int, - val process: suspend (Buffer) -> Buffer -) : - AbstractBufferProcessor(scope) { - - private val inputChannel = Channel>() - private val outputChannel = inputChannel.map { process(it) } - - override suspend fun receive(): R { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. - } + override suspend fun receive(): Buffer = output.receive() override suspend fun send(value: T) { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + input.send(value) + } +} + +/** + * A processor that splits incoming buffers into individual elements + */ +class SplitProcessor(scope: CoroutineScope) : AbstractProcessor, T>(scope) { + + private val input = Channel>() + + private val mutex = Mutex() + + private var currentBuffer: Buffer? = null + + private var pos = 0 + + + override suspend fun receive(): T { + mutex.withLock { + while (currentBuffer == null || pos == currentBuffer!!.size) { + currentBuffer = input.receive() + pos = 0 + } + return currentBuffer!![pos].also { pos++ } + } } - override suspend fun receiveBuffer(): Buffer = outputChannel.receive() - - override suspend fun sendBuffer(buffer: Buffer) { - inputChannel.send(buffer) + override suspend fun send(value: Buffer) { + input.send(value) } +} + +fun Producer.chunked(chunkSize: Int, bufferFactory: BufferFactory) = + JoinProcessor(this, chunkSize, bufferFactory).also { connect(it) } + +inline fun Producer.chunked(chunkSize: Int) = + JoinProcessor(this, chunkSize, Buffer.Companion::auto).also { connect(it) } + + -} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt index cc4a68761..c0332d639 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt @@ -170,33 +170,6 @@ class PipeProcessor( } } -/** - * A [Processor] that splits the input in fixed chunked size and transforms each chunked - */ -class ChunkProcessor( - scope: CoroutineScope, - chunkSize: Int, - process: suspend (List) -> R -) : AbstractProcessor(scope) { - - private val input = Channel(chunkSize) - - private val chunked = produce>(coroutineContext) { - val list = ArrayList(chunkSize) - repeat(chunkSize) { - list.add(input.receive()) - } - send(list) - } - - private val output: ReceiveChannel = chunked.map(coroutineContext, process) - - override suspend fun receive(): R = output.receive() - - override suspend fun send(value: T) { - input.send(value) - } -} /** * A moving window [Processor] with circular buffer @@ -276,6 +249,9 @@ fun ReceiveChannel.produce(scope: CoroutineScope = GlobalScope) = fun > Producer.consumer(consumerFactory: () -> C): C = consumerFactory().also { connect(it) } +fun Producer.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = + PipeProcessor(this, capacity, process).also { connect(it) } + /** * Create a reducer and connect this producer to reducer */ @@ -294,7 +270,6 @@ fun > Producer.process(processorBuilder: () -> P): fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = PipeProcessor(this, capacity, process).also { connect(it) } -fun Producer.chunked(chunkSize: Int, process: suspend (List) -> R) = - ChunkProcessor(this, chunkSize, process).also { connect(it) } -fun Producer.chunked(chunkSize: Int) = chunked(chunkSize) { it } +fun Producer.windowed(window: Int, process: suspend (Buffer) -> R) = + WindowedProcessor(this, window, process).also { connect(it) } \ No newline at end of file