diff --git a/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt b/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt index 89ecf5f9f..113b8ea75 100644 --- a/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt +++ b/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt @@ -1,10 +1,10 @@ package scientifik.kmath.transform +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map import org.apache.commons.math3.transform.* import scientifik.kmath.operations.Complex -import scientifik.kmath.streaming.Processor -import scientifik.kmath.streaming.Producer -import scientifik.kmath.streaming.map import scientifik.kmath.structures.* @@ -68,19 +68,21 @@ object Transformations { /** * Process given [Producer] with commons-math fft transformation */ -fun Producer>.FFT( +@FlowPreview +fun Flow>.FFT( normalization: DftNormalization = DftNormalization.STANDARD, direction: TransformType = TransformType.FORWARD -): Processor, Buffer> { +): Flow> { val transform = Transformations.fourier(normalization, direction) return map { transform(it) } } +@FlowPreview @JvmName("realFFT") -fun Producer>.FFT( +fun Flow>.FFT( normalization: DftNormalization = DftNormalization.STANDARD, direction: TransformType = TransformType.FORWARD -): Processor, Buffer> { +): Flow> { val transform = Transformations.realFourier(normalization, direction) return map { transform(it) } } \ No newline at end of file diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt index c8d091017..4b127a5b4 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt @@ -249,7 +249,10 @@ inline class ReadOnlyBuffer(val buffer: MutableBuffer) : Buffer { * Useful when one needs single element from the buffer. */ class VirtualBuffer(override val size: Int, private val generator: (Int) -> T) : Buffer { - override fun get(index: Int): T = generator(index) + override fun get(index: Int): T { + if (index < 0 || index >= size) throw IndexOutOfBoundsException("Expected index from 0 to ${size - 1}, but found $index") + return generator(index) + } override fun iterator(): Iterator = (0 until size).asSequence().map(generator).iterator() diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt index 8a48893eb..db44f878b 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt @@ -13,12 +13,12 @@ val Dispatchers.Math: CoroutineDispatcher get() = Dispatchers.Default /** * An imitator of [Deferred] which holds a suspended function block and dispatcher */ -class LazyDeferred(val dispatcher: CoroutineDispatcher, val block: suspend CoroutineScope.() -> T) { +internal class LazyDeferred(val dispatcher: CoroutineDispatcher, val block: suspend CoroutineScope.() -> T) { private var deferred: Deferred? = null - fun CoroutineScope.start() { + internal fun start(scope: CoroutineScope) { if(deferred==null) { - deferred = async(dispatcher, block = block) + deferred = scope.async(dispatcher, block = block) } } @@ -26,7 +26,7 @@ class LazyDeferred(val dispatcher: CoroutineDispatcher, val block: suspend Co } @FlowPreview -inline class AsyncFlow(val deferredFlow: Flow>) : Flow { +class AsyncFlow internal constructor(internal val deferredFlow: Flow>) : Flow { override suspend fun collect(collector: FlowCollector) { deferredFlow.collect { collector.emit((it.await())) @@ -46,23 +46,23 @@ fun Flow.async( } @FlowPreview -fun AsyncFlow.map(action: (T) -> R) = deferredFlow.map { input -> - //TODO add actual composition +fun AsyncFlow.map(action: (T) -> R) = AsyncFlow(deferredFlow.map { input -> + //TODO add function composition LazyDeferred(input.dispatcher) { - input.run { start() } + input.start(this) action(input.await()) } -} +}) @ExperimentalCoroutinesApi @FlowPreview suspend fun AsyncFlow.collect(concurrency: Int, collector: FlowCollector) { - require(concurrency >= 0) { "Buffer size should be positive, but was $concurrency" } + require(concurrency >= 1) { "Buffer size should be more than 1, but was $concurrency" } coroutineScope { //Starting up to N deferred coroutines ahead of time - val channel = produce(capacity = concurrency) { + val channel = produce(capacity = concurrency-1) { deferredFlow.collect { value -> - value.run { start() } + value.start(this@coroutineScope) send(value) } } @@ -91,3 +91,31 @@ suspend fun AsyncFlow.collect(concurrency: Int, action: suspend (value: T }) } +//suspend fun Flow.collect(concurrency: Int, dispatcher: CoroutineDispatcher, collector: FlowCollector){ +// require(concurrency >= 1) { "Buffer size should be more than 1, but was $concurrency" } +// coroutineScope { +// //Starting up to N deferred coroutines ahead of time +// val channel = produce(capacity = concurrency-1) { +// this@collect. +// deferredFlow.collect { value -> +// value.start(this@coroutineScope) +// send(value) +// } +// } +// +// (channel as Job).invokeOnCompletion { +// if (it is CancellationException && it.cause == null) cancel() +// } +// +// for (element in channel) { +// collector.emit(element.await()) +// } +// +// val producer = channel as Job +// if (producer.isCancelled) { +// producer.join() +// //throw producer.getCancellationException() +// } +// } +//} + diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt index 67b74d7de..aa0aca460 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt @@ -1,13 +1,10 @@ package scientifik.kmath.streaming -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.* import scientifik.kmath.structures.Buffer import scientifik.kmath.structures.BufferFactory import scientifik.kmath.structures.DoubleBuffer -import kotlin.coroutines.coroutineContext /** * Create a [Flow] from buffer @@ -59,4 +56,18 @@ fun Flow.chunked(bufferSize: Int) = flow { emit(buffer) } } +} + +/** + * Map a flow to a moving window buffer. The window step is one. + * In order to get different steps, one could use skip operation. + */ +@FlowPreview +fun Flow.windowed(window: Int): Flow> = flow { + require(window > 1) { "Window size must be more than one" } + val ringBuffer = RingBuffer.boxing(window) + this@windowed.collect { element -> + ringBuffer.push(element) + emit(ringBuffer.snapshot()) + } } \ No newline at end of file diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt deleted file mode 100644 index 023199452..000000000 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt +++ /dev/null @@ -1,82 +0,0 @@ -package scientifik.kmath.streaming - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.isActive -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import scientifik.kmath.structures.Buffer -import scientifik.kmath.structures.BufferFactory - -/** - * A processor that collects incoming elements into fixed size buffers - */ -@ExperimentalCoroutinesApi -class JoinProcessor( - scope: CoroutineScope, - bufferSize: Int, - bufferFactory: BufferFactory = Buffer.Companion::boxing -) : AbstractProcessor>(scope) { - - private val input = Channel(bufferSize) - - private val output = produce(coroutineContext) { - val list = ArrayList(bufferSize) - while (isActive) { - list.clear() - repeat(bufferSize) { - list.add(input.receive()) - } - val buffer = bufferFactory(bufferSize) { list[it] } - send(buffer) - } - } - - override suspend fun receive(): Buffer = output.receive() - - override suspend fun send(value: T) { - input.send(value) - } -} - -/** - * A processor that splits incoming buffers into individual elements - */ -class SplitProcessor(scope: CoroutineScope) : AbstractProcessor, T>(scope) { - - private val input = Channel>() - - private val mutex = Mutex() - - private var currentBuffer: Buffer? = null - - private var pos = 0 - - - override suspend fun receive(): T { - mutex.withLock { - while (currentBuffer == null || pos == currentBuffer!!.size) { - currentBuffer = input.receive() - pos = 0 - } - return currentBuffer!![pos].also { pos++ } - } - } - - override suspend fun send(value: Buffer) { - input.send(value) - } -} - -@ExperimentalCoroutinesApi -fun Producer.chunked(chunkSize: Int, bufferFactory: BufferFactory) = - JoinProcessor(this, chunkSize, bufferFactory).also { connect(it) } - -@ExperimentalCoroutinesApi -inline fun Producer.chunked(chunkSize: Int) = - JoinProcessor(this, chunkSize, Buffer.Companion::auto).also { connect(it) } - - - diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt index ac64f921f..6b99e34ff 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt @@ -5,12 +5,14 @@ import kotlinx.coroutines.sync.withLock import scientifik.kmath.structures.Buffer import scientifik.kmath.structures.MutableBuffer import scientifik.kmath.structures.VirtualBuffer +import kotlin.reflect.KClass /** * Thread-safe ring buffer */ +@Suppress("UNCHECKED_CAST") internal class RingBuffer( - private val buffer: MutableBuffer, + private val buffer: MutableBuffer, private var startIndex: Int = 0, size: Int = 0 ) : Buffer { @@ -23,7 +25,7 @@ internal class RingBuffer( override fun get(index: Int): T { require(index >= 0) { "Index must be positive" } require(index < size) { "Index $index is out of circular buffer size $size" } - return buffer[startIndex.forward(index)] + return buffer[startIndex.forward(index)] as T } fun isFull() = size == buffer.size @@ -40,7 +42,7 @@ internal class RingBuffer( if (count == 0) { done() } else { - setNext(copy[index]) + setNext(copy[index] as T) index = index.forward(1) count-- } @@ -53,7 +55,9 @@ internal class RingBuffer( suspend fun snapshot(): Buffer { mutex.withLock { val copy = buffer.copy() - return VirtualBuffer(size) { i -> copy[startIndex.forward(i)] } + return VirtualBuffer(size) { i -> + copy[startIndex.forward(i)] as T + } } } @@ -74,14 +78,14 @@ internal class RingBuffer( companion object { inline fun build(size: Int, empty: T): RingBuffer { - val buffer = MutableBuffer.auto(size) { empty } + val buffer = MutableBuffer.auto(size) { empty } as MutableBuffer return RingBuffer(buffer) } /** * Slow yet universal buffer */ - fun boxing(size: Int): RingBuffer { + fun boxing(size: Int): RingBuffer { val buffer: MutableBuffer = MutableBuffer.boxing(size) { null } return RingBuffer(buffer) } diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt deleted file mode 100644 index 0382b6a89..000000000 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt +++ /dev/null @@ -1,273 +0,0 @@ -package scientifik.kmath.streaming - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import scientifik.kmath.structures.Buffer -import kotlin.coroutines.CoroutineContext - -/** - * Initial chain block. Could produce an element sequence and be connected to single [Consumer] - * - * The general rule is that channel is created on first call. Also each element is responsible for its connection so - * while the connections are symmetric, the scope, used for making the connection is responsible for cancelation. - * - * Also connections are not reversible. Once connected block stays faithful until it finishes processing. - * Manually putting elements to connected block could lead to undetermined behavior and must be avoided. - */ -interface Producer : CoroutineScope { - fun connect(consumer: Consumer) - - suspend fun receive(): T - - val consumer: Consumer? - - val outputIsConnected: Boolean get() = consumer != null - - //fun close() -} - -/** - * Terminal chain block. Could consume an element sequence and be connected to signle [Producer] - */ -interface Consumer : CoroutineScope { - fun connect(producer: Producer) - - suspend fun send(value: T) - - val producer: Producer? - - val inputIsConnected: Boolean get() = producer != null - - //fun close() -} - -interface Processor : Consumer, Producer - -abstract class AbstractProducer(scope: CoroutineScope) : Producer { - override val coroutineContext: CoroutineContext = scope.coroutineContext - - override var consumer: Consumer? = null - protected set - - override fun connect(consumer: Consumer) { - //Ignore if already connected to specific consumer - if (consumer != this.consumer) { - if (outputIsConnected) error("The output slot of producer is occupied") - if (consumer.inputIsConnected) error("The input slot of consumer is occupied") - this.consumer = consumer - if (consumer.producer != null) { - //No need to save the job, it will be canceled on scope cancel - connectOutput(consumer) - // connect back, consumer is already set so no circular reference - consumer.connect(this) - } else error("Unreachable statement") - } - } - - protected open fun connectOutput(consumer: Consumer) { - launch { - while (this.isActive) { - consumer.send(receive()) - } - } - } -} - -abstract class AbstractConsumer(scope: CoroutineScope) : Consumer { - override val coroutineContext: CoroutineContext = scope.coroutineContext - - override var producer: Producer? = null - protected set - - override fun connect(producer: Producer) { - //Ignore if already connected to specific consumer - if (producer != this.producer) { - if (inputIsConnected) error("The input slot of consumer is occupied") - if (producer.outputIsConnected) error("The input slot of producer is occupied") - this.producer = producer - //No need to save the job, it will be canceled on scope cancel - if (producer.consumer != null) { - connectInput(producer) - // connect back - producer.connect(this) - } else error("Unreachable statement") - } - } - - protected open fun connectInput(producer: Producer) { - launch { - while (isActive) { - send(producer.receive()) - } - } - } -} - -abstract class AbstractProcessor(scope: CoroutineScope) : Processor, AbstractProducer(scope) { - - override var producer: Producer? = null - protected set - - override fun connect(producer: Producer) { - //Ignore if already connected to specific consumer - if (producer != this.producer) { - if (inputIsConnected) error("The input slot of consumer is occupied") - if (producer.outputIsConnected) error("The input slot of producer is occupied") - this.producer = producer - //No need to save the job, it will be canceled on scope cancel - if (producer.consumer != null) { - connectInput(producer) - // connect back - producer.connect(this) - } else error("Unreachable statement") - } - } - - protected open fun connectInput(producer: Producer) { - launch { - while (isActive) { - send(producer.receive()) - } - } - } -} - -/** - * A simple [produce]-based producer - */ -@ExperimentalCoroutinesApi -class GenericProducer( - scope: CoroutineScope, - capacity: Int = Channel.UNLIMITED, - block: suspend ProducerScope.() -> Unit -) : AbstractProducer(scope) { - - private val channel: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } - - override suspend fun receive(): T = channel.receive() -} - -/** - * A simple pipeline [Processor] block - */ -class PipeProcessor( - scope: CoroutineScope, - capacity: Int = Channel.RENDEZVOUS, - process: suspend (T) -> R -) : AbstractProcessor(scope) { - - private val input = Channel(capacity) - private val output: ReceiveChannel = input.map(coroutineContext, process) - - override suspend fun receive(): R = output.receive() - - override suspend fun send(value: T) { - input.send(value) - } -} - - -/** - * A moving window [Processor] with circular buffer - */ -class WindowedProcessor( - scope: CoroutineScope, - window: Int, - val process: suspend (Buffer) -> R -) : AbstractProcessor(scope) { - - private val ringBuffer = RingBuffer.boxing(window) - - private val channel = Channel(Channel.RENDEZVOUS) - - override suspend fun receive(): R { - return channel.receive() - } - - override suspend fun send(value: T) { - ringBuffer.push(value) - channel.send(process(ringBuffer.snapshot())) - } -} - -/** - * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure - * to incorporate them into state on-arrival. - * The current aggregated state could be accessed by [state]. The input channel is inactive unless requested - * @param T - the type of the input element - * @param S - the type of the aggregator - */ -class Reducer( - scope: CoroutineScope, - initialState: S, - val fold: suspend (S, T) -> S -) : AbstractConsumer(scope) { - - var state: S = initialState - private set - - private val mutex = Mutex() - - override suspend fun send(value: T) = mutex.withLock { - state = fold(state, value) - } -} - -/** - * Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value. - */ -class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { - - private val _list = ArrayList() - private val mutex = Mutex() - val list: List get() = _list - - override suspend fun send(value: T) { - mutex.withLock { - _list.add(value) - } - } -} - -/** - * Convert a sequence to [Producer] - */ -fun Sequence.produce(scope: CoroutineScope = GlobalScope) = - GenericProducer(scope) { forEach { send(it) } } - -/** - * Convert a [ReceiveChannel] to [Producer] - */ -fun ReceiveChannel.produce(scope: CoroutineScope = GlobalScope) = - GenericProducer(scope) { for (e in this@produce) send(e) } - - -fun > Producer.consumer(consumerFactory: () -> C): C = - consumerFactory().also { connect(it) } - -fun Producer.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = - PipeProcessor(this, capacity, process).also { connect(it) } - -/** - * Create a reducer and connect this producer to reducer - */ -fun Producer.reduce(initialState: S, fold: suspend (S, T) -> S) = - Reducer(this, initialState, fold).also { connect(it) } - -/** - * Create a [Collector] and attach it to this [Producer] - */ -fun Producer.collect() = - Collector(this).also { connect(it) } - -fun > Producer.process(processorBuilder: () -> P): P = - processorBuilder().also { connect(it) } - -fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = - PipeProcessor(this, capacity, process).also { connect(it) } - - -fun Producer.windowed(window: Int, process: suspend (Buffer) -> R) = - WindowedProcessor(this, window, process).also { connect(it) } \ No newline at end of file diff --git a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt index 068859eb5..25af1f589 100644 --- a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt +++ b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt @@ -1,5 +1,6 @@ package scientifik.kmath.streaming +import kotlinx.coroutines.flow.* import kotlinx.coroutines.runBlocking import org.junit.Test import scientifik.kmath.structures.asSequence @@ -7,7 +8,7 @@ import kotlin.test.assertEquals class RingBufferTest { @Test - fun testPush() { + fun push() { val buffer = RingBuffer.build(20, Double.NaN) runBlocking { for (i in 1..30) { @@ -16,4 +17,22 @@ class RingBufferTest { assertEquals(410.0, buffer.asSequence().sum()) } } + + @Test + fun windowed(){ + val flow = flow{ + var i = 0 + while(true){ + emit(i++) + } + } + val windowed = flow.windowed(10) + runBlocking { + val first = windowed.take(1).single() + val res = windowed.take(15).map { it -> it.asSequence().average() }.toList() + assertEquals(0.0, res[0]) + assertEquals(4.5, res[9]) + assertEquals(9.5, res[14]) + } + } } \ No newline at end of file