FFT buffer transformations for commons-math

This commit is contained in:
Alexander Nozik 2019-02-15 20:00:42 +03:00
parent cdcba85ada
commit 815066cf6c
6 changed files with 111 additions and 27 deletions

View File

@ -0,0 +1 @@
**TODO**

View File

@ -6,6 +6,7 @@ description = "Commons math binding for kmath"
dependencies { dependencies {
api(project(":kmath-core")) api(project(":kmath-core"))
api(project(":kmath-sequential"))
api("org.apache.commons:commons-math3:3.6.1") api("org.apache.commons:commons-math3:3.6.1")
testImplementation("org.jetbrains.kotlin:kotlin-test") testImplementation("org.jetbrains.kotlin:kotlin-test")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit") testImplementation("org.jetbrains.kotlin:kotlin-test-junit")

View File

@ -0,0 +1,63 @@
package scientifik.kmath.transform
import org.apache.commons.math3.transform.*
import scientifik.kmath.operations.Complex
import scientifik.kmath.structures.*
/**
*
*/
object Transformations {
private fun Buffer<Complex>.toArray(): Array<org.apache.commons.math3.complex.Complex> =
Array(size) { org.apache.commons.math3.complex.Complex(get(it).re, get(it).im) }
private fun Buffer<Double>.asArray() = if (this is DoubleBuffer) {
array
} else {
DoubleArray(size) { i -> get(i) }
}
/**
* Create a virtual buffer on top of array
*/
private fun Array<org.apache.commons.math3.complex.Complex>.asBuffer() = VirtualBuffer<Complex>(size) {
val value = get(it)
Complex(value.real, value.imaginary)
}
fun fourier(
normalization: DftNormalization = DftNormalization.STANDARD,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Complex, Complex> = {
FastFourierTransformer(normalization).transform(it.toArray(), direction).asBuffer()
}
fun realFourier(
normalization: DftNormalization = DftNormalization.STANDARD,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Complex> = {
FastFourierTransformer(normalization).transform(it.asArray(), direction).asBuffer()
}
fun sine(
normalization: DstNormalization = DstNormalization.STANDARD_DST_I,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Double> = {
FastSineTransformer(normalization).transform(it.asArray(), direction).asBuffer()
}
fun cosine(
normalization: DctNormalization = DctNormalization.STANDARD_DCT_I,
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Double> = {
FastCosineTransformer(normalization).transform(it.asArray(), direction).asBuffer()
}
fun hadamard(
direction: TransformType = TransformType.FORWARD
): BufferTransform<Double, Double> = {
FastHadamardTransformer().transform(it.asArray(), direction).asBuffer()
}
}

View File

@ -154,7 +154,7 @@ inline class DoubleBuffer(val array: DoubleArray) : MutableBuffer<Double> {
array[index] = value array[index] = value
} }
override fun iterator(): Iterator<Double> = array.iterator() override fun iterator() = array.iterator()
override fun copy(): MutableBuffer<Double> = DoubleBuffer(array.copyOf()) override fun copy(): MutableBuffer<Double> = DoubleBuffer(array.copyOf())
} }
@ -162,7 +162,6 @@ inline class DoubleBuffer(val array: DoubleArray) : MutableBuffer<Double> {
@Suppress("FunctionName") @Suppress("FunctionName")
inline fun DoubleBuffer(size: Int, init: (Int) -> Double) = DoubleBuffer(DoubleArray(size) { init(it) }) inline fun DoubleBuffer(size: Int, init: (Int) -> Double) = DoubleBuffer(DoubleArray(size) { init(it) })
/** /**
* Transform buffer of doubles into array for high performance operations * Transform buffer of doubles into array for high performance operations
*/ */
@ -184,7 +183,7 @@ inline class ShortBuffer(val array: ShortArray) : MutableBuffer<Short> {
array[index] = value array[index] = value
} }
override fun iterator(): Iterator<Short> = array.iterator() override fun iterator() = array.iterator()
override fun copy(): MutableBuffer<Short> = ShortBuffer(array.copyOf()) override fun copy(): MutableBuffer<Short> = ShortBuffer(array.copyOf())
@ -201,7 +200,7 @@ inline class IntBuffer(val array: IntArray) : MutableBuffer<Int> {
array[index] = value array[index] = value
} }
override fun iterator(): Iterator<Int> = array.iterator() override fun iterator() = array.iterator()
override fun copy(): MutableBuffer<Int> = IntBuffer(array.copyOf()) override fun copy(): MutableBuffer<Int> = IntBuffer(array.copyOf())
@ -218,7 +217,7 @@ inline class LongBuffer(val array: LongArray) : MutableBuffer<Long> {
array[index] = value array[index] = value
} }
override fun iterator(): Iterator<Long> = array.iterator() override fun iterator() = array.iterator()
override fun copy(): MutableBuffer<Long> = LongBuffer(array.copyOf()) override fun copy(): MutableBuffer<Long> = LongBuffer(array.copyOf())
@ -231,7 +230,7 @@ inline class ReadOnlyBuffer<T>(val buffer: MutableBuffer<T>) : Buffer<T> {
override fun get(index: Int): T = buffer.get(index) override fun get(index: Int): T = buffer.get(index)
override fun iterator(): Iterator<T> = buffer.iterator() override fun iterator() = buffer.iterator()
} }
/** /**
@ -260,3 +259,8 @@ fun <T> Buffer<T>.asReadOnly(): Buffer<T> = if (this is MutableBuffer) {
} else { } else {
this this
} }
/**
* Typealias for buffer transformations
*/
typealias BufferTransform<T, R> = (Buffer<T>) -> Buffer<R>

View File

@ -0,0 +1,7 @@
package scientifik.kmath.sequential
import kotlinx.coroutines.CoroutineScope
//class FFTProcessor(scope: CoroutineScope): AbstractDoubleProcessor(scope){
//
//}

View File

@ -7,13 +7,22 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import scientifik.kmath.structures.Buffer
import scientifik.kmath.structures.asBuffer
import scientifik.kmath.structures.asSequence
fun <T> Buffer<T>.asChannel(scope: CoroutineScope): ReceiveChannel<T> = scope.produce {
for (i in (0 until size)) {
send(get(i))
}
}
interface DoubleProducer : Producer<Double> { interface DoubleProducer : Producer<Double> {
suspend fun receiveArray(): DoubleArray suspend fun receiveArray(): Buffer<Double>
} }
interface DoubleConsumer : Consumer<Double> { interface DoubleConsumer : Consumer<Double> {
suspend fun sendArray(array: DoubleArray) suspend fun sendArray(array: Buffer<Double>)
} }
abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer<Double>(scope), DoubleProducer { abstract class AbstractDoubleProducer(scope: CoroutineScope) : AbstractProducer<Double>(scope), DoubleProducer {
@ -79,15 +88,15 @@ abstract class AbstractDoubleProcessor(scope: CoroutineScope) : AbstractProcesso
class BasicDoubleProducer( class BasicDoubleProducer(
scope: CoroutineScope, scope: CoroutineScope,
capacity: Int = Channel.UNLIMITED, capacity: Int = Channel.UNLIMITED,
block: suspend ProducerScope<DoubleArray>.() -> Unit block: suspend ProducerScope<Buffer<Double>>.() -> Unit
) : AbstractDoubleProducer(scope) { ) : AbstractDoubleProducer(scope) {
private val currentArray = atomic<ReceiveChannel<Double>?>(null) private val currentArray = atomic<ReceiveChannel<Double>?>(null)
private val channel: ReceiveChannel<DoubleArray> by lazy { produce(capacity = capacity, block = block) } private val channel: ReceiveChannel<Buffer<Double>> by lazy { produce(capacity = capacity, block = block) }
private val cachingChannel by lazy { private val cachingChannel by lazy {
channel.map { channel.map {
it.also { doubles -> currentArray.lazySet(doubles.asChannel()) } it.also { doubles -> currentArray.lazySet(doubles.asChannel(this)) }
} }
} }
@ -97,40 +106,39 @@ class BasicDoubleProducer(
} }
} }
override suspend fun receiveArray(): DoubleArray = cachingChannel.receive() override suspend fun receiveArray(): Buffer<Double> = cachingChannel.receive()
override suspend fun receive(): Double = (currentArray.value ?: cachingChannel.receive().asChannel()).receive() override suspend fun receive(): Double = (currentArray.value ?: cachingChannel.receive().asChannel(this)).receive()
} }
class DoubleReducer<S>( class DoubleReducer<S>(
scope: CoroutineScope, scope: CoroutineScope,
initialState: S, initialState: S,
val fold: suspend (S, DoubleArray) -> S val fold: suspend (S, Buffer<Double>) -> S
) : AbstractDoubleConsumer(scope) { ) : AbstractDoubleConsumer(scope) {
var state: S = initialState var state: S = initialState
private set private set
private val mutex = Mutex() override suspend fun sendArray(array: Buffer<Double>) {
override suspend fun sendArray(array: DoubleArray) {
state = fold(state, array) state = fold(state, array)
} }
override suspend fun send(value: Double) = sendArray(doubleArrayOf(value)) override suspend fun send(value: Double) = sendArray(doubleArrayOf(value).asBuffer())
} }
/** /**
* Convert an array to single element producer, splitting it in chunks if necessary * Convert an array to single element producer, splitting it in chunks if necessary
*/ */
fun DoubleArray.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) = if (size < chunkSize) { fun Buffer<Double>.produce(scope: CoroutineScope = GlobalScope, chunkSize: Int = Int.MAX_VALUE) =
if (size < chunkSize) {
BasicDoubleProducer(scope) { send(this@produce) } BasicDoubleProducer(scope) { send(this@produce) }
} else { } else {
BasicDoubleProducer(scope) { BasicDoubleProducer(scope) {
//TODO optimize this! //TODO optimize this!
asSequence().chunked(chunkSize).forEach { asSequence().chunked(chunkSize).forEach {
send(it.toDoubleArray()) send(it.asBuffer())
}
} }
} }
}