fix mean statistics composition

This commit is contained in:
Alexander Nozik 2025-05-11 19:36:02 +03:00
parent 752a849cd3
commit b10caebe2a
4 changed files with 47 additions and 16 deletions
kmath-stat/src
commonMain/kotlin/space/kscience/kmath/stat
jvmTest/kotlin/space/kscience/kmath/stat

@ -5,7 +5,9 @@
package space.kscience.kmath.stat
import space.kscience.kmath.operations.*
import space.kscience.kmath.operations.ExtendedField
import space.kscience.kmath.operations.Float32Field
import space.kscience.kmath.operations.Float64Field
import space.kscience.kmath.structures.Buffer
import space.kscience.kmath.structures.Float32
import space.kscience.kmath.structures.Float64
@ -14,19 +16,27 @@ import space.kscience.kmath.structures.indices
/**
* Geometric mean. Nth root of product of values.
*/
public class GeometricMean<T : Comparable<T>>(private val field: ExtendedField<T>) : BlockingStatistic<T, T>, ComposableStatistic<T, T, T> {
override fun evaluateBlocking(data: Buffer<T>): T = with(field) {
public class GeometricMean<T : Comparable<T>>(
private val field: ExtendedField<T>
) : BlockingStatistic<T, T>, ComposableStatistic<T, T, T> {
private fun logsum(data: Buffer<T>): T = with(field) {
require(data.size > 0) { "Data must not be empty" }
var res = zero
for (i in data.indices) {
if (data[i] < zero) throw ArithmeticException("Geometric mean is not defined for negative numbers. Found: " + data[i])
res += ln(data[i])
}
exp(res / data.size)
res
}
override fun evaluateBlocking(data: Buffer<T>): T = with(field) {
exp(logsum(data) / data.size)
}
override suspend fun computeIntermediate(data: Buffer<T>): T = evaluateBlocking(data)
override suspend fun composeIntermediate(first: T, second: T): T = with(field){ exp((ln(first) + ln(second))/2) }
override suspend fun composeIntermediate(first: T, second: T): T = with(field) { exp((ln(first) + ln(second)) / 2) }
override suspend fun toResult(intermediate: T): T = intermediate

@ -35,10 +35,16 @@ public class Mean<T>(
override suspend fun evaluate(data: Buffer<T>): T = super<ComposableStatistic>.evaluate(data)
override suspend fun computeIntermediate(data: Buffer<T>): Pair<T, Int> = evaluateBlocking(data) to data.size
override suspend fun computeIntermediate(data: Buffer<T>): Pair<T, Int> = with(field) {
var sum = zero
data.indices.forEach { sum += data[it] }
sum to data.size
}
override suspend fun composeIntermediate(first: Pair<T, Int>, second: Pair<T, Int>): Pair<T, Int> =
with(field) { first.first + second.first } to (first.second + second.second)
override suspend fun composeIntermediate(
first: Pair<T, Int>,
second: Pair<T, Int>
): Pair<T, Int> = with(field) { first.first + second.first } to (first.second + second.second)
override suspend fun toResult(intermediate: Pair<T, Int>): T = with(field) {
intermediate.first / intermediate.second

@ -5,6 +5,7 @@
package space.kscience.kmath.stat
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.take
@ -14,6 +15,7 @@ import space.kscience.kmath.operations.Float64Field
import space.kscience.kmath.random.RandomGenerator
import space.kscience.kmath.random.chain
import space.kscience.kmath.streaming.chunked
import space.kscience.kmath.structures.Float64Buffer
import kotlin.test.Test
import kotlin.test.assertEquals
@ -21,14 +23,17 @@ internal class GeometricMeanStatisticTest {
//create a random number generator.
val generator = RandomGenerator.default(1)
//Create a stateless chain from generator.
val data = generator.chain { if (nextBoolean()) 1.0 else 4.0 }
private fun setupData(): Flow<Float64Buffer> {
//Create a stateless chain from generator.
val data = generator.chain { if (nextBoolean()) 1.0 else 4.0 }
//Convert a chain to Flow and break it into chunks.
val chunked = data.chunked(1000)
//Convert a chain to Flow and break it into chunks.
return data.chunked(1000)
}
@Test
fun singleBlockingMean() = runTest {
val chunked = setupData()
val first = chunked.first()
val res = Float64Field.geometricMean(first)
assertEquals(2.0, res, 1e-1)
@ -36,6 +41,7 @@ internal class GeometricMeanStatisticTest {
@Test
fun singleSuspendMean() = runTest {
val chunked = setupData()
val first = runBlocking { chunked.first() }
val res = Float64Field.geometricMean(first)
assertEquals(2.0, res, 1e-1)
@ -43,6 +49,7 @@ internal class GeometricMeanStatisticTest {
@Test
fun parallelMean() = runTest {
val chunked = setupData()
val average = Float64Field.geometricMean
.flow(chunked) //create a flow from evaluated results
.take(100) // Take 100 data chunks from the source and accumulate them

@ -5,6 +5,7 @@
package space.kscience.kmath.stat
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.take
@ -14,6 +15,7 @@ import space.kscience.kmath.operations.Float64Field
import space.kscience.kmath.random.RandomGenerator
import space.kscience.kmath.random.chain
import space.kscience.kmath.streaming.chunked
import space.kscience.kmath.structures.Float64Buffer
import kotlin.test.Test
import kotlin.test.assertEquals
@ -21,14 +23,18 @@ internal class MeanStatisticTest {
//create a random number generator.
val generator = RandomGenerator.default(1)
//Create a stateless chain from generator.
val data = generator.chain { nextDouble() }
private fun setupData(): Flow<Float64Buffer> {
//Create a stateless chain from generator.
val data = generator.chain { nextDouble() }
//Convert a chain to Flow and break it into chunks.
return data.chunked(1000)
}
//Convert a chain to Flow and break it into chunks.
val chunked = data.chunked(1000)
@Test
fun singleBlockingMean() = runTest {
val chunked = setupData()
val first = chunked.first()
val res = Float64Field.mean(first)
assertEquals(0.5, res, 1e-1)
@ -36,6 +42,7 @@ internal class MeanStatisticTest {
@Test
fun singleSuspendMean() = runTest {
val chunked = setupData()
val first = runBlocking { chunked.first() }
val res = Float64Field.mean(first)
assertEquals(0.5, res, 1e-1)
@ -43,6 +50,7 @@ internal class MeanStatisticTest {
@Test
fun parallelMean() = runTest {
val chunked = setupData()
val average = Float64Field.mean
.flow(chunked) //create a flow from evaluated results
.take(100) // Take 100 data chunks from the source and accumulate them