diff --git a/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt b/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt index 113b8ea75..ac6922262 100644 --- a/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt +++ b/kmath-commons/src/main/kotlin/scientifik/kmath/transform/Transformations.kt @@ -5,6 +5,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import org.apache.commons.math3.transform.* import scientifik.kmath.operations.Complex +import scientifik.kmath.streaming.chunked +import scientifik.kmath.streaming.spread import scientifik.kmath.structures.* @@ -33,40 +35,40 @@ object Transformations { fun fourier( normalization: DftNormalization = DftNormalization.STANDARD, direction: TransformType = TransformType.FORWARD - ): BufferTransform = { + ): SuspendBufferTransform = { FastFourierTransformer(normalization).transform(it.toArray(), direction).asBuffer() } fun realFourier( normalization: DftNormalization = DftNormalization.STANDARD, direction: TransformType = TransformType.FORWARD - ): BufferTransform = { + ): SuspendBufferTransform = { FastFourierTransformer(normalization).transform(it.asArray(), direction).asBuffer() } fun sine( normalization: DstNormalization = DstNormalization.STANDARD_DST_I, direction: TransformType = TransformType.FORWARD - ): BufferTransform = { + ): SuspendBufferTransform = { FastSineTransformer(normalization).transform(it.asArray(), direction).asBuffer() } fun cosine( normalization: DctNormalization = DctNormalization.STANDARD_DCT_I, direction: TransformType = TransformType.FORWARD - ): BufferTransform = { + ): SuspendBufferTransform = { FastCosineTransformer(normalization).transform(it.asArray(), direction).asBuffer() } fun hadamard( direction: TransformType = TransformType.FORWARD - ): BufferTransform = { + ): SuspendBufferTransform = { FastHadamardTransformer().transform(it.asArray(), direction).asBuffer() } } /** - * Process given [Producer] with commons-math fft transformation + * Process given [Flow] with commons-math fft transformation */ @FlowPreview fun Flow>.FFT( @@ -84,5 +86,24 @@ fun Flow>.FFT( direction: TransformType = TransformType.FORWARD ): Flow> { val transform = Transformations.realFourier(normalization, direction) - return map { transform(it) } -} \ No newline at end of file + return map(transform) +} + +/** + * Process a continous flow of real numbers in FFT splitting it in chunks of [bufferSize]. + */ +@FlowPreview +@JvmName("realFFT") +fun Flow.FFT( + bufferSize: Int = Int.MAX_VALUE, + normalization: DftNormalization = DftNormalization.STANDARD, + direction: TransformType = TransformType.FORWARD +): Flow { + return chunked(bufferSize).FFT(normalization,direction).spread() +} + +/** + * Map a complex flow into real flow by taking real part of each number + */ +@FlowPreview +fun Flow.real(): Flow = map{it.re} \ No newline at end of file diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt index 4b127a5b4..e521df86e 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt @@ -277,4 +277,6 @@ fun Buffer.asReadOnly(): Buffer = if (this is MutableBuffer) { /** * Typealias for buffer transformations */ -typealias BufferTransform = (Buffer) -> Buffer \ No newline at end of file +typealias BufferTransform = (Buffer) -> Buffer + +typealias SuspendBufferTransform = suspend (Buffer) -> Buffer \ No newline at end of file diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt index aa0aca460..d5e94fc3b 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt @@ -37,6 +37,9 @@ fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory) = flow counter = 0 } } + if (counter > 0) { + emit(bufferFactory(counter) { list[it] }) + } } /** @@ -54,8 +57,12 @@ fun Flow.chunked(bufferSize: Int) = flow { if (counter == bufferSize) { val buffer = DoubleBuffer(array) emit(buffer) + counter = 0 } } + if (counter > 0) { + emit(DoubleBuffer(counter) { array[it] }) + } } /**