Sequential operations

This commit is contained in:
Alexander Nozik 2019-02-02 19:56:45 +03:00
parent 1e99e89c4c
commit 002ddbee48
4 changed files with 61 additions and 9 deletions

View File

@ -1,25 +1,50 @@
package scientifik.kmath.sequential package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.atomicArrayOfNulls
import kotlinx.atomicfu.getAndUpdate import kotlinx.atomicfu.getAndUpdate
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import scientifik.kmath.operations.Space import scientifik.kmath.operations.Space
/** /**
* An object with a state that accumulates incoming elements * An object with a state that accumulates incoming elements
*/ */
interface Accumulator<in T> { interface Accumulator<in T> {
//PENDING use suspend operations? /**
* Push a value to accumulator. Blocks if [Accumulator] can't access any more elements at that time
*/
fun push(value: T) fun push(value: T)
}
fun <T> Accumulator<T>.pushAll(values: Iterable<T>) { /**
values.forEach { push(it) } * Does the same as [push], but suspends instead of blocking if accumulator is full
*/
suspend fun send(value: T) = push(value)
} }
/** /**
* Generic thread-safe summator * Push all elements to accumulator
*/ */
class GenericSum<T : Any>(val context: Space<T>) : Accumulator<T> { fun <T> Accumulator<T>.pushAll(values: Iterable<T>) {
for (value in values) {
push(value)
}
}
/**
* Offer all elements from channel to accumulator
*/
suspend fun <T> Accumulator<T>.offerAll(channel: ReceiveChannel<T>) {
for (value in channel) {
send(value)
}
}
/**
* Generic thread-safe average
*/
class GenericMean<T : Any>(val context: Space<T>) : Accumulator<T> {
//TODO add guard against overflow //TODO add guard against overflow
val counter = atomic(0) val counter = atomic(0)
val sum = atomic(context.zero) val sum = atomic(context.zero)

View File

@ -16,6 +16,6 @@ inline fun <T, C, R> Array<T>.reduce(context: C, crossinline reducer: Reducer<T,
object Reducers { object Reducers {
fun <T : Any> mean(): Reducer<T, Space<T>, T> = { context, data -> fun <T : Any> mean(): Reducer<T, Space<T>, T> = { context, data ->
data.fold(GenericSum(context)) { sum, value -> sum.apply { push(value) } }.value data.fold(GenericMean(context)) { sum, value -> sum.apply { push(value) } }.value
} }
} }

View File

@ -0,0 +1,28 @@
package scientifik.kmath.sequential
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import scientifik.kmath.operations.Space
import scientifik.kmath.structures.runBlocking
import java.util.*
/**
* A moving average with fixed window
*/
class MovingAverage<T : Any>(val window: Int, val context: Space<T>) : Accumulator<T> {
private val outputChannel = Channel<T>()
private val queue = ArrayDeque<T>(window)
override suspend fun send(value: T) {
queue.add(value)
if (queue.size == window) {
val sum = queue.fold(context.zero) { a, b -> context.run { a + b } }
outputChannel.send(context.run { sum / window })
queue.pop()
}
}
override fun push(value: T) = runBlocking { send(value) }
val output: ReceiveChannel<T> = outputChannel
}

View File

@ -1,7 +1,6 @@
package scientifik.kmath.chains package scientifik.kmath.sequential
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import scientifik.kmath.sequential.Chain
import kotlin.sequences.Sequence import kotlin.sequences.Sequence
/** /**