Ring buffer. At last.

This commit is contained in:
Alexander Nozik 2019-02-09 10:44:28 +03:00
parent 28695148e9
commit 58a22e4338
11 changed files with 296 additions and 153 deletions

View File

@ -2,6 +2,7 @@ package scientifik.kmath.linear
import scientifik.kmath.operations.RealField import scientifik.kmath.operations.RealField
import scientifik.kmath.operations.Ring import scientifik.kmath.operations.Ring
import scientifik.kmath.operations.sum
import scientifik.kmath.structures.* import scientifik.kmath.structures.*
import scientifik.kmath.structures.Buffer.Companion.DoubleBufferFactory import scientifik.kmath.structures.Buffer.Companion.DoubleBufferFactory
import scientifik.kmath.structures.Buffer.Companion.boxing import scientifik.kmath.structures.Buffer.Companion.boxing
@ -69,7 +70,7 @@ interface GenericMatrixContext<T : Any, R : Ring<T>> : MatrixContext<T> {
val row = rows[i] val row = rows[i]
val column = other.columns[j] val column = other.columns[j]
with(elementContext) { with(elementContext) {
row.asSequence().zip(column.asSequence(), ::multiply).sum() sum(row.asSequence().zip(column.asSequence(), ::multiply))
} }
} }
} }
@ -80,7 +81,7 @@ interface GenericMatrixContext<T : Any, R : Ring<T>> : MatrixContext<T> {
return point(rowNum) { i -> return point(rowNum) { i ->
val row = rows[i] val row = rows[i]
with(elementContext) { with(elementContext) {
row.asSequence().zip(vector.asSequence(), ::multiply).sum() sum(row.asSequence().zip(vector.asSequence(), ::multiply))
} }
} }
} }

View File

@ -77,7 +77,7 @@ interface MutableBuffer<T> : Buffer<T> {
/** /**
* Create a boxing mutable buffer of given type * Create a boxing mutable buffer of given type
*/ */
inline fun <T : Any> boxing(size: Int, initializer: (Int) -> T): MutableBuffer<T> = inline fun <T> boxing(size: Int, initializer: (Int) -> T): MutableBuffer<T> =
MutableListBuffer(MutableList(size, initializer)) MutableListBuffer(MutableList(size, initializer))
/** /**

View File

@ -1,5 +1,6 @@
package scientifik.kmath.sequential package scientifik.kmath.sequential
import scientifik.kmath.operations.Space
import kotlin.jvm.JvmName import kotlin.jvm.JvmName
@ -32,6 +33,10 @@ fun <T, R> List<T>.cumulative(initial: R, operation: (T, R) -> R): List<R> =
//Cumulative sum //Cumulative sum
fun <T> Iterable<T>.cumulativeSum(space: Space<T>) = with(space) {
cumulative(zero) { element: T, sum: T -> sum + element }
}
@JvmName("cumulativeSumOfDouble") @JvmName("cumulativeSumOfDouble")
fun Iterable<Double>.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element } fun Iterable<Double>.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element }
@ -41,6 +46,10 @@ fun Iterable<Int>.cumulativeSum() = this.cumulative(0) { element, sum -> sum + e
@JvmName("cumulativeSumOfLong") @JvmName("cumulativeSumOfLong")
fun Iterable<Long>.cumulativeSum() = this.cumulative(0L) { element, sum -> sum + element } fun Iterable<Long>.cumulativeSum() = this.cumulative(0L) { element, sum -> sum + element }
fun <T> Sequence<T>.cumulativeSum(space: Space<T>) = with(space) {
cumulative(zero) { element: T, sum: T -> sum + element }
}
@JvmName("cumulativeSumOfDouble") @JvmName("cumulativeSumOfDouble")
fun Sequence<Double>.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element } fun Sequence<Double>.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element }
@ -50,6 +59,10 @@ fun Sequence<Int>.cumulativeSum() = this.cumulative(0) { element, sum -> sum + e
@JvmName("cumulativeSumOfLong") @JvmName("cumulativeSumOfLong")
fun Sequence<Long>.cumulativeSum() = this.cumulative(0L) { element, sum -> sum + element } fun Sequence<Long>.cumulativeSum() = this.cumulative(0L) { element, sum -> sum + element }
fun <T> List<T>.cumulativeSum(space: Space<T>) = with(space) {
cumulative(zero) { element: T, sum: T -> sum + element }
}
@JvmName("cumulativeSumOfDouble") @JvmName("cumulativeSumOfDouble")
fun List<Double>.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element } fun List<Double>.cumulativeSum() = this.cumulative(0.0) { element, sum -> sum + element }

View File

@ -0,0 +1,136 @@
package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
interface DoubleProducer : Producer<Double> {
suspend fun receiveArray(): DoubleArray
}
interface DoubleConsumer : Consumer<Double> {
suspend fun sendArray(array: DoubleArray)
}
abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer<Double>(scope), DoubleProducer {
override fun connectOutput(consumer: Consumer<Double>) {
if (consumer is DoubleConsumer) {
launch {
while (this.isActive) {
consumer.sendArray(receiveArray())
}
}
} else {
super.connectOutput(consumer)
}
}
}
abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer<Double>(scope), DoubleConsumer {
override fun connectInput(producer: Producer<Double>) {
if (producer is DoubleProducer) {
launch {
while (isActive) {
sendArray(producer.receiveArray())
}
}
} else {
super.connectInput(producer)
}
}
}
abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor<Double, Double>(scope),
DoubleProducer, DoubleConsumer {
override fun connectOutput(consumer: Consumer<Double>) {
if (consumer is DoubleConsumer) {
launch {
while (this.isActive) {
consumer.sendArray(receiveArray())
}
}
} else {
super.connectOutput(consumer)
}
}
override fun connectInput(producer: Producer<Double>) {
if (producer is DoubleProducer) {
launch {
while (isActive) {
sendArray(producer.receiveArray())
}
}
} else {
super.connectInput(producer)
}
}
}
/**
* The basic [Double] producer supporting both arrays and element-by-element simultaneously
*/
class BasicDoubleProducer(
scope: CoroutineScope,
capacity: Int = Channel.UNLIMITED,
block: suspend ProducerScope<DoubleArray>.() -> Unit
) : AbstractDoubleProducer(scope) {
private val currentArray = atomic<ReceiveChannel<Double>?>(null)
private val channel: ReceiveChannel<DoubleArray> by lazy { produce(capacity = capacity, block = block) }
private val cachingChannel by lazy {
channel.map {
it.also { doubles -> currentArray.lazySet(doubles.asChannel()) }
}
}
private fun DoubleArray.asChannel() = produce {
for (value in this@asChannel) {
send(value)
}
}
override suspend fun receiveArray(): DoubleArray = cachingChannel.receive()
override suspend fun receive(): Double = (currentArray.value ?: cachingChannel.receive().asChannel()).receive()
}
class DoubleReducer<S>(
scope: CoroutineScope,
initialState: S,
val fold: suspend (S, DoubleArray) -> S
) : AbstractDoubleConsumer(scope) {
var state: S = initialState
private set
private val mutex = Mutex()
override suspend fun sendArray(array: DoubleArray) {
state = fold(state, array)
}
override suspend fun send(value: Double) = sendArray(doubleArrayOf(value))
}
/**
* Convert an array to single element producer, splitting it in chunks if necessary
*/
fun DoubleArray.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) = if (size < chunkSize) {
BasicDoubleProducer(scope) { send(this@produce) }
} else {
BasicDoubleProducer(scope) {
//TODO optimize this!
asSequence().chunked(chunkSize).forEach {
send(it.toDoubleArray())
}
}
}

View File

@ -0,0 +1,89 @@
package scientifik.kmath.sequential
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import scientifik.kmath.structures.Buffer
import scientifik.kmath.structures.MutableBuffer
import scientifik.kmath.structures.VirtualBuffer
/**
* Thread-safe ring buffer
*/
internal class RingBuffer<T>(
private val buffer: MutableBuffer<T>,
private var startIndex: Int = 0,
size: Int = 0
) : Buffer<T> {
private val mutex = Mutex()
override var size: Int = size
private set
override fun get(index: Int): T {
require(index >= 0) { "Index must be positive" }
require(index < size) { "Index $index is out of circular buffer size $size" }
return buffer[startIndex.forward(index)]
}
fun isFull() = size == buffer.size
/**
* Iterator could provide wrong results if buffer is changed in initialization (iteration is safe)
*/
override fun iterator(): Iterator<T> = object : AbstractIterator<T>() {
private var count = size
private var index = startIndex
val copy = buffer.copy()
override fun computeNext() {
if (count == 0) {
done()
} else {
setNext(copy[index])
index = index.forward(1)
count--
}
}
}
/**
* A safe snapshot operation
*/
suspend fun snapshot(): Buffer<T> {
mutex.withLock {
val copy = buffer.copy()
return VirtualBuffer(size) { i -> copy[startIndex.forward(i)] }
}
}
suspend fun push(element: T) {
mutex.withLock {
buffer[startIndex.forward(size)] = element
if (isFull()) {
startIndex++
} else {
size++
}
}
}
@Suppress("NOTHING_TO_INLINE")
private inline fun Int.forward(n: Int): Int = (this + n) % (buffer.size)
companion object {
inline fun <reified T : Any> build(size: Int, empty: T): RingBuffer<T> {
val buffer = MutableBuffer.auto(size) { empty }
return RingBuffer(buffer)
}
/**
* Slow yet universal buffer
*/
fun <T> boxing(size: Int): RingBuffer<T?> {
val buffer: MutableBuffer<T?> = MutableBuffer.boxing(size) { null }
return RingBuffer(buffer)
}
}
}

View File

@ -1,79 +0,0 @@
package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.launch
interface DoubleProducer : Producer<Double> {
suspend fun receiveArray(): DoubleArray
}
interface DoubleConsumer : Consumer<Double> {
suspend fun sendArray(): DoubleArray
}
abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer<Double>(scope), DoubleProducer {
override suspend fun connectOutput(consumer: Consumer<Double>) {
if (consumer is DoubleConsumer) {
arrayOutput.toChannel(consumer.arrayInput)
} else {
connectOutput(super, consumer)
}
}
}
abstract class AbstractDoubleConsumer(scope: CoroutineScope) : AbstractConsumer<Double>(scope), DoubleConsumer {
override suspend fun connectInput(producer: Producer<Double>) {
if (producer is DoubleProducer) {
producer.arrayOutput.toChannel(arrayInput)
} else {
super.connectInput(producer)
}
}
}
abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcessor<Double, Double>(scope),
DoubleProducer, DoubleConsumer {
override suspend fun connectOutput(consumer: Consumer<Double>) {
if (consumer is DoubleConsumer) {
arrayOutput.toChannel(consumer.arrayInput)
} else {
connectOutput(super, consumer)
}
}
override suspend fun connectInput(producer: Producer<Double>) {
if (producer is DoubleProducer) {
producer.arrayOutput.toChannel(arrayInput)
} else {
super.connectInput(producer)
}
}
}
class DoubleReducer<S>(
scope: CoroutineScope,
initialState: S,
fold: suspend (S, DoubleArray) -> S
) : AbstractDoubleConsumer(scope) {
private val state = atomic(initialState)
val value: S = state.value
override val arrayInput: SendChannel<DoubleArray> by lazy {
//create a channel and start process of reading all elements into aggregator
Channel<DoubleArray>(capacity = Channel.RENDEZVOUS).also {
launch {
it.consumeEach { value -> state.update { fold(it, value) } }
}
}
}
override val input: SendChannel<DoubleArray> = object :Abstr
}

View File

@ -1,7 +1,5 @@
package scientifik.kmath.sequential package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.*
@ -9,6 +7,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import scientifik.kmath.structures.Buffer
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
/** /**
@ -172,7 +171,7 @@ class PipeProcessor<T, R>(
} }
/** /**
* A [Processor] that splits the input in fixed chunk size and transforms each chunk * A [Processor] that splits the input in fixed chunked size and transforms each chunked
*/ */
class ChunkProcessor<T, R>( class ChunkProcessor<T, R>(
scope: CoroutineScope, scope: CoroutineScope,
@ -205,48 +204,45 @@ class ChunkProcessor<T, R>(
class WindowedProcessor<T, R>( class WindowedProcessor<T, R>(
scope: CoroutineScope, scope: CoroutineScope,
window: Int, window: Int,
process: suspend (List<T>) -> R val process: suspend (Buffer<T?>) -> R
) : AbstractProcessor<T, R>(scope) { ) : AbstractProcessor<T, R>(scope) {
private val ringBuffer = RingBuffer.boxing<T>(window)
private val channel = Channel<R>(Channel.RENDEZVOUS)
override suspend fun receive(): R { override suspend fun receive(): R {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates. return channel.receive()
} }
override suspend fun send(value: T) { override suspend fun send(value: T) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates. ringBuffer.push(value)
channel.send(process(ringBuffer.snapshot()))
} }
} }
/** /**
* Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure
* to incorporate them into state on-arrival. * to incorporate them into state on-arrival.
* The current aggregated state could be accessed by [value]. The input channel is inactive unless requested * The current aggregated state could be accessed by [state]. The input channel is inactive unless requested
* @param T - the type of the input element * @param T - the type of the input element
* @param S - the type of the aggregator * @param S - the type of the aggregator
*/ */
class Reducer<T, S>( class Reducer<T, S>(
scope: CoroutineScope, scope: CoroutineScope,
initialState: S, initialState: S,
fold: suspend (S, T) -> S val fold: suspend (S, T) -> S
) : AbstractConsumer<T>(scope) { ) : AbstractConsumer<T>(scope) {
private val state = atomic(initialState) var state: S = initialState
private set
val value: S = state.value private val mutex = Mutex()
private val input: SendChannel<T> by lazy { override suspend fun send(value: T) = mutex.withLock {
//create a channel and start process of reading all elements into aggregator state = fold(state, value)
Channel<T>(capacity = Channel.RENDEZVOUS).also {
launch {
it.consumeEach { value -> state.update { fold(it, value) } }
} }
} }
}
override suspend fun send(value: T) = input.send(value)
}
/** /**
* Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value. * Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value.
@ -257,21 +253,12 @@ class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
private val mutex = Mutex() private val mutex = Mutex()
val list: List<T> get() = _list val list: List<T> get() = _list
private val input: SendChannel<T> by lazy { override suspend fun send(value: T) {
//create a channel and start process of reading all elements into aggregator
Channel<T>(capacity = Channel.RENDEZVOUS).also {
launch {
it.consumeEach { value ->
mutex.withLock { mutex.withLock {
_list.add(value) _list.add(value)
} }
} }
} }
}
}
override suspend fun send(value: T) = input.send(value)
}
/** /**
* Convert a sequence to [Producer] * Convert a sequence to [Producer]
@ -285,6 +272,10 @@ fun <T> Sequence<T>.produce(scope: CoroutineScope = GlobalScope) =
fun <T> ReceiveChannel<T>.produce(scope: CoroutineScope = GlobalScope) = fun <T> ReceiveChannel<T>.produce(scope: CoroutineScope = GlobalScope) =
GenericProducer<T>(scope) { for (e in this@produce) send(e) } GenericProducer<T>(scope) { for (e in this@produce) send(e) }
fun <T, C : Consumer<T>> Producer<T>.consumer(consumerFactory: () -> C): C =
consumerFactory().also { connect(it) }
/** /**
* Create a reducer and connect this producer to reducer * Create a reducer and connect this producer to reducer
*/ */
@ -297,8 +288,13 @@ fun <T, S> Producer<T>.reduce(initialState: S, fold: suspend (S, T) -> S) =
fun <T> Producer<T>.collect() = fun <T> Producer<T>.collect() =
Collector<T>(this).also { connect(it) } Collector<T>(this).also { connect(it) }
fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = fun <T, R, P : Processor<T, R>> Producer<T>.process(processorBuilder: () -> P): P =
PipeProcessor(this, capacity, process) processorBuilder().also { connect(it) }
fun <T, R> Producer<T>.chunk(chunkSize: Int, process: suspend (List<T>) -> R) = fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
ChunkProcessor(this, chunkSize, process) PipeProcessor<T, R>(this, capacity, process).also { connect(it) }
fun <T, R> Producer<T>.chunked(chunkSize: Int, process: suspend (List<T>) -> R) =
ChunkProcessor(this, chunkSize, process).also { connect(it) }
fun <T> Producer<T>.chunked(chunkSize: Int) = chunked(chunkSize) { it }

View File

@ -1,5 +1,6 @@
package scientifik.kmath.misc package scientifik.kmath.misc
import scientifik.kmath.sequential.cumulativeSum
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals

View File

@ -0,0 +1,19 @@
package scientifik.kmath.sequential
import scientifik.kmath.structures.asSequence
import scientifik.kmath.structures.runBlocking
import kotlin.test.Test
import kotlin.test.assertEquals
class RingBufferTest {
@Test
fun testPush() {
val buffer = RingBuffer.build(20, Double.NaN)
runBlocking {
for (i in 1..30) {
buffer.push(i.toDouble())
}
assertEquals(410.0, buffer.asSequence().sum())
}
}
}

View File

@ -1,33 +0,0 @@
package scientifik.kmath.sequential
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import scientifik.kmath.operations.Space
import scientifik.kmath.structures.runBlocking
import java.util.*
/**
* A moving average with fixed window
*/
class MovingAverage<T : Any>(val window: Int, val context: Space<T>) : Accumulator<T> {
private val outputChannel = Channel<T>()
private val queue = ArrayDeque<T>(window)
private val mutex = Mutex()
override suspend fun send(value: T) {
mutex.withLock {
queue.add(value)
if (queue.size == window) {
val sum = queue.fold(context.zero) { a, b -> context.run { a + b } }
outputChannel.send(context.run { sum / window })
queue.pop()
}
}
}
override fun push(value: T) = runBlocking { send(value) }
val output: ReceiveChannel<T> = outputChannel
}

View File

@ -20,7 +20,7 @@ pluginManagement {
rootProject.name = "kmath" rootProject.name = "kmath"
include( include(
":kmath-core", ":kmath-core",
":kmath-io", // ":kmath-io",
":kmath-coroutines", ":kmath-coroutines",
":kmath-commons", ":kmath-commons",
":kmath-koma", ":kmath-koma",