From 92b4e6f28cb7291efc60898b5b581bb92e294678 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 25 Aug 2019 10:51:25 +0300 Subject: [PATCH] Tools to evaluate statistic --- build.gradle.kts | 6 +- gradle/wrapper/gradle-wrapper.properties | 2 +- gradlew | 4 +- .../misc/{Cumulative.kt => cumulative.kt} | 13 +-- .../kmath/{operations => misc}/functions.kt | 11 ++- .../kmath/operations/AlgebraExtensions.kt | 3 - .../scientifik/kmath/structures/Buffers.kt | 4 +- kmath-coroutines/build.gradle.kts | 4 +- .../kotlin/scientifik/kmath/chains/Chain.kt | 15 +--- .../scientifik/kmath/chains/flowExtra.kt | 27 ++++++ ...{CoroutinesExtra.kt => coroutinesExtra.kt} | 11 ++- .../scientifik/kmath/streaming/BufferFlow.kt | 4 - .../kmath/streaming/BufferFlowTest.kt | 8 +- .../kotlin/scientifik/memory/MemorySpec.kt | 4 +- kmath-prob/build.gradle.kts | 21 +---- .../scientifik/kmath/prob/RandomChain.kt | 6 +- .../kotlin/scientifik/kmath/prob/Statistic.kt | 88 +++++++++++++++++++ .../scientifik/kmath/prob/StatisticTest.kt | 28 ++++++ settings.gradle.kts | 13 ++- 19 files changed, 191 insertions(+), 81 deletions(-) rename kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/{Cumulative.kt => cumulative.kt} (86%) rename kmath-core/src/commonMain/kotlin/scientifik/kmath/{operations => misc}/functions.kt (82%) create mode 100644 kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/flowExtra.kt rename kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/{CoroutinesExtra.kt => coroutinesExtra.kt} (92%) create mode 100644 kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Statistic.kt create mode 100644 kmath-prob/src/jvmTest/kotlin/scientifik/kmath/prob/StatisticTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index b4b3f032e..4e55432ee 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,10 +1,8 @@ plugins { - id("scientifik.mpp") version "0.1.4" apply false - id("scientifik.publish") version "0.1.4" apply false - id("kotlinx-atomicfu") version "0.12.9" apply false + id("scientifik.publish") version "0.1.6" apply false } -val kmathVersion by extra("0.1.4-dev-1") +val kmathVersion by extra("0.1.4-dev") val bintrayRepo by extra("scientifik") val githubProject by extra("kmath") diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 4b7e1f3d3..ef9a9e05e 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 8e25e6c19..83f2acfdc 100755 --- a/gradlew +++ b/gradlew @@ -125,8 +125,8 @@ if $darwin; then GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` JAVACMD=`cygpath --unix "$JAVACMD"` diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/Cumulative.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/cumulative.kt similarity index 86% rename from kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/Cumulative.kt rename to kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/cumulative.kt index 314696262..c3cfc448a 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/Cumulative.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/cumulative.kt @@ -10,29 +10,32 @@ import kotlin.jvm.JvmName * @param R type of resulting iterable * @param initial lazy evaluated */ -fun Iterator.cumulative(initial: R, operation: (T, R) -> R): Iterator = object : Iterator { +fun Iterator.cumulative(initial: R, operation: (R, T) -> R): Iterator = object : Iterator { var state: R = initial override fun hasNext(): Boolean = this@cumulative.hasNext() override fun next(): R { - state = operation.invoke(this@cumulative.next(), state) + state = operation(state, this@cumulative.next()) return state } } -fun Iterable.cumulative(initial: R, operation: (T, R) -> R): Iterable = object : Iterable { +fun Iterable.cumulative(initial: R, operation: (R, T) -> R): Iterable = object : Iterable { override fun iterator(): Iterator = this@cumulative.iterator().cumulative(initial, operation) } -fun Sequence.cumulative(initial: R, operation: (T, R) -> R): Sequence = object : Sequence { +fun Sequence.cumulative(initial: R, operation: (R, T) -> R): Sequence = object : Sequence { override fun iterator(): Iterator = this@cumulative.iterator().cumulative(initial, operation) } -fun List.cumulative(initial: R, operation: (T, R) -> R): List = +fun List.cumulative(initial: R, operation: (R, T) -> R): List = this.iterator().cumulative(initial, operation).asSequence().toList() //Cumulative sum +/** + * Cumulative sum with custom space + */ fun Iterable.cumulativeSum(space: Space) = with(space) { cumulative(zero) { element: T, sum: T -> sum + element } } diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/functions.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/functions.kt similarity index 82% rename from kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/functions.kt rename to kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/functions.kt index a76167b43..cef8209ed 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/functions.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/misc/functions.kt @@ -1,7 +1,8 @@ -package scientifik.kmath.operations +package scientifik.kmath.coroutines +import scientifik.kmath.operations.RealField +import scientifik.kmath.operations.SpaceOperations import kotlin.jvm.JvmName -import kotlin.reflect.KClass /** * A suspendable univariate function defined in algebraic context @@ -28,7 +29,9 @@ suspend fun MFunction.invoke(args: DoubleArray) = RealField.i @JvmName("varargInvoke") suspend fun MFunction.invoke(vararg args: Double) = RealField.invoke(*args.toTypedArray()) - +/** + * A suspendable univariate function with parameter + */ interface ParametricUFunction> { suspend operator fun C.invoke(arg: T, parameter: P): T -} \ No newline at end of file +} diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/AlgebraExtensions.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/AlgebraExtensions.kt index 6dec8bd79..4e8dbf36d 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/AlgebraExtensions.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/operations/AlgebraExtensions.kt @@ -1,7 +1,4 @@ package scientifik.kmath.operations -import scientifik.kmath.structures.Buffer -import scientifik.kmath.structures.asSequence - fun Space.sum(data : Iterable): T = data.fold(zero) { left, right -> add(left,right) } fun Space.sum(data : Sequence): T = data.fold(zero) { left, right -> add(left, right) } \ No newline at end of file diff --git a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt index a38f09c8d..c63384fb9 100644 --- a/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt +++ b/kmath-core/src/commonMain/kotlin/scientifik/kmath/structures/Buffers.kt @@ -69,9 +69,9 @@ interface Buffer { } } -fun Buffer.asSequence(): Sequence = iterator().asSequence() +fun Buffer.asSequence(): Sequence = Sequence(::iterator) -fun Buffer.asIterable(): Iterable = iterator().asSequence().asIterable() +fun Buffer.asIterable(): Iterable = asSequence().asIterable() interface MutableBuffer : Buffer { operator fun set(index: Int, value: T) diff --git a/kmath-coroutines/build.gradle.kts b/kmath-coroutines/build.gradle.kts index e01c61326..373d9b8ac 100644 --- a/kmath-coroutines/build.gradle.kts +++ b/kmath-coroutines/build.gradle.kts @@ -1,5 +1,6 @@ plugins { id("scientifik.mpp") + //id("scientifik.atomic") } kotlin.sourceSets { @@ -7,19 +8,16 @@ kotlin.sourceSets { dependencies { api(project(":kmath-core")) api("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:${Scientifik.coroutinesVersion}") - compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Scientifik.atomicfuVersion}") } } jvmMain { dependencies { api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Scientifik.coroutinesVersion}") - compileOnly("org.jetbrains.kotlinx:atomicfu:${Scientifik.atomicfuVersion}") } } jsMain { dependencies { api("org.jetbrains.kotlinx:kotlinx-coroutines-core-js:${Scientifik.coroutinesVersion}") - compileOnly("org.jetbrains.kotlinx:atomicfu-js:${Scientifik.atomicfuVersion}") } } } diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt index e3f141b44..161dc3b7f 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt @@ -16,9 +16,6 @@ package scientifik.kmath.chains -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.updateAndGet -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -46,9 +43,7 @@ interface Chain { /** * Chain as a coroutine flow. The flow emit affects chain state and vice versa */ -@FlowPreview -val Chain.flow: Flow - get() = kotlinx.coroutines.flow.flow { while (true) emit(next()) } +fun Chain.flow(): Flow = kotlinx.coroutines.flow.flow { while (true) emit(next()) } fun Iterator.asChain(): Chain = SimpleChain { next() } fun Sequence.asChain(): Chain = iterator().asChain() @@ -66,8 +61,6 @@ class SimpleChain(private val gen: suspend () -> R) : Chain { */ class MarkovChain(private val seed: suspend () -> R, private val gen: suspend (R) -> R) : Chain { - //constructor(seedValue: R, gen: suspend (R) -> R) : this({ seedValue }, gen) - private val mutex = Mutex() private var value: R? = null @@ -97,12 +90,6 @@ class StatefulChain( private val gen: suspend S.(R) -> R ) : Chain { -// constructor(state: S, seedValue: R, forkState: ((S) -> S), gen: suspend S.(R) -> R) : this( -// state, -// { seedValue }, -// forkState, -// gen -// ) private val mutex = Mutex() private var value: R? = null diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/flowExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/flowExtra.kt new file mode 100644 index 000000000..bfd16d763 --- /dev/null +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/flowExtra.kt @@ -0,0 +1,27 @@ +package scientifik.kmath.chains + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.scan +import kotlinx.coroutines.flow.scanReduce +import scientifik.kmath.operations.Space +import scientifik.kmath.operations.SpaceOperations + + +@ExperimentalCoroutinesApi +fun Flow.cumulativeSum(space: SpaceOperations): Flow = with(space) { + scanReduce { sum: T, element: T -> sum + element } +} + +@ExperimentalCoroutinesApi +fun Flow.mean(space: Space): Flow = with(space) { + class Accumulator(var sum: T, var num: Int) + + scan(Accumulator(zero, 0)) { sum, element -> + sum.apply { + this.sum += element + this.num += 1 + } + }.map { it.sum / it.num } +} \ No newline at end of file diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/CoroutinesExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/coroutinesExtra.kt similarity index 92% rename from kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/CoroutinesExtra.kt rename to kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/coroutinesExtra.kt index dc82d1881..fdde62304 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/CoroutinesExtra.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/coroutinesExtra.kt @@ -21,8 +21,8 @@ internal class LazyDeferred(val dispatcher: CoroutineDispatcher, val block: s suspend fun await(): T = deferred?.await() ?: error("Coroutine not started") } -@FlowPreview class AsyncFlow internal constructor(internal val deferredFlow: Flow>) : Flow { + @InternalCoroutinesApi override suspend fun collect(collector: FlowCollector) { deferredFlow.collect { collector.emit((it.await())) @@ -88,14 +88,13 @@ suspend fun AsyncFlow.collect(concurrency: Int, action: suspend (value: T }) } +@ExperimentalCoroutinesApi @FlowPreview -fun Flow.map( - dispatcher: CoroutineDispatcher, - concurrencyLevel: Int = 16, - bufferSize: Int = concurrencyLevel, +fun Flow.mapParallel( + dispatcher: CoroutineDispatcher = Dispatchers.Default, transform: suspend (T) -> R ): Flow { - return flatMapMerge(concurrencyLevel, bufferSize) { value -> + return flatMapMerge{ value -> flow { emit(transform(value)) } }.flowOn(dispatcher) } diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt index 85ce73c4b..cf4d4cc17 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt @@ -9,7 +9,6 @@ import scientifik.kmath.structures.DoubleBuffer /** * Create a [Flow] from buffer */ -@FlowPreview fun Buffer.asFlow() = iterator().asFlow() /** @@ -21,7 +20,6 @@ fun Flow>.spread(): Flow = flatMapConcat { it.asFlow() } /** * Collect incoming flow into fixed size chunks */ -@FlowPreview fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory): Flow> = flow { require(bufferSize > 0) { "Resulting chunk size must be more than zero" } val list = ArrayList(bufferSize) @@ -45,7 +43,6 @@ fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory): Flow< /** * Specialized flow chunker for real buffer */ -@FlowPreview fun Flow.chunked(bufferSize: Int): Flow = flow { require(bufferSize > 0) { "Resulting chunk size must be more than zero" } val array = DoubleArray(bufferSize) @@ -69,7 +66,6 @@ fun Flow.chunked(bufferSize: Int): Flow = flow { * Map a flow to a moving window buffer. The window step is one. * In order to get different steps, one could use skip operation. */ -@FlowPreview fun Flow.windowed(window: Int): Flow> = flow { require(window > 1) { "Window size must be more than one" } val ringBuffer = RingBuffer.boxing(window) diff --git a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt index 4fd397993..af2d6cd43 100644 --- a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt +++ b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt @@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.collect import org.junit.Test import scientifik.kmath.coroutines.async import scientifik.kmath.coroutines.collect -import scientifik.kmath.coroutines.map +import scientifik.kmath.coroutines.mapParallel import java.util.concurrent.Executors @@ -20,8 +20,8 @@ class BufferFlowTest { @Test(timeout = 2000) fun map() { runBlocking { - (1..20).asFlow().map( dispatcher) { - //println("Started $it on ${Thread.currentThread().name}") + (1..20).asFlow().mapParallel( dispatcher) { + println("Started $it on ${Thread.currentThread().name}") @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(200) it @@ -35,7 +35,7 @@ class BufferFlowTest { fun async() { runBlocking { (1..20).asFlow().async(dispatcher) { - //println("Started $it on ${Thread.currentThread().name}") + println("Started $it on ${Thread.currentThread().name}") @Suppress("BlockingMethodInNonBlockingContext") Thread.sleep(200) it diff --git a/kmath-memory/src/commonMain/kotlin/scientifik/memory/MemorySpec.kt b/kmath-memory/src/commonMain/kotlin/scientifik/memory/MemorySpec.kt index e6da316cf..0896f0dcb 100644 --- a/kmath-memory/src/commonMain/kotlin/scientifik/memory/MemorySpec.kt +++ b/kmath-memory/src/commonMain/kotlin/scientifik/memory/MemorySpec.kt @@ -1,7 +1,5 @@ package scientifik.memory -import kotlin.reflect.KClass - /** * A specification to read or write custom objects with fixed size in bytes */ @@ -27,7 +25,7 @@ inline fun MemoryReader.readArray(spec: MemorySpec, offset: fun MemoryWriter.writeArray(spec: MemorySpec, offset: Int, array: Array) { spec.run { - for (i in 0 until array.size) { + for (i in array.indices) { write(offset + i * objectSize, array[i]) } } diff --git a/kmath-prob/build.gradle.kts b/kmath-prob/build.gradle.kts index 3294469ab..59b25d340 100644 --- a/kmath-prob/build.gradle.kts +++ b/kmath-prob/build.gradle.kts @@ -1,30 +1,11 @@ plugins { id("scientifik.mpp") - id("kotlinx-atomicfu") } kotlin.sourceSets { commonMain { dependencies { api(project(":kmath-coroutines")) - compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Scientifik.atomicfuVersion}") } } - jvmMain { - dependencies { - // https://mvnrepository.com/artifact/org.apache.commons/commons-rng-simple - //api("org.apache.commons:commons-rng-sampling:1.2") - compileOnly("org.jetbrains.kotlinx:atomicfu:${Scientifik.atomicfuVersion}") - } - } - jsMain { - dependencies { - compileOnly("org.jetbrains.kotlinx:atomicfu-js:${Scientifik.atomicfuVersion}") - } - } - -} - -atomicfu { - variant = "VH" -} +} \ No newline at end of file diff --git a/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomChain.kt b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomChain.kt index d65b9530a..9b581afd7 100644 --- a/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomChain.kt +++ b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomChain.kt @@ -1,6 +1,5 @@ package scientifik.kmath.prob -import kotlinx.atomicfu.atomic import scientifik.kmath.chains.Chain /** @@ -10,4 +9,7 @@ class RandomChain(val generator: RandomGenerator, private val gen: suspen override suspend fun next(): R = generator.gen() override fun fork(): Chain = RandomChain(generator.fork(), gen) -} \ No newline at end of file +} + +fun RandomGenerator.chain(gen: suspend RandomGenerator.() -> R): RandomChain = RandomChain(this, gen) +fun RandomGenerator.flow(gen: suspend RandomGenerator.() -> R) = chain(gen).fork() \ No newline at end of file diff --git a/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Statistic.kt b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Statistic.kt new file mode 100644 index 000000000..b02eb598b --- /dev/null +++ b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Statistic.kt @@ -0,0 +1,88 @@ +package scientifik.kmath.prob + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.scanReduce +import scientifik.kmath.coroutines.mapParallel +import scientifik.kmath.operations.* +import scientifik.kmath.structures.Buffer +import scientifik.kmath.structures.asIterable + +/** + * A function, that transforms a buffer of random quantities to some resulting value + */ +interface Statistic { + suspend operator fun invoke(data: Buffer): R +} + +/** + * A statistic tha could be computed separately on different blocks of data and then composed + */ +interface ComposableStatistic : Statistic { + suspend fun computeIntermediate(data: Buffer): I + suspend fun composeIntermediate(first: I, second: I): I + suspend fun toResult(intermediate: I): R + + override suspend fun invoke(data: Buffer): R = toResult(computeIntermediate(data)) +} + +@FlowPreview +@ExperimentalCoroutinesApi +fun ComposableStatistic.flowIntermediate( + flow: Flow>, + dispatcher: CoroutineDispatcher = Dispatchers.Default +): Flow = flow + .mapParallel(dispatcher) { computeIntermediate(it) } + .scanReduce(::composeIntermediate) + + +/** + * Perform a streaming statistical analysis on a chunked data. The computation of inner representation is done in parallel + * if [dispatcher] allows it. + * + * The resulting flow contains values that include the whole previous statistics, not only the last chunk. + */ +@FlowPreview +@ExperimentalCoroutinesApi +fun ComposableStatistic.flow( + flow: Flow>, + dispatcher: CoroutineDispatcher = Dispatchers.Default +): Flow = flowIntermediate(flow,dispatcher).map(::toResult) + +/** + * Arithmetic mean + */ +class Mean(val space: Space) : ComposableStatistic, T> { + override suspend fun computeIntermediate(data: Buffer): Pair = + space.run { sum(data.asIterable()) } to data.size + + override suspend fun composeIntermediate(first: Pair, second: Pair): Pair = + space.run { first.first + second.first } to (first.second + second.second) + + override suspend fun toResult(intermediate: Pair): T = + space.run { intermediate.first / intermediate.second } + + companion object { + //TODO replace with optimized version which respects overflow + val real = Mean(RealField) + val int = Mean(IntRing) + val long = Mean(LongRing) + } +} + +/** + * Non-composable median + */ +class Median(comparator: Comparator) : Statistic { + override suspend fun invoke(data: Buffer): T { + return data.asIterable().toList()[data.size / 2] //TODO check if this is correct + } + + companion object { + val real = Median(Comparator { a: Double, b: Double -> a.compareTo(b) }) + } +} \ No newline at end of file diff --git a/kmath-prob/src/jvmTest/kotlin/scientifik/kmath/prob/StatisticTest.kt b/kmath-prob/src/jvmTest/kotlin/scientifik/kmath/prob/StatisticTest.kt new file mode 100644 index 000000000..8b46cac52 --- /dev/null +++ b/kmath-prob/src/jvmTest/kotlin/scientifik/kmath/prob/StatisticTest.kt @@ -0,0 +1,28 @@ +package scientifik.kmath.prob + +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.runBlocking +import scientifik.kmath.chains.flow +import scientifik.kmath.streaming.chunked +import kotlin.test.Test + +class StatisticTest { + //create a random number generator. + val generator = DefaultGenerator(1) + //Create a stateless chain from generator. + val data = generator.chain { nextDouble() } + //Convert a chaint to Flow and break it into chunks. + val chunked = data.flow().chunked(1000) + + @Test + fun testParallelMean() { + runBlocking { + val average = Mean.real + .flow(chunked) //create a flow with results + .drop(99) // Skip first 99 values and use one with total data + .first() //get 1e5 data samples average + println(average) + } + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index f7ef0fc42..042558d25 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,4 +1,12 @@ pluginManagement { + + plugins { + id("scientifik.mpp") version "0.1.6" + id("scientifik.jvm") version "0.1.6" + id("scientifik.atomic") version "0.1.6" + id("scientifik.publish") version "0.1.6" + } + repositories { mavenLocal() jcenter() @@ -7,13 +15,10 @@ pluginManagement { maven("https://dl.bintray.com/mipt-npm/scientifik") maven("https://dl.bintray.com/kotlin/kotlinx") } + resolutionStrategy { eachPlugin { when (requested.id.id) { - "kotlinx-atomicfu" -> useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:${requested.version}") - "kotlin-multiplatform" -> useModule("org.jetbrains.kotlin:kotlin-gradle-plugin:${requested.version}") - "kotlin2js" -> useModule("org.jetbrains.kotlin:kotlin-gradle-plugin:${requested.version}") - "org.jetbrains.kotlin.frontend" -> useModule("org.jetbrains.kotlin:kotlin-frontend-plugin:0.0.45") "scientifik.mpp", "scientifik.publish" -> useModule("scientifik:gradle-tools:${requested.version}") } }