Windowed flow test
This commit is contained in:
parent
d138ce3889
commit
3ddff86e24
@ -1,10 +1,10 @@
|
|||||||
package scientifik.kmath.transform
|
package scientifik.kmath.transform
|
||||||
|
|
||||||
|
import kotlinx.coroutines.FlowPreview
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
import org.apache.commons.math3.transform.*
|
import org.apache.commons.math3.transform.*
|
||||||
import scientifik.kmath.operations.Complex
|
import scientifik.kmath.operations.Complex
|
||||||
import scientifik.kmath.streaming.Processor
|
|
||||||
import scientifik.kmath.streaming.Producer
|
|
||||||
import scientifik.kmath.streaming.map
|
|
||||||
import scientifik.kmath.structures.*
|
import scientifik.kmath.structures.*
|
||||||
|
|
||||||
|
|
||||||
@ -68,19 +68,21 @@ object Transformations {
|
|||||||
/**
|
/**
|
||||||
* Process given [Producer] with commons-math fft transformation
|
* Process given [Producer] with commons-math fft transformation
|
||||||
*/
|
*/
|
||||||
fun Producer<Buffer<Complex>>.FFT(
|
@FlowPreview
|
||||||
|
fun Flow<Buffer<Complex>>.FFT(
|
||||||
normalization: DftNormalization = DftNormalization.STANDARD,
|
normalization: DftNormalization = DftNormalization.STANDARD,
|
||||||
direction: TransformType = TransformType.FORWARD
|
direction: TransformType = TransformType.FORWARD
|
||||||
): Processor<Buffer<Complex>, Buffer<Complex>> {
|
): Flow<Buffer<Complex>> {
|
||||||
val transform = Transformations.fourier(normalization, direction)
|
val transform = Transformations.fourier(normalization, direction)
|
||||||
return map { transform(it) }
|
return map { transform(it) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@FlowPreview
|
||||||
@JvmName("realFFT")
|
@JvmName("realFFT")
|
||||||
fun Producer<Buffer<Double>>.FFT(
|
fun Flow<Buffer<Double>>.FFT(
|
||||||
normalization: DftNormalization = DftNormalization.STANDARD,
|
normalization: DftNormalization = DftNormalization.STANDARD,
|
||||||
direction: TransformType = TransformType.FORWARD
|
direction: TransformType = TransformType.FORWARD
|
||||||
): Processor<Buffer<Double>, Buffer<Complex>> {
|
): Flow<Buffer<Complex>> {
|
||||||
val transform = Transformations.realFourier(normalization, direction)
|
val transform = Transformations.realFourier(normalization, direction)
|
||||||
return map { transform(it) }
|
return map { transform(it) }
|
||||||
}
|
}
|
@ -249,7 +249,10 @@ inline class ReadOnlyBuffer<T>(val buffer: MutableBuffer<T>) : Buffer<T> {
|
|||||||
* Useful when one needs single element from the buffer.
|
* Useful when one needs single element from the buffer.
|
||||||
*/
|
*/
|
||||||
class VirtualBuffer<T>(override val size: Int, private val generator: (Int) -> T) : Buffer<T> {
|
class VirtualBuffer<T>(override val size: Int, private val generator: (Int) -> T) : Buffer<T> {
|
||||||
override fun get(index: Int): T = generator(index)
|
override fun get(index: Int): T {
|
||||||
|
if (index < 0 || index >= size) throw IndexOutOfBoundsException("Expected index from 0 to ${size - 1}, but found $index")
|
||||||
|
return generator(index)
|
||||||
|
}
|
||||||
|
|
||||||
override fun iterator(): Iterator<T> = (0 until size).asSequence().map(generator).iterator()
|
override fun iterator(): Iterator<T> = (0 until size).asSequence().map(generator).iterator()
|
||||||
|
|
||||||
|
@ -13,12 +13,12 @@ val Dispatchers.Math: CoroutineDispatcher get() = Dispatchers.Default
|
|||||||
/**
|
/**
|
||||||
* An imitator of [Deferred] which holds a suspended function block and dispatcher
|
* An imitator of [Deferred] which holds a suspended function block and dispatcher
|
||||||
*/
|
*/
|
||||||
class LazyDeferred<T>(val dispatcher: CoroutineDispatcher, val block: suspend CoroutineScope.() -> T) {
|
internal class LazyDeferred<T>(val dispatcher: CoroutineDispatcher, val block: suspend CoroutineScope.() -> T) {
|
||||||
private var deferred: Deferred<T>? = null
|
private var deferred: Deferred<T>? = null
|
||||||
|
|
||||||
fun CoroutineScope.start() {
|
internal fun start(scope: CoroutineScope) {
|
||||||
if(deferred==null) {
|
if(deferred==null) {
|
||||||
deferred = async(dispatcher, block = block)
|
deferred = scope.async(dispatcher, block = block)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -26,7 +26,7 @@ class LazyDeferred<T>(val dispatcher: CoroutineDispatcher, val block: suspend Co
|
|||||||
}
|
}
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
inline class AsyncFlow<T>(val deferredFlow: Flow<LazyDeferred<T>>) : Flow<T> {
|
class AsyncFlow<T> internal constructor(internal val deferredFlow: Flow<LazyDeferred<T>>) : Flow<T> {
|
||||||
override suspend fun collect(collector: FlowCollector<T>) {
|
override suspend fun collect(collector: FlowCollector<T>) {
|
||||||
deferredFlow.collect {
|
deferredFlow.collect {
|
||||||
collector.emit((it.await()))
|
collector.emit((it.await()))
|
||||||
@ -46,23 +46,23 @@ fun <T, R> Flow<T>.async(
|
|||||||
}
|
}
|
||||||
|
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
fun <T, R> AsyncFlow<T>.map(action: (T) -> R) = deferredFlow.map { input ->
|
fun <T, R> AsyncFlow<T>.map(action: (T) -> R) = AsyncFlow(deferredFlow.map { input ->
|
||||||
//TODO add actual composition
|
//TODO add function composition
|
||||||
LazyDeferred(input.dispatcher) {
|
LazyDeferred(input.dispatcher) {
|
||||||
input.run { start() }
|
input.start(this)
|
||||||
action(input.await())
|
action(input.await())
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
@ExperimentalCoroutinesApi
|
@ExperimentalCoroutinesApi
|
||||||
@FlowPreview
|
@FlowPreview
|
||||||
suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, collector: FlowCollector<T>) {
|
suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, collector: FlowCollector<T>) {
|
||||||
require(concurrency >= 0) { "Buffer size should be positive, but was $concurrency" }
|
require(concurrency >= 1) { "Buffer size should be more than 1, but was $concurrency" }
|
||||||
coroutineScope {
|
coroutineScope {
|
||||||
//Starting up to N deferred coroutines ahead of time
|
//Starting up to N deferred coroutines ahead of time
|
||||||
val channel = produce(capacity = concurrency) {
|
val channel = produce(capacity = concurrency-1) {
|
||||||
deferredFlow.collect { value ->
|
deferredFlow.collect { value ->
|
||||||
value.run { start() }
|
value.start(this@coroutineScope)
|
||||||
send(value)
|
send(value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -91,3 +91,31 @@ suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, action: suspend (value: T
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//suspend fun <T> Flow<T>.collect(concurrency: Int, dispatcher: CoroutineDispatcher, collector: FlowCollector<T>){
|
||||||
|
// require(concurrency >= 1) { "Buffer size should be more than 1, but was $concurrency" }
|
||||||
|
// coroutineScope {
|
||||||
|
// //Starting up to N deferred coroutines ahead of time
|
||||||
|
// val channel = produce(capacity = concurrency-1) {
|
||||||
|
// this@collect.
|
||||||
|
// deferredFlow.collect { value ->
|
||||||
|
// value.start(this@coroutineScope)
|
||||||
|
// send(value)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// (channel as Job).invokeOnCompletion {
|
||||||
|
// if (it is CancellationException && it.cause == null) cancel()
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// for (element in channel) {
|
||||||
|
// collector.emit(element.await())
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// val producer = channel as Job
|
||||||
|
// if (producer.isCancelled) {
|
||||||
|
// producer.join()
|
||||||
|
// //throw producer.getCancellationException()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
@ -1,13 +1,10 @@
|
|||||||
package scientifik.kmath.streaming
|
package scientifik.kmath.streaming
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.channels.ReceiveChannel
|
|
||||||
import kotlinx.coroutines.channels.produce
|
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
import scientifik.kmath.structures.Buffer
|
import scientifik.kmath.structures.Buffer
|
||||||
import scientifik.kmath.structures.BufferFactory
|
import scientifik.kmath.structures.BufferFactory
|
||||||
import scientifik.kmath.structures.DoubleBuffer
|
import scientifik.kmath.structures.DoubleBuffer
|
||||||
import kotlin.coroutines.coroutineContext
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a [Flow] from buffer
|
* Create a [Flow] from buffer
|
||||||
@ -60,3 +57,17 @@ fun Flow<Double>.chunked(bufferSize: Int) = flow {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map a flow to a moving window buffer. The window step is one.
|
||||||
|
* In order to get different steps, one could use skip operation.
|
||||||
|
*/
|
||||||
|
@FlowPreview
|
||||||
|
fun <T> Flow<T>.windowed(window: Int): Flow<Buffer<T>> = flow {
|
||||||
|
require(window > 1) { "Window size must be more than one" }
|
||||||
|
val ringBuffer = RingBuffer.boxing<T>(window)
|
||||||
|
this@windowed.collect { element ->
|
||||||
|
ringBuffer.push(element)
|
||||||
|
emit(ringBuffer.snapshot())
|
||||||
|
}
|
||||||
|
}
|
@ -1,82 +0,0 @@
|
|||||||
package scientifik.kmath.streaming
|
|
||||||
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
|
||||||
import kotlinx.coroutines.channels.Channel
|
|
||||||
import kotlinx.coroutines.channels.produce
|
|
||||||
import kotlinx.coroutines.isActive
|
|
||||||
import kotlinx.coroutines.sync.Mutex
|
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import scientifik.kmath.structures.Buffer
|
|
||||||
import scientifik.kmath.structures.BufferFactory
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A processor that collects incoming elements into fixed size buffers
|
|
||||||
*/
|
|
||||||
@ExperimentalCoroutinesApi
|
|
||||||
class JoinProcessor<T>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
bufferSize: Int,
|
|
||||||
bufferFactory: BufferFactory<T> = Buffer.Companion::boxing
|
|
||||||
) : AbstractProcessor<T, Buffer<T>>(scope) {
|
|
||||||
|
|
||||||
private val input = Channel<T>(bufferSize)
|
|
||||||
|
|
||||||
private val output = produce(coroutineContext) {
|
|
||||||
val list = ArrayList<T>(bufferSize)
|
|
||||||
while (isActive) {
|
|
||||||
list.clear()
|
|
||||||
repeat(bufferSize) {
|
|
||||||
list.add(input.receive())
|
|
||||||
}
|
|
||||||
val buffer = bufferFactory(bufferSize) { list[it] }
|
|
||||||
send(buffer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun receive(): Buffer<T> = output.receive()
|
|
||||||
|
|
||||||
override suspend fun send(value: T) {
|
|
||||||
input.send(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A processor that splits incoming buffers into individual elements
|
|
||||||
*/
|
|
||||||
class SplitProcessor<T>(scope: CoroutineScope) : AbstractProcessor<Buffer<T>, T>(scope) {
|
|
||||||
|
|
||||||
private val input = Channel<Buffer<T>>()
|
|
||||||
|
|
||||||
private val mutex = Mutex()
|
|
||||||
|
|
||||||
private var currentBuffer: Buffer<T>? = null
|
|
||||||
|
|
||||||
private var pos = 0
|
|
||||||
|
|
||||||
|
|
||||||
override suspend fun receive(): T {
|
|
||||||
mutex.withLock {
|
|
||||||
while (currentBuffer == null || pos == currentBuffer!!.size) {
|
|
||||||
currentBuffer = input.receive()
|
|
||||||
pos = 0
|
|
||||||
}
|
|
||||||
return currentBuffer!![pos].also { pos++ }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun send(value: Buffer<T>) {
|
|
||||||
input.send(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@ExperimentalCoroutinesApi
|
|
||||||
fun <T> Producer<T>.chunked(chunkSize: Int, bufferFactory: BufferFactory<T>) =
|
|
||||||
JoinProcessor<T>(this, chunkSize, bufferFactory).also { connect(it) }
|
|
||||||
|
|
||||||
@ExperimentalCoroutinesApi
|
|
||||||
inline fun <reified T : Any> Producer<T>.chunked(chunkSize: Int) =
|
|
||||||
JoinProcessor<T>(this, chunkSize, Buffer.Companion::auto).also { connect(it) }
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -5,12 +5,14 @@ import kotlinx.coroutines.sync.withLock
|
|||||||
import scientifik.kmath.structures.Buffer
|
import scientifik.kmath.structures.Buffer
|
||||||
import scientifik.kmath.structures.MutableBuffer
|
import scientifik.kmath.structures.MutableBuffer
|
||||||
import scientifik.kmath.structures.VirtualBuffer
|
import scientifik.kmath.structures.VirtualBuffer
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread-safe ring buffer
|
* Thread-safe ring buffer
|
||||||
*/
|
*/
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
internal class RingBuffer<T>(
|
internal class RingBuffer<T>(
|
||||||
private val buffer: MutableBuffer<T>,
|
private val buffer: MutableBuffer<T?>,
|
||||||
private var startIndex: Int = 0,
|
private var startIndex: Int = 0,
|
||||||
size: Int = 0
|
size: Int = 0
|
||||||
) : Buffer<T> {
|
) : Buffer<T> {
|
||||||
@ -23,7 +25,7 @@ internal class RingBuffer<T>(
|
|||||||
override fun get(index: Int): T {
|
override fun get(index: Int): T {
|
||||||
require(index >= 0) { "Index must be positive" }
|
require(index >= 0) { "Index must be positive" }
|
||||||
require(index < size) { "Index $index is out of circular buffer size $size" }
|
require(index < size) { "Index $index is out of circular buffer size $size" }
|
||||||
return buffer[startIndex.forward(index)]
|
return buffer[startIndex.forward(index)] as T
|
||||||
}
|
}
|
||||||
|
|
||||||
fun isFull() = size == buffer.size
|
fun isFull() = size == buffer.size
|
||||||
@ -40,7 +42,7 @@ internal class RingBuffer<T>(
|
|||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
done()
|
done()
|
||||||
} else {
|
} else {
|
||||||
setNext(copy[index])
|
setNext(copy[index] as T)
|
||||||
index = index.forward(1)
|
index = index.forward(1)
|
||||||
count--
|
count--
|
||||||
}
|
}
|
||||||
@ -53,7 +55,9 @@ internal class RingBuffer<T>(
|
|||||||
suspend fun snapshot(): Buffer<T> {
|
suspend fun snapshot(): Buffer<T> {
|
||||||
mutex.withLock {
|
mutex.withLock {
|
||||||
val copy = buffer.copy()
|
val copy = buffer.copy()
|
||||||
return VirtualBuffer(size) { i -> copy[startIndex.forward(i)] }
|
return VirtualBuffer(size) { i ->
|
||||||
|
copy[startIndex.forward(i)] as T
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,14 +78,14 @@ internal class RingBuffer<T>(
|
|||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
inline fun <reified T : Any> build(size: Int, empty: T): RingBuffer<T> {
|
inline fun <reified T : Any> build(size: Int, empty: T): RingBuffer<T> {
|
||||||
val buffer = MutableBuffer.auto(size) { empty }
|
val buffer = MutableBuffer.auto(size) { empty } as MutableBuffer<T?>
|
||||||
return RingBuffer(buffer)
|
return RingBuffer(buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Slow yet universal buffer
|
* Slow yet universal buffer
|
||||||
*/
|
*/
|
||||||
fun <T> boxing(size: Int): RingBuffer<T?> {
|
fun <T> boxing(size: Int): RingBuffer<T> {
|
||||||
val buffer: MutableBuffer<T?> = MutableBuffer.boxing(size) { null }
|
val buffer: MutableBuffer<T?> = MutableBuffer.boxing(size) { null }
|
||||||
return RingBuffer(buffer)
|
return RingBuffer(buffer)
|
||||||
}
|
}
|
||||||
|
@ -1,273 +0,0 @@
|
|||||||
package scientifik.kmath.streaming
|
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
|
||||||
import kotlinx.coroutines.channels.*
|
|
||||||
import kotlinx.coroutines.sync.Mutex
|
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import scientifik.kmath.structures.Buffer
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initial chain block. Could produce an element sequence and be connected to single [Consumer]
|
|
||||||
*
|
|
||||||
* The general rule is that channel is created on first call. Also each element is responsible for its connection so
|
|
||||||
* while the connections are symmetric, the scope, used for making the connection is responsible for cancelation.
|
|
||||||
*
|
|
||||||
* Also connections are not reversible. Once connected block stays faithful until it finishes processing.
|
|
||||||
* Manually putting elements to connected block could lead to undetermined behavior and must be avoided.
|
|
||||||
*/
|
|
||||||
interface Producer<T> : CoroutineScope {
|
|
||||||
fun connect(consumer: Consumer<T>)
|
|
||||||
|
|
||||||
suspend fun receive(): T
|
|
||||||
|
|
||||||
val consumer: Consumer<T>?
|
|
||||||
|
|
||||||
val outputIsConnected: Boolean get() = consumer != null
|
|
||||||
|
|
||||||
//fun close()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Terminal chain block. Could consume an element sequence and be connected to signle [Producer]
|
|
||||||
*/
|
|
||||||
interface Consumer<T> : CoroutineScope {
|
|
||||||
fun connect(producer: Producer<T>)
|
|
||||||
|
|
||||||
suspend fun send(value: T)
|
|
||||||
|
|
||||||
val producer: Producer<T>?
|
|
||||||
|
|
||||||
val inputIsConnected: Boolean get() = producer != null
|
|
||||||
|
|
||||||
//fun close()
|
|
||||||
}
|
|
||||||
|
|
||||||
interface Processor<T, R> : Consumer<T>, Producer<R>
|
|
||||||
|
|
||||||
abstract class AbstractProducer<T>(scope: CoroutineScope) : Producer<T> {
|
|
||||||
override val coroutineContext: CoroutineContext = scope.coroutineContext
|
|
||||||
|
|
||||||
override var consumer: Consumer<T>? = null
|
|
||||||
protected set
|
|
||||||
|
|
||||||
override fun connect(consumer: Consumer<T>) {
|
|
||||||
//Ignore if already connected to specific consumer
|
|
||||||
if (consumer != this.consumer) {
|
|
||||||
if (outputIsConnected) error("The output slot of producer is occupied")
|
|
||||||
if (consumer.inputIsConnected) error("The input slot of consumer is occupied")
|
|
||||||
this.consumer = consumer
|
|
||||||
if (consumer.producer != null) {
|
|
||||||
//No need to save the job, it will be canceled on scope cancel
|
|
||||||
connectOutput(consumer)
|
|
||||||
// connect back, consumer is already set so no circular reference
|
|
||||||
consumer.connect(this)
|
|
||||||
} else error("Unreachable statement")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected open fun connectOutput(consumer: Consumer<T>) {
|
|
||||||
launch {
|
|
||||||
while (this.isActive) {
|
|
||||||
consumer.send(receive())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract class AbstractConsumer<T>(scope: CoroutineScope) : Consumer<T> {
|
|
||||||
override val coroutineContext: CoroutineContext = scope.coroutineContext
|
|
||||||
|
|
||||||
override var producer: Producer<T>? = null
|
|
||||||
protected set
|
|
||||||
|
|
||||||
override fun connect(producer: Producer<T>) {
|
|
||||||
//Ignore if already connected to specific consumer
|
|
||||||
if (producer != this.producer) {
|
|
||||||
if (inputIsConnected) error("The input slot of consumer is occupied")
|
|
||||||
if (producer.outputIsConnected) error("The input slot of producer is occupied")
|
|
||||||
this.producer = producer
|
|
||||||
//No need to save the job, it will be canceled on scope cancel
|
|
||||||
if (producer.consumer != null) {
|
|
||||||
connectInput(producer)
|
|
||||||
// connect back
|
|
||||||
producer.connect(this)
|
|
||||||
} else error("Unreachable statement")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected open fun connectInput(producer: Producer<T>) {
|
|
||||||
launch {
|
|
||||||
while (isActive) {
|
|
||||||
send(producer.receive())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract class AbstractProcessor<T, R>(scope: CoroutineScope) : Processor<T, R>, AbstractProducer<R>(scope) {
|
|
||||||
|
|
||||||
override var producer: Producer<T>? = null
|
|
||||||
protected set
|
|
||||||
|
|
||||||
override fun connect(producer: Producer<T>) {
|
|
||||||
//Ignore if already connected to specific consumer
|
|
||||||
if (producer != this.producer) {
|
|
||||||
if (inputIsConnected) error("The input slot of consumer is occupied")
|
|
||||||
if (producer.outputIsConnected) error("The input slot of producer is occupied")
|
|
||||||
this.producer = producer
|
|
||||||
//No need to save the job, it will be canceled on scope cancel
|
|
||||||
if (producer.consumer != null) {
|
|
||||||
connectInput(producer)
|
|
||||||
// connect back
|
|
||||||
producer.connect(this)
|
|
||||||
} else error("Unreachable statement")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected open fun connectInput(producer: Producer<T>) {
|
|
||||||
launch {
|
|
||||||
while (isActive) {
|
|
||||||
send(producer.receive())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A simple [produce]-based producer
|
|
||||||
*/
|
|
||||||
@ExperimentalCoroutinesApi
|
|
||||||
class GenericProducer<T>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
capacity: Int = Channel.UNLIMITED,
|
|
||||||
block: suspend ProducerScope<T>.() -> Unit
|
|
||||||
) : AbstractProducer<T>(scope) {
|
|
||||||
|
|
||||||
private val channel: ReceiveChannel<T> by lazy { produce(capacity = capacity, block = block) }
|
|
||||||
|
|
||||||
override suspend fun receive(): T = channel.receive()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A simple pipeline [Processor] block
|
|
||||||
*/
|
|
||||||
class PipeProcessor<T, R>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
capacity: Int = Channel.RENDEZVOUS,
|
|
||||||
process: suspend (T) -> R
|
|
||||||
) : AbstractProcessor<T, R>(scope) {
|
|
||||||
|
|
||||||
private val input = Channel<T>(capacity)
|
|
||||||
private val output: ReceiveChannel<R> = input.map(coroutineContext, process)
|
|
||||||
|
|
||||||
override suspend fun receive(): R = output.receive()
|
|
||||||
|
|
||||||
override suspend fun send(value: T) {
|
|
||||||
input.send(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A moving window [Processor] with circular buffer
|
|
||||||
*/
|
|
||||||
class WindowedProcessor<T, R>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
window: Int,
|
|
||||||
val process: suspend (Buffer<T?>) -> R
|
|
||||||
) : AbstractProcessor<T, R>(scope) {
|
|
||||||
|
|
||||||
private val ringBuffer = RingBuffer.boxing<T>(window)
|
|
||||||
|
|
||||||
private val channel = Channel<R>(Channel.RENDEZVOUS)
|
|
||||||
|
|
||||||
override suspend fun receive(): R {
|
|
||||||
return channel.receive()
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun send(value: T) {
|
|
||||||
ringBuffer.push(value)
|
|
||||||
channel.send(process(ringBuffer.snapshot()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure
|
|
||||||
* to incorporate them into state on-arrival.
|
|
||||||
* The current aggregated state could be accessed by [state]. The input channel is inactive unless requested
|
|
||||||
* @param T - the type of the input element
|
|
||||||
* @param S - the type of the aggregator
|
|
||||||
*/
|
|
||||||
class Reducer<T, S>(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
initialState: S,
|
|
||||||
val fold: suspend (S, T) -> S
|
|
||||||
) : AbstractConsumer<T>(scope) {
|
|
||||||
|
|
||||||
var state: S = initialState
|
|
||||||
private set
|
|
||||||
|
|
||||||
private val mutex = Mutex()
|
|
||||||
|
|
||||||
override suspend fun send(value: T) = mutex.withLock {
|
|
||||||
state = fold(state, value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value.
|
|
||||||
*/
|
|
||||||
class Collector<T>(scope: CoroutineScope) : AbstractConsumer<T>(scope) {
|
|
||||||
|
|
||||||
private val _list = ArrayList<T>()
|
|
||||||
private val mutex = Mutex()
|
|
||||||
val list: List<T> get() = _list
|
|
||||||
|
|
||||||
override suspend fun send(value: T) {
|
|
||||||
mutex.withLock {
|
|
||||||
_list.add(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a sequence to [Producer]
|
|
||||||
*/
|
|
||||||
fun <T> Sequence<T>.produce(scope: CoroutineScope = GlobalScope) =
|
|
||||||
GenericProducer<T>(scope) { forEach { send(it) } }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert a [ReceiveChannel] to [Producer]
|
|
||||||
*/
|
|
||||||
fun <T> ReceiveChannel<T>.produce(scope: CoroutineScope = GlobalScope) =
|
|
||||||
GenericProducer<T>(scope) { for (e in this@produce) send(e) }
|
|
||||||
|
|
||||||
|
|
||||||
fun <T, C : Consumer<T>> Producer<T>.consumer(consumerFactory: () -> C): C =
|
|
||||||
consumerFactory().also { connect(it) }
|
|
||||||
|
|
||||||
fun <T, R> Producer<T>.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
|
|
||||||
PipeProcessor(this, capacity, process).also { connect(it) }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a reducer and connect this producer to reducer
|
|
||||||
*/
|
|
||||||
fun <T, S> Producer<T>.reduce(initialState: S, fold: suspend (S, T) -> S) =
|
|
||||||
Reducer(this, initialState, fold).also { connect(it) }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a [Collector] and attach it to this [Producer]
|
|
||||||
*/
|
|
||||||
fun <T> Producer<T>.collect() =
|
|
||||||
Collector<T>(this).also { connect(it) }
|
|
||||||
|
|
||||||
fun <T, R, P : Processor<T, R>> Producer<T>.process(processorBuilder: () -> P): P =
|
|
||||||
processorBuilder().also { connect(it) }
|
|
||||||
|
|
||||||
fun <T, R> Producer<T>.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) =
|
|
||||||
PipeProcessor<T, R>(this, capacity, process).also { connect(it) }
|
|
||||||
|
|
||||||
|
|
||||||
fun <T, R> Producer<T>.windowed(window: Int, process: suspend (Buffer<T?>) -> R) =
|
|
||||||
WindowedProcessor(this, window, process).also { connect(it) }
|
|
@ -1,5 +1,6 @@
|
|||||||
package scientifik.kmath.streaming
|
package scientifik.kmath.streaming
|
||||||
|
|
||||||
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import scientifik.kmath.structures.asSequence
|
import scientifik.kmath.structures.asSequence
|
||||||
@ -7,7 +8,7 @@ import kotlin.test.assertEquals
|
|||||||
|
|
||||||
class RingBufferTest {
|
class RingBufferTest {
|
||||||
@Test
|
@Test
|
||||||
fun testPush() {
|
fun push() {
|
||||||
val buffer = RingBuffer.build(20, Double.NaN)
|
val buffer = RingBuffer.build(20, Double.NaN)
|
||||||
runBlocking {
|
runBlocking {
|
||||||
for (i in 1..30) {
|
for (i in 1..30) {
|
||||||
@ -16,4 +17,22 @@ class RingBufferTest {
|
|||||||
assertEquals(410.0, buffer.asSequence().sum())
|
assertEquals(410.0, buffer.asSequence().sum())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun windowed(){
|
||||||
|
val flow = flow{
|
||||||
|
var i = 0
|
||||||
|
while(true){
|
||||||
|
emit(i++)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val windowed = flow.windowed(10)
|
||||||
|
runBlocking {
|
||||||
|
val first = windowed.take(1).single()
|
||||||
|
val res = windowed.take(15).map { it -> it.asSequence().average() }.toList()
|
||||||
|
assertEquals(0.0, res[0])
|
||||||
|
assertEquals(4.5, res[9])
|
||||||
|
assertEquals(9.5, res[14])
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user