Tools to evaluate statistic

This commit is contained in:
Alexander Nozik 2019-08-25 10:51:25 +03:00
parent 645d81abf0
commit 92b4e6f28c
19 changed files with 191 additions and 81 deletions

View File

@ -1,10 +1,8 @@
plugins {
id("scientifik.mpp") version "0.1.4" apply false
id("scientifik.publish") version "0.1.4" apply false
id("kotlinx-atomicfu") version "0.12.9" apply false
id("scientifik.publish") version "0.1.6" apply false
}
val kmathVersion by extra("0.1.4-dev-1")
val kmathVersion by extra("0.1.4-dev")
val bintrayRepo by extra("scientifik")
val githubProject by extra("kmath")

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.5.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

4
gradlew vendored
View File

@ -125,8 +125,8 @@ if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

View File

@ -10,29 +10,32 @@ import kotlin.jvm.JvmName
* @param R type of resulting iterable
* @param initial lazy evaluated
*/
fun <T, R> Iterator<T>.cumulative(initial: R, operation: (T, R) -> R): Iterator<R> = object : Iterator<R> {
fun <T, R> Iterator<T>.cumulative(initial: R, operation: (R, T) -> R): Iterator<R> = object : Iterator<R> {
var state: R = initial
override fun hasNext(): Boolean = this@cumulative.hasNext()
override fun next(): R {
state = operation.invoke(this@cumulative.next(), state)
state = operation(state, this@cumulative.next())
return state
}
}
fun <T, R> Iterable<T>.cumulative(initial: R, operation: (T, R) -> R): Iterable<R> = object : Iterable<R> {
fun <T, R> Iterable<T>.cumulative(initial: R, operation: (R, T) -> R): Iterable<R> = object : Iterable<R> {
override fun iterator(): Iterator<R> = this@cumulative.iterator().cumulative(initial, operation)
}
fun <T, R> Sequence<T>.cumulative(initial: R, operation: (T, R) -> R): Sequence<R> = object : Sequence<R> {
fun <T, R> Sequence<T>.cumulative(initial: R, operation: (R, T) -> R): Sequence<R> = object : Sequence<R> {
override fun iterator(): Iterator<R> = this@cumulative.iterator().cumulative(initial, operation)
}
fun <T, R> List<T>.cumulative(initial: R, operation: (T, R) -> R): List<R> =
fun <T, R> List<T>.cumulative(initial: R, operation: (R, T) -> R): List<R> =
this.iterator().cumulative(initial, operation).asSequence().toList()
//Cumulative sum
/**
* Cumulative sum with custom space
*/
fun <T> Iterable<T>.cumulativeSum(space: Space<T>) = with(space) {
cumulative(zero) { element: T, sum: T -> sum + element }
}

View File

@ -1,7 +1,8 @@
package scientifik.kmath.operations
package scientifik.kmath.coroutines
import scientifik.kmath.operations.RealField
import scientifik.kmath.operations.SpaceOperations
import kotlin.jvm.JvmName
import kotlin.reflect.KClass
/**
* A suspendable univariate function defined in algebraic context
@ -28,7 +29,9 @@ suspend fun MFunction<Double, RealField>.invoke(args: DoubleArray) = RealField.i
@JvmName("varargInvoke")
suspend fun MFunction<Double, RealField>.invoke(vararg args: Double) = RealField.invoke(*args.toTypedArray())
/**
* A suspendable univariate function with parameter
*/
interface ParametricUFunction<T, P, C : SpaceOperations<T>> {
suspend operator fun C.invoke(arg: T, parameter: P): T
}
}

View File

@ -1,7 +1,4 @@
package scientifik.kmath.operations
import scientifik.kmath.structures.Buffer
import scientifik.kmath.structures.asSequence
fun <T> Space<T>.sum(data : Iterable<T>): T = data.fold(zero) { left, right -> add(left,right) }
fun <T> Space<T>.sum(data : Sequence<T>): T = data.fold(zero) { left, right -> add(left, right) }

View File

@ -69,9 +69,9 @@ interface Buffer<T> {
}
}
fun <T> Buffer<T>.asSequence(): Sequence<T> = iterator().asSequence()
fun <T> Buffer<T>.asSequence(): Sequence<T> = Sequence(::iterator)
fun <T> Buffer<T>.asIterable(): Iterable<T> = iterator().asSequence().asIterable()
fun <T> Buffer<T>.asIterable(): Iterable<T> = asSequence().asIterable()
interface MutableBuffer<T> : Buffer<T> {
operator fun set(index: Int, value: T)

View File

@ -1,5 +1,6 @@
plugins {
id("scientifik.mpp")
//id("scientifik.atomic")
}
kotlin.sourceSets {
@ -7,19 +8,16 @@ kotlin.sourceSets {
dependencies {
api(project(":kmath-core"))
api("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:${Scientifik.coroutinesVersion}")
compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Scientifik.atomicfuVersion}")
}
}
jvmMain {
dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:${Scientifik.coroutinesVersion}")
compileOnly("org.jetbrains.kotlinx:atomicfu:${Scientifik.atomicfuVersion}")
}
}
jsMain {
dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core-js:${Scientifik.coroutinesVersion}")
compileOnly("org.jetbrains.kotlinx:atomicfu-js:${Scientifik.atomicfuVersion}")
}
}
}

View File

@ -16,9 +16,6 @@
package scientifik.kmath.chains
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.updateAndGet
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -46,9 +43,7 @@ interface Chain<out R> {
/**
* Chain as a coroutine flow. The flow emit affects chain state and vice versa
*/
@FlowPreview
val <R> Chain<R>.flow: Flow<R>
get() = kotlinx.coroutines.flow.flow { while (true) emit(next()) }
fun <R> Chain<R>.flow(): Flow<R> = kotlinx.coroutines.flow.flow { while (true) emit(next()) }
fun <T> Iterator<T>.asChain(): Chain<T> = SimpleChain { next() }
fun <T> Sequence<T>.asChain(): Chain<T> = iterator().asChain()
@ -66,8 +61,6 @@ class SimpleChain<out R>(private val gen: suspend () -> R) : Chain<R> {
*/
class MarkovChain<out R : Any>(private val seed: suspend () -> R, private val gen: suspend (R) -> R) : Chain<R> {
//constructor(seedValue: R, gen: suspend (R) -> R) : this({ seedValue }, gen)
private val mutex = Mutex()
private var value: R? = null
@ -97,12 +90,6 @@ class StatefulChain<S, out R>(
private val gen: suspend S.(R) -> R
) : Chain<R> {
// constructor(state: S, seedValue: R, forkState: ((S) -> S), gen: suspend S.(R) -> R) : this(
// state,
// { seedValue },
// forkState,
// gen
// )
private val mutex = Mutex()
private var value: R? = null

View File

@ -0,0 +1,27 @@
package scientifik.kmath.chains
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.scanReduce
import scientifik.kmath.operations.Space
import scientifik.kmath.operations.SpaceOperations
@ExperimentalCoroutinesApi
fun <T> Flow<T>.cumulativeSum(space: SpaceOperations<T>): Flow<T> = with(space) {
scanReduce { sum: T, element: T -> sum + element }
}
@ExperimentalCoroutinesApi
fun <T> Flow<T>.mean(space: Space<T>): Flow<T> = with(space) {
class Accumulator(var sum: T, var num: Int)
scan(Accumulator(zero, 0)) { sum, element ->
sum.apply {
this.sum += element
this.num += 1
}
}.map { it.sum / it.num }
}

View File

@ -21,8 +21,8 @@ internal class LazyDeferred<T>(val dispatcher: CoroutineDispatcher, val block: s
suspend fun await(): T = deferred?.await() ?: error("Coroutine not started")
}
@FlowPreview
class AsyncFlow<T> internal constructor(internal val deferredFlow: Flow<LazyDeferred<T>>) : Flow<T> {
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>) {
deferredFlow.collect {
collector.emit((it.await()))
@ -88,14 +88,13 @@ suspend fun <T> AsyncFlow<T>.collect(concurrency: Int, action: suspend (value: T
})
}
@ExperimentalCoroutinesApi
@FlowPreview
fun <T, R> Flow<T>.map(
dispatcher: CoroutineDispatcher,
concurrencyLevel: Int = 16,
bufferSize: Int = concurrencyLevel,
fun <T, R> Flow<T>.mapParallel(
dispatcher: CoroutineDispatcher = Dispatchers.Default,
transform: suspend (T) -> R
): Flow<R> {
return flatMapMerge(concurrencyLevel, bufferSize) { value ->
return flatMapMerge{ value ->
flow { emit(transform(value)) }
}.flowOn(dispatcher)
}

View File

@ -9,7 +9,6 @@ import scientifik.kmath.structures.DoubleBuffer
/**
* Create a [Flow] from buffer
*/
@FlowPreview
fun <T> Buffer<T>.asFlow() = iterator().asFlow()
/**
@ -21,7 +20,6 @@ fun <T> Flow<Buffer<out T>>.spread(): Flow<T> = flatMapConcat { it.asFlow() }
/**
* Collect incoming flow into fixed size chunks
*/
@FlowPreview
fun <T> Flow<T>.chunked(bufferSize: Int, bufferFactory: BufferFactory<T>): Flow<Buffer<T>> = flow {
require(bufferSize > 0) { "Resulting chunk size must be more than zero" }
val list = ArrayList<T>(bufferSize)
@ -45,7 +43,6 @@ fun <T> Flow<T>.chunked(bufferSize: Int, bufferFactory: BufferFactory<T>): Flow<
/**
* Specialized flow chunker for real buffer
*/
@FlowPreview
fun Flow<Double>.chunked(bufferSize: Int): Flow<DoubleBuffer> = flow {
require(bufferSize > 0) { "Resulting chunk size must be more than zero" }
val array = DoubleArray(bufferSize)
@ -69,7 +66,6 @@ fun Flow<Double>.chunked(bufferSize: Int): Flow<DoubleBuffer> = flow {
* Map a flow to a moving window buffer. The window step is one.
* In order to get different steps, one could use skip operation.
*/
@FlowPreview
fun <T> Flow<T>.windowed(window: Int): Flow<Buffer<T>> = flow {
require(window > 1) { "Window size must be more than one" }
val ringBuffer = RingBuffer.boxing<T>(window)

View File

@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.collect
import org.junit.Test
import scientifik.kmath.coroutines.async
import scientifik.kmath.coroutines.collect
import scientifik.kmath.coroutines.map
import scientifik.kmath.coroutines.mapParallel
import java.util.concurrent.Executors
@ -20,8 +20,8 @@ class BufferFlowTest {
@Test(timeout = 2000)
fun map() {
runBlocking {
(1..20).asFlow().map( dispatcher) {
//println("Started $it on ${Thread.currentThread().name}")
(1..20).asFlow().mapParallel( dispatcher) {
println("Started $it on ${Thread.currentThread().name}")
@Suppress("BlockingMethodInNonBlockingContext")
Thread.sleep(200)
it
@ -35,7 +35,7 @@ class BufferFlowTest {
fun async() {
runBlocking {
(1..20).asFlow().async(dispatcher) {
//println("Started $it on ${Thread.currentThread().name}")
println("Started $it on ${Thread.currentThread().name}")
@Suppress("BlockingMethodInNonBlockingContext")
Thread.sleep(200)
it

View File

@ -1,7 +1,5 @@
package scientifik.memory
import kotlin.reflect.KClass
/**
* A specification to read or write custom objects with fixed size in bytes
*/
@ -27,7 +25,7 @@ inline fun <reified T : Any> MemoryReader.readArray(spec: MemorySpec<T>, offset:
fun <T : Any> MemoryWriter.writeArray(spec: MemorySpec<T>, offset: Int, array: Array<T>) {
spec.run {
for (i in 0 until array.size) {
for (i in array.indices) {
write(offset + i * objectSize, array[i])
}
}

View File

@ -1,30 +1,11 @@
plugins {
id("scientifik.mpp")
id("kotlinx-atomicfu")
}
kotlin.sourceSets {
commonMain {
dependencies {
api(project(":kmath-coroutines"))
compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Scientifik.atomicfuVersion}")
}
}
jvmMain {
dependencies {
// https://mvnrepository.com/artifact/org.apache.commons/commons-rng-simple
//api("org.apache.commons:commons-rng-sampling:1.2")
compileOnly("org.jetbrains.kotlinx:atomicfu:${Scientifik.atomicfuVersion}")
}
}
jsMain {
dependencies {
compileOnly("org.jetbrains.kotlinx:atomicfu-js:${Scientifik.atomicfuVersion}")
}
}
}
atomicfu {
variant = "VH"
}
}

View File

@ -1,6 +1,5 @@
package scientifik.kmath.prob
import kotlinx.atomicfu.atomic
import scientifik.kmath.chains.Chain
/**
@ -10,4 +9,7 @@ class RandomChain<out R>(val generator: RandomGenerator, private val gen: suspen
override suspend fun next(): R = generator.gen()
override fun fork(): Chain<R> = RandomChain(generator.fork(), gen)
}
}
fun <R> RandomGenerator.chain(gen: suspend RandomGenerator.() -> R): RandomChain<R> = RandomChain(this, gen)
fun <R> RandomGenerator.flow(gen: suspend RandomGenerator.() -> R) = chain(gen).fork()

View File

@ -0,0 +1,88 @@
package scientifik.kmath.prob
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.scanReduce
import scientifik.kmath.coroutines.mapParallel
import scientifik.kmath.operations.*
import scientifik.kmath.structures.Buffer
import scientifik.kmath.structures.asIterable
/**
* A function, that transforms a buffer of random quantities to some resulting value
*/
interface Statistic<T, R> {
suspend operator fun invoke(data: Buffer<T>): R
}
/**
* A statistic tha could be computed separately on different blocks of data and then composed
*/
interface ComposableStatistic<T, I, R> : Statistic<T, R> {
suspend fun computeIntermediate(data: Buffer<T>): I
suspend fun composeIntermediate(first: I, second: I): I
suspend fun toResult(intermediate: I): R
override suspend fun invoke(data: Buffer<T>): R = toResult(computeIntermediate(data))
}
@FlowPreview
@ExperimentalCoroutinesApi
fun <T, I, R> ComposableStatistic<T, I, R>.flowIntermediate(
flow: Flow<Buffer<T>>,
dispatcher: CoroutineDispatcher = Dispatchers.Default
): Flow<I> = flow
.mapParallel(dispatcher) { computeIntermediate(it) }
.scanReduce(::composeIntermediate)
/**
* Perform a streaming statistical analysis on a chunked data. The computation of inner representation is done in parallel
* if [dispatcher] allows it.
*
* The resulting flow contains values that include the whole previous statistics, not only the last chunk.
*/
@FlowPreview
@ExperimentalCoroutinesApi
fun <T, I, R> ComposableStatistic<T, I, R>.flow(
flow: Flow<Buffer<T>>,
dispatcher: CoroutineDispatcher = Dispatchers.Default
): Flow<R> = flowIntermediate(flow,dispatcher).map(::toResult)
/**
* Arithmetic mean
*/
class Mean<T>(val space: Space<T>) : ComposableStatistic<T, Pair<T, Int>, T> {
override suspend fun computeIntermediate(data: Buffer<T>): Pair<T, Int> =
space.run { sum(data.asIterable()) } to data.size
override suspend fun composeIntermediate(first: Pair<T, Int>, second: Pair<T, Int>): Pair<T, Int> =
space.run { first.first + second.first } to (first.second + second.second)
override suspend fun toResult(intermediate: Pair<T, Int>): T =
space.run { intermediate.first / intermediate.second }
companion object {
//TODO replace with optimized version which respects overflow
val real = Mean(RealField)
val int = Mean(IntRing)
val long = Mean(LongRing)
}
}
/**
* Non-composable median
*/
class Median<T>(comparator: Comparator<T>) : Statistic<T, T> {
override suspend fun invoke(data: Buffer<T>): T {
return data.asIterable().toList()[data.size / 2] //TODO check if this is correct
}
companion object {
val real = Median(Comparator { a: Double, b: Double -> a.compareTo(b) })
}
}

View File

@ -0,0 +1,28 @@
package scientifik.kmath.prob
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import scientifik.kmath.chains.flow
import scientifik.kmath.streaming.chunked
import kotlin.test.Test
class StatisticTest {
//create a random number generator.
val generator = DefaultGenerator(1)
//Create a stateless chain from generator.
val data = generator.chain { nextDouble() }
//Convert a chaint to Flow and break it into chunks.
val chunked = data.flow().chunked(1000)
@Test
fun testParallelMean() {
runBlocking {
val average = Mean.real
.flow(chunked) //create a flow with results
.drop(99) // Skip first 99 values and use one with total data
.first() //get 1e5 data samples average
println(average)
}
}
}

View File

@ -1,4 +1,12 @@
pluginManagement {
plugins {
id("scientifik.mpp") version "0.1.6"
id("scientifik.jvm") version "0.1.6"
id("scientifik.atomic") version "0.1.6"
id("scientifik.publish") version "0.1.6"
}
repositories {
mavenLocal()
jcenter()
@ -7,13 +15,10 @@ pluginManagement {
maven("https://dl.bintray.com/mipt-npm/scientifik")
maven("https://dl.bintray.com/kotlin/kotlinx")
}
resolutionStrategy {
eachPlugin {
when (requested.id.id) {
"kotlinx-atomicfu" -> useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:${requested.version}")
"kotlin-multiplatform" -> useModule("org.jetbrains.kotlin:kotlin-gradle-plugin:${requested.version}")
"kotlin2js" -> useModule("org.jetbrains.kotlin:kotlin-gradle-plugin:${requested.version}")
"org.jetbrains.kotlin.frontend" -> useModule("org.jetbrains.kotlin:kotlin-frontend-plugin:0.0.45")
"scientifik.mpp", "scientifik.publish" -> useModule("scientifik:gradle-tools:${requested.version}")
}
}