diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/linear/Matrix.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/linear/Matrix.kt index 646cbb5c1..92acf2bbb 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/linear/Matrix.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/linear/Matrix.kt @@ -2,6 +2,7 @@ package scientifik.kmath.linear import scientifik.kmath.operations.RealField import scientifik.kmath.operations.Ring +import scientifik.kmath.operations.sum import scientifik.kmath.structures.* import scientifik.kmath.structures.Buffer.Companion.DoubleBufferFactory import scientifik.kmath.structures.Buffer.Companion.boxing @@ -69,7 +70,7 @@ interface GenericMatrixContext> : MatrixContext { val row = rows[i] val column = other.columns[j] with(elementContext) { - row.asSequence().zip(column.asSequence(), ::multiply).sum() + sum(row.asSequence().zip(column.asSequence(), ::multiply)) } } } @@ -80,7 +81,7 @@ interface GenericMatrixContext> : MatrixContext { return point(rowNum) { i -> val row = rows[i] with(elementContext) { - row.asSequence().zip(vector.asSequence(), ::multiply).sum() + sum(row.asSequence().zip(vector.asSequence(), ::multiply)) } } } diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt index 1d6eecf52..8256b596d 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt @@ -77,7 +77,7 @@ interface MutableBuffer : Buffer { /** * Create a boxing mutable buffer of given type */ - inline fun boxing(size: Int, initializer: (Int) -> T): MutableBuffer = + inline fun boxing(size: Int, initializer: (Int) -> T): MutableBuffer = MutableListBuffer(MutableList(size, initializer)) /** diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Cumulative.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Cumulative.kt index f02c3ad64..b0e1e9ac5 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Cumulative.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Cumulative.kt @@ -1,5 +1,6 @@ package scientifik.kmath.sequential +import scientifik.kmath.operations.Space import kotlin.jvm.JvmName @@ -32,6 +33,10 @@ fun List.cumulative(initial: R, operation: (T, R) -> R): List = //Cumulative sum +fun Iterable.cumulativeSum(space: Space) = with(space) { + cumulative(zero) { element: T, sum: T -> sum + element } +} + @JvmName("cumulativeSumOfDouble") fun Iterable.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element } @@ -41,6 +46,10 @@ fun Iterable.cumulativeSum() = this.cumulative(0) { element, sum -> sum + e @JvmName("cumulativeSumOfLong") fun Iterable.cumulativeSum() = this.cumulative(0L) { element, sum -> sum + element } +fun Sequence.cumulativeSum(space: Space) = with(space) { + cumulative(zero) { element: T, sum: T -> sum + element } +} + @JvmName("cumulativeSumOfDouble") fun Sequence.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element } @@ -50,6 +59,10 @@ fun Sequence.cumulativeSum() = this.cumulative(0) { element, sum -> sum + e @JvmName("cumulativeSumOfLong") fun Sequence.cumulativeSum() = this.cumulative(0L) { element, sum -> sum + element } +fun List.cumulativeSum(space: Space) = with(space) { + cumulative(zero) { element: T, sum: T -> sum + element } +} + @JvmName("cumulativeSumOfDouble") fun List.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element } diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleStreaming.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleStreaming.kt new file mode 100644 index 000000000..d18c76634 --- /dev/null +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/DoubleStreaming.kt @@ -0,0 +1,136 @@ +package scientifik.kmath.sequential + +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex + +interface DoubleProducer : Producer { + suspend fun receiveArray(): DoubleArray +} + +interface DoubleConsumer : Consumer { + suspend fun sendArray(array: DoubleArray) +} + +abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer(scope), DoubleProducer { + + override fun connectOutput(consumer: Consumer) { + if (consumer is DoubleConsumer) { + launch { + while (this.isActive) { + consumer.sendArray(receiveArray()) + } + } + } else { + super.connectOutput(consumer) + } + } +} + +abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer(scope), DoubleConsumer { + override fun connectInput(producer: Producer) { + if (producer is DoubleProducer) { + launch { + while (isActive) { + sendArray(producer.receiveArray()) + } + } + } else { + super.connectInput(producer) + } + } +} + +abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor(scope), + DoubleProducer, DoubleConsumer { + + override fun connectOutput(consumer: Consumer) { + if (consumer is DoubleConsumer) { + launch { + while (this.isActive) { + consumer.sendArray(receiveArray()) + } + } + } else { + super.connectOutput(consumer) + } + } + + override fun connectInput(producer: Producer) { + if (producer is DoubleProducer) { + launch { + while (isActive) { + sendArray(producer.receiveArray()) + } + } + } else { + super.connectInput(producer) + } + } +} + +/** + * The basic [Double] producer supporting both arrays and element-by-element simultaneously + */ +class BasicDoubleProducer( + scope: CoroutineScope, + capacity: Int = Channel.UNLIMITED, + block: suspend ProducerScope.() -> Unit +) : AbstractDoubleProducer(scope) { + + + private val currentArray = atomic?>(null) + private val channel: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } + private val cachingChannel by lazy { + channel.map { + it.also { doubles -> currentArray.lazySet(doubles.asChannel()) } + } + } + + private fun DoubleArray.asChannel() = produce { + for (value in this@asChannel) { + send(value) + } + } + + override suspend fun receiveArray(): DoubleArray = cachingChannel.receive() + + override suspend fun receive(): Double = (currentArray.value ?: cachingChannel.receive().asChannel()).receive() +} + + +class DoubleReducer( + scope: CoroutineScope, + initialState: S, + val fold: suspend (S, DoubleArray) -> S +) : AbstractDoubleConsumer(scope) { + + var state: S = initialState + private set + + private val mutex = Mutex() + + override suspend fun sendArray(array: DoubleArray) { + state = fold(state, array) + } + + override suspend fun send(value: Double) = sendArray(doubleArrayOf(value)) +} + +/** + * Convert an array to single element producer, splitting it in chunks if necessary + */ +fun DoubleArray.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) = if (size < chunkSize) { + BasicDoubleProducer(scope) { send(this@produce) } +} else { + BasicDoubleProducer(scope) { + //TODO optimize this! + asSequence().chunked(chunkSize).forEach { + send(it.toDoubleArray()) + } + } +} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/RingBuffer.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/RingBuffer.kt new file mode 100644 index 000000000..108b7f828 --- /dev/null +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/RingBuffer.kt @@ -0,0 +1,89 @@ +package scientifik.kmath.sequential + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import scientifik.kmath.structures.Buffer +import scientifik.kmath.structures.MutableBuffer +import scientifik.kmath.structures.VirtualBuffer + +/** + * Thread-safe ring buffer + */ +internal class RingBuffer( + private val buffer: MutableBuffer, + private var startIndex: Int = 0, + size: Int = 0 +) : Buffer { + + private val mutex = Mutex() + + override var size: Int = size + private set + + override fun get(index: Int): T { + require(index >= 0) { "Index must be positive" } + require(index < size) { "Index $index is out of circular buffer size $size" } + return buffer[startIndex.forward(index)] + } + + fun isFull() = size == buffer.size + + /** + * Iterator could provide wrong results if buffer is changed in initialization (iteration is safe) + */ + override fun iterator(): Iterator = object : AbstractIterator() { + private var count = size + private var index = startIndex + val copy = buffer.copy() + + override fun computeNext() { + if (count == 0) { + done() + } else { + setNext(copy[index]) + index = index.forward(1) + count-- + } + } + } + + /** + * A safe snapshot operation + */ + suspend fun snapshot(): Buffer { + mutex.withLock { + val copy = buffer.copy() + return VirtualBuffer(size) { i -> copy[startIndex.forward(i)] } + } + } + + suspend fun push(element: T) { + mutex.withLock { + buffer[startIndex.forward(size)] = element + if (isFull()) { + startIndex++ + } else { + size++ + } + } + } + + + @Suppress("NOTHING_TO_INLINE") + private inline fun Int.forward(n: Int): Int = (this + n) % (buffer.size) + + companion object { + inline fun build(size: Int, empty: T): RingBuffer { + val buffer = MutableBuffer.auto(size) { empty } + return RingBuffer(buffer) + } + + /** + * Slow yet universal buffer + */ + fun boxing(size: Int): RingBuffer { + val buffer: MutableBuffer = MutableBuffer.boxing(size) { null } + return RingBuffer(buffer) + } + } +} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt deleted file mode 100644 index a4f26113f..000000000 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/SpecializedStreamingBlocks.kt +++ /dev/null @@ -1,79 +0,0 @@ -package scientifik.kmath.sequential - -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.update -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.launch - -interface DoubleProducer : Producer { - suspend fun receiveArray(): DoubleArray -} - -interface DoubleConsumer : Consumer { - suspend fun sendArray(): DoubleArray -} - - -abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer(scope), DoubleProducer { - override suspend fun connectOutput(consumer: Consumer) { - if (consumer is DoubleConsumer) { - arrayOutput.toChannel(consumer.arrayInput) - } else { - connectOutput(super, consumer) - } - } -} - -abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer(scope), DoubleConsumer { - override suspend fun connectInput(producer: Producer) { - if (producer is DoubleProducer) { - producer.arrayOutput.toChannel(arrayInput) - } else { - super.connectInput(producer) - } - } -} - -abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor(scope), - DoubleProducer, DoubleConsumer { - - override suspend fun connectOutput(consumer: Consumer) { - if (consumer is DoubleConsumer) { - arrayOutput.toChannel(consumer.arrayInput) - } else { - connectOutput(super, consumer) - } - } - - override suspend fun connectInput(producer: Producer) { - if (producer is DoubleProducer) { - producer.arrayOutput.toChannel(arrayInput) - } else { - super.connectInput(producer) - } - } -} - -class DoubleReducer( - scope: CoroutineScope, - initialState: S, - fold: suspend (S, DoubleArray) -> S -) : AbstractDoubleConsumer(scope) { - private val state = atomic(initialState) - - val value: S = state.value - - override val arrayInput: SendChannel by lazy { - //create a channel and start process of reading all elements into aggregator - Channel(capacity = Channel.RENDEZVOUS).also { - launch { - it.consumeEach { value -> state.update { fold(it, value) } } - } - } - } - - override val input: SendChannel = object :Abstr - - -} \ No newline at end of file diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt similarity index 83% rename from kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt rename to kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt index 2cc412a9a..cc4a68761 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/StreamingBlocks.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Streaming.kt @@ -1,7 +1,5 @@ package scientifik.kmath.sequential -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.update import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.* @@ -9,6 +7,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import scientifik.kmath.structures.Buffer import kotlin.coroutines.CoroutineContext /** @@ -172,7 +171,7 @@ class PipeProcessor( } /** - * A [Processor] that splits the input in fixed chunk size and transforms each chunk + * A [Processor] that splits the input in fixed chunked size and transforms each chunked */ class ChunkProcessor( scope: CoroutineScope, @@ -205,47 +204,44 @@ class ChunkProcessor( class WindowedProcessor( scope: CoroutineScope, window: Int, - process: suspend (List) -> R + val process: suspend (Buffer) -> R ) : AbstractProcessor(scope) { + private val ringBuffer = RingBuffer.boxing(window) + + private val channel = Channel(Channel.RENDEZVOUS) + override suspend fun receive(): R { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + return channel.receive() } override suspend fun send(value: T) { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + ringBuffer.push(value) + channel.send(process(ringBuffer.snapshot())) } - - } /** * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure * to incorporate them into state on-arrival. - * The current aggregated state could be accessed by [value]. The input channel is inactive unless requested + * The current aggregated state could be accessed by [state]. The input channel is inactive unless requested * @param T - the type of the input element * @param S - the type of the aggregator */ class Reducer( scope: CoroutineScope, initialState: S, - fold: suspend (S, T) -> S + val fold: suspend (S, T) -> S ) : AbstractConsumer(scope) { - private val state = atomic(initialState) + var state: S = initialState + private set - val value: S = state.value + private val mutex = Mutex() - private val input: SendChannel by lazy { - //create a channel and start process of reading all elements into aggregator - Channel(capacity = Channel.RENDEZVOUS).also { - launch { - it.consumeEach { value -> state.update { fold(it, value) } } - } - } + override suspend fun send(value: T) = mutex.withLock { + state = fold(state, value) } - - override suspend fun send(value: T) = input.send(value) } /** @@ -257,20 +253,11 @@ class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { private val mutex = Mutex() val list: List get() = _list - private val input: SendChannel by lazy { - //create a channel and start process of reading all elements into aggregator - Channel(capacity = Channel.RENDEZVOUS).also { - launch { - it.consumeEach { value -> - mutex.withLock { - _list.add(value) - } - } - } + override suspend fun send(value: T) { + mutex.withLock { + _list.add(value) } } - - override suspend fun send(value: T) = input.send(value) } /** @@ -285,6 +272,10 @@ fun Sequence.produce(scope: CoroutineScope = GlobalScope) = fun ReceiveChannel.produce(scope: CoroutineScope = GlobalScope) = GenericProducer(scope) { for (e in this@produce) send(e) } + +fun > Producer.consumer(consumerFactory: () -> C): C = + consumerFactory().also { connect(it) } + /** * Create a reducer and connect this producer to reducer */ @@ -297,8 +288,13 @@ fun Producer.reduce(initialState: S, fold: suspend (S, T) -> S) = fun Producer.collect() = Collector(this).also { connect(it) } -fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = - PipeProcessor(this, capacity, process) +fun > Producer.process(processorBuilder: () -> P): P = + processorBuilder().also { connect(it) } -fun Producer.chunk(chunkSize: Int, process: suspend (List) -> R) = - ChunkProcessor(this, chunkSize, process) \ No newline at end of file +fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = + PipeProcessor(this, capacity, process).also { connect(it) } + +fun Producer.chunked(chunkSize: Int, process: suspend (List) -> R) = + ChunkProcessor(this, chunkSize, process).also { connect(it) } + +fun Producer.chunked(chunkSize: Int) = chunked(chunkSize) { it } diff --git a/kmath-sequential/src/commonTest/scientifik/kmath/sequential/CumulativeKtTest.kt b/kmath-sequential/src/commonTest/kotlin/scientifik/kmath/sequential/CumulativeKtTest.kt similarity index 86% rename from kmath-sequential/src/commonTest/scientifik/kmath/sequential/CumulativeKtTest.kt rename to kmath-sequential/src/commonTest/kotlin/scientifik/kmath/sequential/CumulativeKtTest.kt index e7c99e7d0..cafa0526f 100644 --- a/kmath-sequential/src/commonTest/scientifik/kmath/sequential/CumulativeKtTest.kt +++ b/kmath-sequential/src/commonTest/kotlin/scientifik/kmath/sequential/CumulativeKtTest.kt @@ -1,5 +1,6 @@ package scientifik.kmath.misc +import scientifik.kmath.sequential.cumulativeSum import kotlin.test.Test import kotlin.test.assertEquals diff --git a/kmath-sequential/src/commonTest/kotlin/scientifik/kmath/sequential/RingBufferTest.kt b/kmath-sequential/src/commonTest/kotlin/scientifik/kmath/sequential/RingBufferTest.kt new file mode 100644 index 000000000..e2bb18280 --- /dev/null +++ b/kmath-sequential/src/commonTest/kotlin/scientifik/kmath/sequential/RingBufferTest.kt @@ -0,0 +1,19 @@ +package scientifik.kmath.sequential + +import scientifik.kmath.structures.asSequence +import scientifik.kmath.structures.runBlocking +import kotlin.test.Test +import kotlin.test.assertEquals + +class RingBufferTest { + @Test + fun testPush() { + val buffer = RingBuffer.build(20, Double.NaN) + runBlocking { + for (i in 1..30) { + buffer.push(i.toDouble()) + } + assertEquals(410.0, buffer.asSequence().sum()) + } + } +} \ No newline at end of file diff --git a/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/AccumulatorsExt.kt b/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/AccumulatorsExt.kt deleted file mode 100644 index 37190f8f3..000000000 --- a/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/AccumulatorsExt.kt +++ /dev/null @@ -1,33 +0,0 @@ -package scientifik.kmath.sequential - -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import scientifik.kmath.operations.Space -import scientifik.kmath.structures.runBlocking -import java.util.* - -/** - * A moving average with fixed window - */ -class MovingAverage(val window: Int, val context: Space) : Accumulator { - private val outputChannel = Channel() - private val queue = ArrayDeque(window) - private val mutex = Mutex() - - override suspend fun send(value: T) { - mutex.withLock { - queue.add(value) - if (queue.size == window) { - val sum = queue.fold(context.zero) { a, b -> context.run { a + b } } - outputChannel.send(context.run { sum / window }) - queue.pop() - } - } - } - - override fun push(value: T) = runBlocking { send(value) } - - val output: ReceiveChannel = outputChannel -} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 7045009ca..b306d1c8d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -20,7 +20,7 @@ pluginManagement { rootProject.name = "kmath" include( ":kmath-core", - ":kmath-io", +// ":kmath-io", ":kmath-coroutines", ":kmath-commons", ":kmath-koma",