Fix Mean bug

This commit is contained in:
Alexander Nozik 2021-10-04 12:40:30 +03:00
parent abae29bbed
commit fd8a61c852
7 changed files with 72 additions and 30 deletions

View File

@ -14,7 +14,6 @@ import space.kscience.kmath.ejml.EjmlLinearSpaceDDRM
import space.kscience.kmath.linear.invoke import space.kscience.kmath.linear.invoke
import space.kscience.kmath.linear.linearSpace import space.kscience.kmath.linear.linearSpace
import space.kscience.kmath.operations.DoubleField import space.kscience.kmath.operations.DoubleField
import space.kscience.kmath.operations.algebra
import space.kscience.kmath.structures.Buffer import space.kscience.kmath.structures.Buffer
import kotlin.random.Random import kotlin.random.Random
@ -25,11 +24,11 @@ internal class DotBenchmark {
const val dim = 1000 const val dim = 1000
//creating invertible matrix //creating invertible matrix
val matrix1 = Double.algebra.linearSpace.buildMatrix(dim, dim) { i, j -> val matrix1 = DoubleField.linearSpace.buildMatrix(dim, dim) { _, _ ->
if (i <= j) random.nextDouble() else 0.0 random.nextDouble()
} }
val matrix2 = Double.algebra.linearSpace.buildMatrix(dim, dim) { i, j -> val matrix2 = DoubleField.linearSpace.buildMatrix(dim, dim) { _, _ ->
if (i <= j) random.nextDouble() else 0.0 random.nextDouble()
} }
val cmMatrix1 = CMLinearSpace { matrix1.toCM() } val cmMatrix1 = CMLinearSpace { matrix1.toCM() }
@ -65,7 +64,7 @@ internal class DotBenchmark {
} }
@Benchmark @Benchmark
fun doubleDot(blackhole: Blackhole) = with(Double.algebra.linearSpace) { fun doubleDot(blackhole: Blackhole) = with(DoubleField.linearSpace) {
blackhole.consume(matrix1 dot matrix2) blackhole.consume(matrix1 dot matrix2)
} }
} }

View File

@ -75,8 +75,9 @@ internal class ExpressionsInterpretersBenchmark {
private val algebra = DoubleField private val algebra = DoubleField
private const val times = 1_000_000 private const val times = 1_000_000
private val functional = DoubleField.expressionInExtendedField { private val functional = DoubleField.expression {
bindSymbol(x) * number(2.0) + number(2.0) / bindSymbol(x) - number(16.0) / sin(bindSymbol(x)) val x = bindSymbol(Symbol.x)
x * number(2.0) + 2.0 / x - 16.0 / sin(x)
} }
private val node = MstExtendedField { private val node = MstExtendedField {

View File

@ -192,3 +192,7 @@ public inline fun <T, A : Field<T>> A.expressionInField(
public inline fun <T, A : ExtendedField<T>> A.expressionInExtendedField( public inline fun <T, A : ExtendedField<T>> A.expressionInExtendedField(
block: FunctionalExpressionExtendedField<T, A>.() -> Expression<T>, block: FunctionalExpressionExtendedField<T, A>.() -> Expression<T>,
): Expression<T> = FunctionalExpressionExtendedField(this).block() ): Expression<T> = FunctionalExpressionExtendedField(this).block()
/**
 * Builds an [Expression] over [Double] by running [block] on a
 * [FunctionalExpressionExtendedField] that wraps this [DoubleField].
 *
 * @param block the builder body, executed with the functional expression field as receiver.
 * @return the expression returned by [block].
 */
public inline fun DoubleField.expression(
    block: FunctionalExpressionExtendedField<Double, DoubleField>.() -> Expression<Double>,
): Expression<Double> {
    val builder = FunctionalExpressionExtendedField(this)
    return builder.block()
}

View File

@ -13,7 +13,6 @@ import space.kscience.kmath.operations.DoubleBufferOperations
import space.kscience.kmath.operations.DoubleField import space.kscience.kmath.operations.DoubleField
import space.kscience.kmath.structures.Buffer import space.kscience.kmath.structures.Buffer
import space.kscience.kmath.structures.DoubleBuffer import space.kscience.kmath.structures.DoubleBuffer
import space.kscience.kmath.structures.indices
public object DoubleLinearSpace : LinearSpace<Double, DoubleField> { public object DoubleLinearSpace : LinearSpace<Double, DoubleField> {
@ -30,7 +29,6 @@ public object DoubleLinearSpace : LinearSpace<Double, DoubleField> {
initializer: DoubleField.(i: Int, j: Int) -> Double initializer: DoubleField.(i: Int, j: Int) -> Double
): Matrix<Double> = ndRing(rows, columns).produce { (i, j) -> DoubleField.initializer(i, j) }.as2D() ): Matrix<Double> = ndRing(rows, columns).produce { (i, j) -> DoubleField.initializer(i, j) }.as2D()
override fun buildVector(size: Int, initializer: DoubleField.(Int) -> Double): DoubleBuffer = override fun buildVector(size: Int, initializer: DoubleField.(Int) -> Double): DoubleBuffer =
DoubleBuffer(size) { DoubleField.initializer(it) } DoubleBuffer(size) { DoubleField.initializer(it) }
@ -50,9 +48,9 @@ public object DoubleLinearSpace : LinearSpace<Double, DoubleField> {
// Create a contiguous in-memory representation of this vector for better memory layout handling // Create a contiguous in-memory representation of this vector for better memory layout handling
private fun Buffer<Double>.linearize() = if (this is DoubleBuffer) { private fun Buffer<Double>.linearize() = if (this is DoubleBuffer) {
this this.array
} else { } else {
DoubleBuffer(size) { get(it) } DoubleArray(size) { get(it) }
} }
@OptIn(PerformancePitfall::class) @OptIn(PerformancePitfall::class)

View File

@ -27,8 +27,13 @@ public class Mean<T>(
override suspend fun evaluate(data: Buffer<T>): T = super<ComposableStatistic>.evaluate(data) override suspend fun evaluate(data: Buffer<T>): T = super<ComposableStatistic>.evaluate(data)
override suspend fun computeIntermediate(data: Buffer<T>): Pair<T, Int> = override suspend fun computeIntermediate(data: Buffer<T>): Pair<T, Int> = group {
evaluateBlocking(data) to data.size var res = zero
for (i in data.indices) {
res += data[i]
}
res to data.size
}
override suspend fun composeIntermediate(first: Pair<T, Int>, second: Pair<T, Int>): Pair<T, Int> = override suspend fun composeIntermediate(first: Pair<T, Int>, second: Pair<T, Int>): Pair<T, Int> =
group { first.first + second.first } to (first.second + second.second) group { first.first + second.first } to (first.second + second.second)
@ -38,9 +43,11 @@ public class Mean<T>(
} }
public companion object { public companion object {
//TODO replace with optimized version which respects overflow @Deprecated("Use Double.mean instead")
public val double: Mean<Double> = Mean(DoubleField) { sum, count -> sum / count } public val double: Mean<Double> = Mean(DoubleField) { sum, count -> sum / count }
@Deprecated("Use Int.mean instead")
public val int: Mean<Int> = Mean(IntRing) { sum, count -> sum / count } public val int: Mean<Int> = Mean(IntRing) { sum, count -> sum / count }
@Deprecated("Use Long.mean instead")
public val long: Mean<Long> = Mean(LongRing) { sum, count -> sum / count } public val long: Mean<Long> = Mean(LongRing) { sum, count -> sum / count }
public fun evaluate(buffer: Buffer<Double>): Double = double.evaluateBlocking(buffer) public fun evaluate(buffer: Buffer<Double>): Double = double.evaluateBlocking(buffer)
@ -48,3 +55,11 @@ public class Mean<T>(
public fun evaluate(buffer: Buffer<Long>): Long = long.evaluateBlocking(buffer) public fun evaluate(buffer: Buffer<Long>): Long = long.evaluateBlocking(buffer)
} }
} }
//TODO replace with optimized version which respects overflow

/**
 * A [Mean] statistic for [Double] values built over [DoubleField].
 * A new [Mean] instance is created on every access.
 */
public val Double.Companion.mean: Mean<Double>
    get() {
        return Mean(DoubleField) { total, n -> total / n }
    }

/**
 * A [Mean] statistic for [Int] values built over [IntRing].
 * A new [Mean] instance is created on every access.
 * NOTE(review): the final step is `sum / count` on [Int] operands, presumably integer division - confirm.
 */
public val Int.Companion.mean: Mean<Int>
    get() {
        return Mean(IntRing) { total, n -> total / n }
    }

/**
 * A [Mean] statistic for [Long] values built over [LongRing].
 * A new [Mean] instance is created on every access.
 * NOTE(review): the final step is `sum / count` on integral operands, presumably integer division - confirm.
 */
public val Long.Companion.mean: Mean<Long>
    get() {
        return Mean(LongRing) { total, n -> total / n }
    }

View File

@ -8,7 +8,6 @@ package space.kscience.kmath.stat
import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.runningReduce import kotlinx.coroutines.flow.runningReduce
@ -18,16 +17,23 @@ import space.kscience.kmath.structures.Buffer
/** /**
* A function that transforms a buffer of random quantities into some resulting value * A function that transforms a buffer of random quantities into some resulting value
*/ */
public interface Statistic<in T, out R> { public fun interface Statistic<in T, out R> {
public suspend fun evaluate(data: Buffer<T>): R public suspend fun evaluate(data: Buffer<T>): R
} }
public interface BlockingStatistic<in T, out R> : Statistic<T, R> { public suspend operator fun <T, R> Statistic<T, R>.invoke(data: Buffer<T>): R = evaluate(data)
/**
* A statistic that is computed in a synchronous blocking mode
*/
public fun interface BlockingStatistic<in T, out R> : Statistic<T, R> {
public fun evaluateBlocking(data: Buffer<T>): R public fun evaluateBlocking(data: Buffer<T>): R
override suspend fun evaluate(data: Buffer<T>): R = evaluateBlocking(data) override suspend fun evaluate(data: Buffer<T>): R = evaluateBlocking(data)
} }
/**
 * Lets a [BlockingStatistic] be called like a function: `statistic(data)` delegates to
 * [BlockingStatistic.evaluateBlocking].
 *
 * @param data the buffer to evaluate the statistic on.
 * @return the value returned by [BlockingStatistic.evaluateBlocking] for [data].
 */
public operator fun <T, R> BlockingStatistic<T, R>.invoke(data: Buffer<T>): R {
    return evaluateBlocking(data)
}
/** /**
* A statistic that could be computed separately on different blocks of data and then composed * A statistic that could be computed separately on different blocks of data and then composed
* *
@ -48,8 +54,10 @@ public interface ComposableStatistic<in T, I, out R> : Statistic<T, R> {
override suspend fun evaluate(data: Buffer<T>): R = toResult(computeIntermediate(data)) override suspend fun evaluate(data: Buffer<T>): R = toResult(computeIntermediate(data))
} }
@FlowPreview /**
@ExperimentalCoroutinesApi * Flow intermediate state of the [ComposableStatistic]
*/
@OptIn(ExperimentalCoroutinesApi::class)
private fun <T, I, R> ComposableStatistic<T, I, R>.flowIntermediate( private fun <T, I, R> ComposableStatistic<T, I, R>.flowIntermediate(
flow: Flow<Buffer<T>>, flow: Flow<Buffer<T>>,
dispatcher: CoroutineDispatcher = Dispatchers.Default, dispatcher: CoroutineDispatcher = Dispatchers.Default,
@ -64,7 +72,7 @@ private fun <T, I, R> ComposableStatistic<T, I, R>.flowIntermediate(
* *
* The resulting flow contains values that include the whole previous statistics, not only the last chunk. * The resulting flow contains values that include the whole previous statistics, not only the last chunk.
*/ */
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
public fun <T, I, R> ComposableStatistic<T, I, R>.flow( public fun <T, I, R> ComposableStatistic<T, I, R>.flow(
flow: Flow<Buffer<T>>, flow: Flow<Buffer<T>>,
dispatcher: CoroutineDispatcher = Dispatchers.Default, dispatcher: CoroutineDispatcher = Dispatchers.Default,

View File

@ -5,11 +5,13 @@
package space.kscience.kmath.stat package space.kscience.kmath.stat
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import space.kscience.kmath.streaming.chunked import space.kscience.kmath.streaming.chunked
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals
internal class StatisticTest { internal class StatisticTest {
//create a random number generator. //create a random number generator.
@ -22,12 +24,27 @@ internal class StatisticTest {
val chunked = data.chunked(1000) val chunked = data.chunked(1000)
@Test @Test
fun testParallelMean() = runBlocking { fun singleBlockingMean() {
val average = Mean.double val first = runBlocking { chunked.first()}
.flow(chunked) //create a flow with results val res = Double.mean(first)
.drop(99) // Skip first 99 values and use one with total data assertEquals(0.5,res, 1e-1)
.first() //get 1e5 data samples average }
println(average) @Test
fun singleSuspendMean() = runBlocking {
val first = runBlocking { chunked.first()}
val res = Double.mean(first)
assertEquals(0.5,res, 1e-1)
} }
@Test
fun parallelMean() = runBlocking {
val average = Double.mean
.flow(chunked) //create a flow from evaluated results
.take(100) // Take 100 data chunks from the source and accumulate them
.last() //get 1e5 data samples average
assertEquals(0.5,average, 1e-3)
}
} }