diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt index 283db0809..94ac0ac2e 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Accumulators.kt @@ -1,25 +1,50 @@ package scientifik.kmath.sequential import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.atomicArrayOfNulls import kotlinx.atomicfu.getAndUpdate +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel import scientifik.kmath.operations.Space /** * An object with a state that accumulates incoming elements */ interface Accumulator { - //PENDING use suspend operations? + /** + * Push a value to accumulator. Blocks if [Accumulator] can't access any more elements at that time + */ fun push(value: T) -} -fun Accumulator.pushAll(values: Iterable) { - values.forEach { push(it) } + /** + * Does the same as [push], but suspends instead of blocking if accumulator is full + */ + suspend fun send(value: T) = push(value) } /** - * Generic thread-safe summator + * Push all elements to accumulator */ -class GenericSum(val context: Space) : Accumulator { +fun Accumulator.pushAll(values: Iterable) { + for (value in values) { + push(value) + } +} + +/** + * Offer all elements from channel to accumulator + */ +suspend fun Accumulator.offerAll(channel: ReceiveChannel) { + for (value in channel) { + send(value) + } +} + +/** + * Generic thread-safe average + */ +class GenericMean(val context: Space) : Accumulator { //TODO add guard against overflow val counter = atomic(0) val sum = atomic(context.zero) diff --git a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt index 65dea5ac0..cf0601135 100644 --- a/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt +++ b/kmath-sequential/src/commonMain/kotlin/scientifik/kmath/sequential/Reducers.kt @@ -16,6 +16,6 @@ inline fun Array.reduce(context: C, crossinline reducer: Reducer mean(): Reducer, T> = { context, data -> - data.fold(GenericSum(context)) { sum, value -> sum.apply { push(value) } }.value + data.fold(GenericMean(context)) { sum, value -> sum.apply { push(value) } }.value } } \ No newline at end of file diff --git a/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/AccumulatorsExt.kt b/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/AccumulatorsExt.kt new file mode 100644 index 000000000..cc9a07668 --- /dev/null +++ b/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/AccumulatorsExt.kt @@ -0,0 +1,28 @@ +package scientifik.kmath.sequential + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import scientifik.kmath.operations.Space +import scientifik.kmath.structures.runBlocking +import java.util.* + +/** + * A moving average with fixed window + */ +class MovingAverage(val window: Int, val context: Space) : Accumulator { + private val outputChannel = Channel() + private val queue = ArrayDeque(window) + + override suspend fun send(value: T) { + queue.add(value) + if (queue.size == window) { + val sum = queue.fold(context.zero) { a, b -> context.run { a + b } } + outputChannel.send(context.run { sum / window }) + queue.pop() + } + } + + override fun push(value: T) = runBlocking { send(value) } + + val output: ReceiveChannel = outputChannel +} \ No newline at end of file diff --git a/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt b/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/ChainExt.kt similarity index 93% rename from kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt rename to kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/ChainExt.kt index af61a5059..74cb6bc6d 100644 --- a/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt +++ b/kmath-sequential/src/jvmMain/kotlin/scientifik/kmath/sequential/ChainExt.kt @@ -1,7 +1,6 @@ -package scientifik.kmath.chains +package scientifik.kmath.sequential import kotlinx.coroutines.runBlocking -import scientifik.kmath.sequential.Chain import kotlin.sequences.Sequence /**