Buffer streaming next iteration
This commit is contained in:
parent
9c416e185b
commit
472dfd7b88
@ -2,6 +2,9 @@ package scientifik.kmath.transform
|
|||||||
|
|
||||||
import org.apache.commons.math3.transform.*
|
import org.apache.commons.math3.transform.*
|
||||||
import scientifik.kmath.operations.Complex
|
import scientifik.kmath.operations.Complex
|
||||||
|
import scientifik.kmath.sequential.Processor
|
||||||
|
import scientifik.kmath.sequential.Producer
|
||||||
|
import scientifik.kmath.sequential.map
|
||||||
import scientifik.kmath.structures.*
|
import scientifik.kmath.structures.*
|
||||||
|
|
||||||
|
|
||||||
@ -60,4 +63,24 @@ object Transformations {
|
|||||||
): BufferTransform<Double, Double> = {
|
): BufferTransform<Double, Double> = {
|
||||||
FastHadamardTransformer().transform(it.asArray(), direction).asBuffer()
|
FastHadamardTransformer().transform(it.asArray(), direction).asBuffer()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process given [Producer] with commons-math fft transformation
|
||||||
|
*/
|
||||||
|
fun Producer<Buffer<Complex>>.FFT(
|
||||||
|
normalization: DftNormalization = DftNormalization.STANDARD,
|
||||||
|
direction: TransformType = TransformType.FORWARD
|
||||||
|
): Processor<Buffer<Complex>, Buffer<Complex>> {
|
||||||
|
val transform = Transformations.fourier(normalization, direction)
|
||||||
|
return map { transform(it) }
|
||||||
|
}
|
||||||
|
|
||||||
|
@JvmName("realFFT")
|
||||||
|
fun Producer<Buffer<Double>>.FFT(
|
||||||
|
normalization: DftNormalization = DftNormalization.STANDARD,
|
||||||
|
direction: TransformType = TransformType.FORWARD
|
||||||
|
): Processor<Buffer<Double>, Buffer<Complex>> {
|
||||||
|
val transform = Transformations.realFourier(normalization, direction)
|
||||||
|
return map { transform(it) }
|
||||||
}
|
}
|
@ -1,178 +1,78 @@
|
|||||||
package scientifik.kmath.sequential
|
package scientifik.kmath.sequential
|
||||||
|
|
||||||
import kotlinx.atomicfu.atomic
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.channels.*
|
import kotlinx.coroutines.channels.produce
|
||||||
import kotlinx.coroutines.isActive
|
import kotlinx.coroutines.isActive
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
import scientifik.kmath.structures.Buffer
|
import scientifik.kmath.structures.Buffer
|
||||||
import scientifik.kmath.structures.asBuffer
|
import scientifik.kmath.structures.BufferFactory
|
||||||
import scientifik.kmath.structures.asSequence
|
|
||||||
|
|
||||||
fun <T> Buffer<T>.asChannel(scope: CoroutineScope): ReceiveChannel<T> = scope.produce {
|
|
||||||
for (i in (0 until size)) {
|
|
||||||
send(get(i))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
interface BufferProducer<T> : Producer<T> {
|
|
||||||
suspend fun receiveBuffer(): Buffer<T>
|
|
||||||
}
|
|
||||||
|
|
||||||
interface BufferConsumer<T> : Consumer<T> {
|
|
||||||
suspend fun sendBuffer(buffer: Buffer<T>)
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract class AbstractBufferProducer<T>(scope: CoroutineScope) : AbstractProducer<T>(scope), BufferProducer<T> {
|
|
||||||
|
|
||||||
override fun connectOutput(consumer: Consumer<T>) {
|
|
||||||
if (consumer is BufferConsumer) {
|
|
||||||
launch {
|
|
||||||
while (this.isActive) {
|
|
||||||
consumer.sendBuffer(receiveBuffer())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
super.connectOutput(consumer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract class AbstractBufferConsumer<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope), BufferConsumer<T> {
|
|
||||||
override fun connectInput(producer: Producer<T>) {
|
|
||||||
if (producer is BufferProducer) {
|
|
||||||
launch {
|
|
||||||
while (isActive) {
|
|
||||||
sendBuffer(producer.receiveBuffer())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
super.connectInput(producer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract class AbstractBufferProcessor<T, R>(scope: CoroutineScope) :
|
|
||||||
AbstractProcessor<T, R>(scope),
|
|
||||||
BufferProducer<R>,
|
|
||||||
BufferConsumer<T> {
|
|
||||||
|
|
||||||
override fun connectOutput(consumer: Consumer<R>) {
|
|
||||||
if (consumer is BufferConsumer) {
|
|
||||||
launch {
|
|
||||||
while (this.isActive) {
|
|
||||||
consumer.sendBuffer(receiveBuffer())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
super.connectOutput(consumer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun connectInput(producer: Producer<T>) {
|
|
||||||
if (producer is BufferProducer) {
|
|
||||||
launch {
|
|
||||||
while (isActive) {
|
|
||||||
sendBuffer(producer.receiveBuffer())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
super.connectInput(producer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The basic generic buffer producer supporting both arrays and element-by-element simultaneously
|
* A processor that collects incoming elements into fixed size buffers
|
||||||
*/
|
*/
|
||||||
class BasicBufferProducer<T>(
|
class JoinProcessor<T>(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
capacity: Int = Channel.UNLIMITED,
|
bufferSize: Int,
|
||||||
block: suspend ProducerScope<Buffer<T>>.() -> Unit
|
bufferFactory: BufferFactory<T> = Buffer.Companion::boxing
|
||||||
) : AbstractBufferProducer<T>(scope) {
|
) : AbstractProcessor<T, Buffer<T>>(scope) {
|
||||||
|
|
||||||
|
private val input = Channel<T>(bufferSize)
|
||||||
|
|
||||||
private val currentArray = atomic<ReceiveChannel<T>?>(null)
|
private val output = produce(coroutineContext) {
|
||||||
private val channel: ReceiveChannel<Buffer<T>> by lazy { produce(capacity = capacity, block = block) }
|
val list = ArrayList<T>(bufferSize)
|
||||||
private val cachingChannel by lazy {
|
while (isActive) {
|
||||||
channel.map {
|
list.clear()
|
||||||
it.also { buffer -> currentArray.lazySet(buffer.asChannel(this)) }
|
repeat(bufferSize) {
|
||||||
}
|
list.add(input.receive())
|
||||||
}
|
|
||||||
|
|
||||||
private fun DoubleArray.asChannel() = produce {
|
|
||||||
for (value in this@asChannel) {
|
|
||||||
send(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun receiveBuffer(): Buffer<T> = cachingChannel.receive()
|
|
||||||
|
|
||||||
override suspend fun receive(): T = (currentArray.value ?: cachingChannel.receive().asChannel(this)).receive()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class BufferReducer<T, S>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
initialState: S,
|
|
||||||
val fold: suspend (S, Buffer<T>) -> S
|
|
||||||
) : AbstractBufferConsumer<T>(scope) {
|
|
||||||
|
|
||||||
var state: S = initialState
|
|
||||||
private set
|
|
||||||
|
|
||||||
override suspend fun sendBuffer(buffer: Buffer<T>) {
|
|
||||||
state = fold(state, buffer)
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun send(value: T) = sendBuffer(arrayOf(value).asBuffer())
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a [Buffer] to single element producer, splitting it in chunks if necessary
|
|
||||||
*/
|
|
||||||
fun <T> Buffer<T>.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) =
|
|
||||||
if (size < chunkSize) {
|
|
||||||
BasicBufferProducer<T>(scope) { send(this@produce) }
|
|
||||||
} else {
|
|
||||||
BasicBufferProducer<T>(scope) {
|
|
||||||
//TODO optimize this!
|
|
||||||
asSequence().chunked(chunkSize).forEach {
|
|
||||||
send(it.asBuffer())
|
|
||||||
}
|
}
|
||||||
|
val buffer = bufferFactory(bufferSize) { list[it] }
|
||||||
|
send(buffer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override suspend fun receive(): Buffer<T> = output.receive()
|
||||||
/**
|
|
||||||
* A buffer processor that works with buffers but could accumulate at lest [accumulate] elements from single input before processing.
|
|
||||||
*
|
|
||||||
* This class combines functions from [ChunkProcessor] and single buffer processor
|
|
||||||
*/
|
|
||||||
class AccumulatingBufferProcessor<T, R>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
val accumulate: Int,
|
|
||||||
val process: suspend (Buffer<T>) -> Buffer<R>
|
|
||||||
) :
|
|
||||||
AbstractBufferProcessor<T, R>(scope) {
|
|
||||||
|
|
||||||
private val inputChannel = Channel<Buffer<T>>()
|
|
||||||
private val outputChannel = inputChannel.map { process(it) }
|
|
||||||
|
|
||||||
override suspend fun receive(): R {
|
|
||||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun send(value: T) {
|
override suspend fun send(value: T) {
|
||||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
input.send(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A processor that splits incoming buffers into individual elements
|
||||||
|
*/
|
||||||
|
class SplitProcessor<T>(scope: CoroutineScope) : AbstractProcessor<Buffer<T>, T>(scope) {
|
||||||
|
|
||||||
|
private val input = Channel<Buffer<T>>()
|
||||||
|
|
||||||
|
private val mutex = Mutex()
|
||||||
|
|
||||||
|
private var currentBuffer: Buffer<T>? = null
|
||||||
|
|
||||||
|
private var pos = 0
|
||||||
|
|
||||||
|
|
||||||
|
override suspend fun receive(): T {
|
||||||
|
mutex.withLock {
|
||||||
|
while (currentBuffer == null || pos == currentBuffer!!.size) {
|
||||||
|
currentBuffer = input.receive()
|
||||||
|
pos = 0
|
||||||
|
}
|
||||||
|
return currentBuffer!![pos].also { pos++ }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun receiveBuffer(): Buffer<R> = outputChannel.receive()
|
override suspend fun send(value: Buffer<T>) {
|
||||||
|
input.send(value)
|
||||||
override suspend fun sendBuffer(buffer: Buffer<T>) {
|
|
||||||
inputChannel.send(buffer)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun <T> Producer<T>.chunked(chunkSize: Int, bufferFactory: BufferFactory<T>) =
|
||||||
|
JoinProcessor<T>(this, chunkSize, bufferFactory).also { connect(it) }
|
||||||
|
|
||||||
|
inline fun <reified T : Any> Producer<T>.chunked(chunkSize: Int) =
|
||||||
|
JoinProcessor<T>(this, chunkSize, Buffer.Companion::auto).also { connect(it) }
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
@ -170,33 +170,6 @@ class PipeProcessor<T, R>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A [Processor] that splits the input in fixed chunked size and transforms each chunked
|
|
||||||
*/
|
|
||||||
class ChunkProcessor<T, R>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
chunkSize: Int,
|
|
||||||
process: suspend (List<T>) -> R
|
|
||||||
) : AbstractProcessor<T, R>(scope) {
|
|
||||||
|
|
||||||
private val input = Channel<T>(chunkSize)
|
|
||||||
|
|
||||||
private val chunked = produce<List<T>>(coroutineContext) {
|
|
||||||
val list = ArrayList<T>(chunkSize)
|
|
||||||
repeat(chunkSize) {
|
|
||||||
list.add(input.receive())
|
|
||||||
}
|
|
||||||
send(list)
|
|
||||||
}
|
|
||||||
|
|
||||||
private val output: ReceiveChannel<R> = chunked.map(coroutineContext, process)
|
|
||||||
|
|
||||||
override suspend fun receive(): R = output.receive()
|
|
||||||
|
|
||||||
override suspend fun send(value: T) {
|
|
||||||
input.send(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A moving window [Processor] with circular buffer
|
* A moving window [Processor] with circular buffer
|
||||||
@ -276,6 +249,9 @@ fun <T> ReceiveChannel<T>.produce(scope: CoroutineScope = GlobalScope) =
|
|||||||
fun <T, C : Consumer<T>> Producer<T>.consumer(consumerFactory: () -> C): C =
|
fun <T, C : Consumer<T>> Producer<T>.consumer(consumerFactory: () -> C): C =
|
||||||
consumerFactory().also { connect(it) }
|
consumerFactory().also { connect(it) }
|
||||||
|
|
||||||
|
fun <T, R> Producer<T>.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
|
||||||
|
PipeProcessor(this, capacity, process).also { connect(it) }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a reducer and connect this producer to reducer
|
* Create a reducer and connect this producer to reducer
|
||||||
*/
|
*/
|
||||||
@ -294,7 +270,6 @@ fun <T, R, P : Processor<T, R>> Producer<T>.process(processorBuilder: () -> P):
|
|||||||
fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
|
fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
|
||||||
PipeProcessor<T, R>(this, capacity, process).also { connect(it) }
|
PipeProcessor<T, R>(this, capacity, process).also { connect(it) }
|
||||||
|
|
||||||
fun <T, R> Producer<T>.chunked(chunkSize: Int, process: suspend (List<T>) -> R) =
|
|
||||||
ChunkProcessor(this, chunkSize, process).also { connect(it) }
|
|
||||||
|
|
||||||
fun <T> Producer<T>.chunked(chunkSize: Int) = chunked(chunkSize) { it }
|
fun <T, R> Producer<T>.windowed(window: Int, process: suspend (Buffer<T?>) -> R) =
|
||||||
|
WindowedProcessor(this, window, process).also { connect(it) }
|
Loading…
Reference in New Issue
Block a user