Fixes in flow transformations

This commit is contained in:
Alexander Nozik 2019-04-30 09:15:31 +03:00
parent f79a9e86a1
commit fcab05b683
3 changed files with 39 additions and 9 deletions

View File

@ -5,6 +5,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.apache.commons.math3.transform.*
import scientifik.kmath.operations.Complex
import scientifik.kmath.streaming.chunked
import scientifik.kmath.streaming.spread
import scientifik.kmath.structures.*
@ -33,40 +35,40 @@ object Transformations {
fun fourier(
normalization: DftNormalization = DftNormalization.STANDARD,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Complex, Complex> = {
): SuspendBufferTransform<Complex, Complex> = {
FastFourierTransformer(normalization).transform(it.toArray(), direction).asBuffer()
}
fun realFourier(
normalization: DftNormalization = DftNormalization.STANDARD,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Complex> = {
): SuspendBufferTransform<Double, Complex> = {
FastFourierTransformer(normalization).transform(it.asArray(), direction).asBuffer()
}
fun sine(
normalization: DstNormalization = DstNormalization.STANDARD_DST_I,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Double> = {
): SuspendBufferTransform<Double, Double> = {
FastSineTransformer(normalization).transform(it.asArray(), direction).asBuffer()
}
fun cosine(
normalization: DctNormalization = DctNormalization.STANDARD_DCT_I,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Double> = {
): SuspendBufferTransform<Double, Double> = {
FastCosineTransformer(normalization).transform(it.asArray(), direction).asBuffer()
}
fun hadamard(
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Double> = {
): SuspendBufferTransform<Double, Double> = {
FastHadamardTransformer().transform(it.asArray(), direction).asBuffer()
}
}
/**
* Process given [Producer] with commons-math fft transformation
* Process given [Flow] with commons-math fft transformation
*/
@FlowPreview
fun Flow<Buffer<Complex>>.FFT(
@ -84,5 +86,24 @@ fun Flow<Buffer<Double>>.FFT(
direction: TransformType = TransformType.FORWARD
): Flow<Buffer<Complex>> {
val transform = Transformations.realFourier(normalization, direction)
return map { transform(it) }
}
return map(transform)
}
/**
* Process a continous flow of real numbers in FFT splitting it in chunks of [bufferSize].
*/
@FlowPreview
@JvmName("realFFT")
fun Flow<Double>.FFT(
bufferSize: Int = Int.MAX_VALUE,
normalization: DftNormalization = DftNormalization.STANDARD,
direction: TransformType = TransformType.FORWARD
): Flow<Complex> {
return chunked(bufferSize).FFT(normalization,direction).spread()
}
/**
* Map a complex flow into real flow by taking real part of each number
*/
@FlowPreview
fun Flow<Complex>.real(): Flow<Double> = map{it.re}

View File

@ -277,4 +277,6 @@ fun <T> Buffer<T>.asReadOnly(): Buffer<T> = if (this is MutableBuffer) {
/**
* Typealias for buffer transformations
*/
typealias BufferTransform<T, R> = (Buffer<T>) -> Buffer<R>
typealias BufferTransform<T, R> = (Buffer<T>) -> Buffer<R>
typealias SuspendBufferTransform<T, R> = suspend (Buffer<T>) -> Buffer<R>

View File

@ -37,6 +37,9 @@ fun <T> Flow<T>.chunked(bufferSize: Int, bufferFactory: BufferFactory<T>) = flow
counter = 0
}
}
if (counter > 0) {
emit(bufferFactory(counter) { list[it] })
}
}
/**
@ -54,8 +57,12 @@ fun Flow<Double>.chunked(bufferSize: Int) = flow {
if (counter == bufferSize) {
val buffer = DoubleBuffer(array)
emit(buffer)
counter = 0
}
}
if (counter > 0) {
emit(DoubleBuffer(counter) { array[it] })
}
}
/**