From 2e7607371222ea5361f12d0b19a052e4b2099571 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Fri, 7 Jun 2019 15:08:05 +0300 Subject: [PATCH] Initial work on distributions --- .../kotlin/scientifik/kmath/chains/Chain.kt | 32 +++++------ .../kmath/{ => coroutines}/CoroutinesExtra.kt | 17 +++--- .../scientifik/kmath/streaming/BufferFlow.kt | 4 +- .../scientifik/kmath/chains/ChainExt.kt | 2 +- .../kmath/structures/LazyNDStructure.kt | 2 +- .../kmath/streaming/BufferFlowTest.kt | 6 +-- kmath-prob/build.gradle.kts | 3 +- .../scientifik/kmath/prob/Distribution.kt | 28 ++++++++++ .../scientifik/kmath/prob/RandomGenerator.kt | 11 ++++ .../kmath/prob/CommonDistributions.kt | 53 +++++++++++++++++++ .../prob/CommonsRandomProviderWrapper.kt | 18 +++++++ 11 files changed, 144 insertions(+), 32 deletions(-) rename kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/{ => coroutines}/CoroutinesExtra.kt (90%) create mode 100644 kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Distribution.kt create mode 100644 kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomGenerator.kt create mode 100644 kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonDistributions.kt create mode 100644 kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonsRandomProviderWrapper.kt diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt index 4d2f604e1..4adc2a9a7 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt @@ -18,6 +18,7 @@ package scientifik.kmath.chains import kotlinx.atomicfu.atomic import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow /** @@ -28,7 +29,7 @@ interface Chain { /** * Last cached value of the chain. Returns null if [next] was not called */ - val value: R? + fun peek(): R? /** * Generate next value, changing state if needed @@ -46,13 +47,12 @@ interface Chain { * Chain as a coroutine flow. The flow emit affects chain state and vice versa */ @FlowPreview -val Chain.flow +val Chain.flow: Flow get() = kotlinx.coroutines.flow.flow { while (true) emit(next()) } fun Iterator.asChain(): Chain = SimpleChain { next() } fun Sequence.asChain(): Chain = iterator().asChain() - /** * Map the chain result using suspended transformation. Initial chain result can no longer be safely consumed * since mapped chain consumes tokens. Accepts regular transformation function @@ -60,7 +60,7 @@ fun Sequence.asChain(): Chain = iterator().asChain() fun Chain.map(func: (T) -> R): Chain { val parent = this; return object : Chain { - override val value: R? get() = parent.value?.let(func) + override fun peek(): R? = parent.peek()?.let(func) override suspend fun next(): R { return func(parent.next()) @@ -77,7 +77,7 @@ fun Chain.map(func: (T) -> R): Chain { */ class SimpleChain(private val gen: suspend () -> R) : Chain { private val atomicValue = atomic(null) - override val value: R? get() = atomicValue.value + override fun peek(): R? = atomicValue.value override suspend fun next(): R = gen().also { atomicValue.lazySet(it) } @@ -95,16 +95,16 @@ class MarkovChain(private val seed: () -> R, private val gen: suspe constructor(seed: R, gen: suspend (R) -> R) : this({ seed }, gen) private val atomicValue = atomic(null) - override val value: R get() = atomicValue.value ?: seed() + override fun peek(): R = atomicValue.value ?: seed() override suspend fun next(): R { - val newValue = gen(value) + val newValue = gen(peek()) atomicValue.lazySet(newValue) - return value + return peek() } override fun fork(): Chain { - return MarkovChain(value, gen) + return MarkovChain(peek(), gen) } } @@ -121,12 +121,12 @@ class StatefulChain( constructor(state: S, seed: R, gen: suspend S.(R) -> R) : this(state, { seed }, gen) private val atomicValue = atomic(null) - override val value: R get() = atomicValue.value ?: seed(state) + override fun peek(): R = atomicValue.value ?: seed(state) override suspend fun next(): R { - val newValue = gen(state, value) + val newValue = gen(state, peek()) atomicValue.lazySet(newValue) - return value + return peek() } override fun fork(): Chain { @@ -137,10 +137,10 @@ class StatefulChain( /** * A chain that repeats the same value */ -class ConstantChain(override val value: T) : Chain { - override suspend fun next(): T { - return value - } +class ConstantChain(val value: T) : Chain { + override fun peek(): T? = value + + override suspend fun next(): T = value override fun fork(): Chain { return this diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/CoroutinesExtra.kt similarity index 90% rename from kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt rename to kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/CoroutinesExtra.kt index 51cc07511..dc82d1881 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/coroutines/CoroutinesExtra.kt @@ -1,4 +1,4 @@ -package scientifik.kmath +package scientifik.kmath.coroutines import kotlinx.coroutines.* import kotlinx.coroutines.channels.produce @@ -42,13 +42,14 @@ fun Flow.async( } @FlowPreview -fun AsyncFlow.map(action: (T) -> R) = AsyncFlow(deferredFlow.map { input -> - //TODO add function composition - LazyDeferred(input.dispatcher) { - input.start(this) - action(input.await()) - } -}) +fun AsyncFlow.map(action: (T) -> R) = + AsyncFlow(deferredFlow.map { input -> + //TODO add function composition + LazyDeferred(input.dispatcher) { + input.start(this) + action(input.await()) + } + }) @ExperimentalCoroutinesApi @FlowPreview diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt index d5e94fc3b..85ce73c4b 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt @@ -22,7 +22,7 @@ fun Flow>.spread(): Flow = flatMapConcat { it.asFlow() } * Collect incoming flow into fixed size chunks */ @FlowPreview -fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory) = flow { +fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory): Flow> = flow { require(bufferSize > 0) { "Resulting chunk size must be more than zero" } val list = ArrayList(bufferSize) var counter = 0 @@ -46,7 +46,7 @@ fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory) = flow * Specialized flow chunker for real buffer */ @FlowPreview -fun Flow.chunked(bufferSize: Int) = flow { +fun Flow.chunked(bufferSize: Int): Flow = flow { require(bufferSize > 0) { "Resulting chunk size must be more than zero" } val array = DoubleArray(bufferSize) var counter = 0 diff --git a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt index 4f2c4cb8b..b634c5ae2 100644 --- a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt +++ b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt @@ -27,7 +27,7 @@ fun Chain.asSequence(): Sequence = object : Sequence { fun Chain.map(func: suspend (T) -> R): Chain { val parent = this; return object : Chain { - override val value: R? get() = runBlocking { parent.value?.let { func(it) } } + override fun peek(): R? = runBlocking { parent.peek()?.let { func(it) } } override suspend fun next(): R { return func(parent.next()) diff --git a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt index fa81fc67b..784b7cd10 100644 --- a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt +++ b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt @@ -1,7 +1,7 @@ package scientifik.kmath.structures import kotlinx.coroutines.* -import scientifik.kmath.Math +import scientifik.kmath.coroutines.Math class LazyNDStructure( val scope: CoroutineScope, diff --git a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt index 5b65c2120..4fd397993 100644 --- a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt +++ b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt @@ -4,9 +4,9 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.collect import org.junit.Test -import scientifik.kmath.async -import scientifik.kmath.collect -import scientifik.kmath.map +import scientifik.kmath.coroutines.async +import scientifik.kmath.coroutines.collect +import scientifik.kmath.coroutines.map import java.util.concurrent.Executors diff --git a/kmath-prob/build.gradle.kts b/kmath-prob/build.gradle.kts index 28ab2d6c7..77a4aefc8 100644 --- a/kmath-prob/build.gradle.kts +++ b/kmath-prob/build.gradle.kts @@ -6,13 +6,14 @@ plugins { kotlin.sourceSets { commonMain { dependencies { - api(project(":kmath-core")) api(project(":kmath-coroutines")) compileOnly("org.jetbrains.kotlinx:atomicfu-common:${Versions.atomicfuVersion}") } } jvmMain { dependencies { + // https://mvnrepository.com/artifact/org.apache.commons/commons-rng-simple + api("org.apache.commons:commons-rng-sampling:1.2") compileOnly("org.jetbrains.kotlinx:atomicfu:${Versions.atomicfuVersion}") } } diff --git a/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Distribution.kt b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Distribution.kt new file mode 100644 index 000000000..946b173ff --- /dev/null +++ b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/Distribution.kt @@ -0,0 +1,28 @@ +package scientifik.kmath.prob + +import scientifik.kmath.chains.Chain + +/** + * A distribution of typed objects + */ +interface Distribution { + /** + * A probability value for given argument [arg]. + * For continuous distributions returns PDF + */ + fun probability(arg: T): Double + + /** + * Create a chain of samples from this distribution. + * The chain is not guaranteed to be stateless. + */ + fun sample(generator: RandomGenerator): Chain + //TODO add sample bunch generator +} + +interface UnivariateDistribution> : Distribution { + /** + * Cumulative distribution for ordered parameter + */ + fun cumulative(arg: T): Double +} \ No newline at end of file diff --git a/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomGenerator.kt b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomGenerator.kt new file mode 100644 index 000000000..89cbc615c --- /dev/null +++ b/kmath-prob/src/commonMain/kotlin/scientifik/kmath/prob/RandomGenerator.kt @@ -0,0 +1,11 @@ +package scientifik.kmath.prob + +/** + * A basic generator + */ +interface RandomGenerator { + fun nextDouble(): Double + fun nextInt(): Int + fun nextLong(): Long + fun nextBlock(size: Int): ByteArray +} \ No newline at end of file diff --git a/kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonDistributions.kt b/kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonDistributions.kt new file mode 100644 index 000000000..c08008c28 --- /dev/null +++ b/kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonDistributions.kt @@ -0,0 +1,53 @@ +package scientifik.kmath.prob + +import org.apache.commons.rng.sampling.distribution.* +import scientifik.kmath.chains.Chain +import scientifik.kmath.chains.SimpleChain +import kotlin.math.PI +import kotlin.math.exp +import kotlin.math.sqrt + +class NormalDistribution(val mean: Double, val sigma: Double) : UnivariateDistribution { + enum class Sampler { + BoxMuller, + Marsaglia, + Ziggurat + } + + override fun probability(arg: Double): Double { + val d = (arg - mean) / sigma + return 1.0 / sqrt(2.0 * PI * sigma) * exp(-d * d / 2) + } + + override fun cumulative(arg: Double): Double { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + fun sample(generator: RandomGenerator, sampler: Sampler): Chain { + val normalized = when (sampler) { + Sampler.BoxMuller -> BoxMullerNormalizedGaussianSampler(generator.asProvider()) + Sampler.Marsaglia -> MarsagliaNormalizedGaussianSampler(generator.asProvider()) + Sampler.Ziggurat -> ZigguratNormalizedGaussianSampler(generator.asProvider()) + } + val gauss = GaussianSampler(normalized, mean, sigma) + //TODO add generator to chain state to allow stateful forks + return SimpleChain { gauss.sample() } + } + + override fun sample(generator: RandomGenerator): Chain = sample(generator, Sampler.BoxMuller) +} + +class PoissonDistribution(val mean: Double): UnivariateDistribution{ + override fun probability(arg: Int): Double { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun cumulative(arg: Int): Double { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun sample(generator: RandomGenerator): Chain { + val sampler = PoissonSampler(generator.asProvider(), mean) + return SimpleChain{sampler.sample()} + } +} \ No newline at end of file diff --git a/kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonsRandomProviderWrapper.kt b/kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonsRandomProviderWrapper.kt new file mode 100644 index 000000000..ad408460d --- /dev/null +++ b/kmath-prob/src/jvmMain/kotlin/scientifik/kmath/prob/CommonsRandomProviderWrapper.kt @@ -0,0 +1,18 @@ +package scientifik.kmath.prob + +import org.apache.commons.rng.UniformRandomProvider + +inline class CommonsRandomProviderWrapper(val provider: UniformRandomProvider) : RandomGenerator { + override fun nextDouble(): Double = provider.nextDouble() + + override fun nextInt(): Int = provider.nextInt() + + override fun nextLong(): Long = provider.nextLong() + + override fun nextBlock(size: Int): ByteArray = ByteArray(size).also { provider.nextBytes(it) } +} + +fun UniformRandomProvider.asGenerator(): RandomGenerator = CommonsRandomProviderWrapper(this) + +fun RandomGenerator.asProvider(): UniformRandomProvider = + (this as? CommonsRandomProviderWrapper)?.provider ?: TODO("implement reverse wrapper") \ No newline at end of file