Sequential operations

This commit is contained in:
Alexander Nozik 2019-02-02 18:16:25 +03:00
parent a3e8ffa147
commit 1e99e89c4c
9 changed files with 321 additions and 17 deletions

View File

@ -26,7 +26,7 @@ allprojects {
apply(plugin = "com.jfrog.artifactory")
group = "scientifik"
version = "0.0.3-dev-4"
version = "0.0.3-dev-5"
repositories {
@ -44,7 +44,6 @@ allprojects {
targets.all {
sourceSets.all {
languageSettings.progressiveMode = true

View File

@ -1,25 +1,13 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
kotlin {
jvm {
compilations.all {
kotlinOptions {
jvmTarget = "1.8"
jvm ()
sourceSets {
all {
languageSettings.progressiveMode = true
val commonMain by getting {
dependencies {

View File

@ -0,0 +1,51 @@
plugins {
kotlin {
jvm ()
sourceSets {
val commonMain by getting {
dependencies {
val commonTest by getting {
dependencies {
val jvmMain by getting {
dependencies {
val jvmTest by getting {
dependencies {
// val jsMain by getting {
// dependencies {
// api(kotlin("stdlib-js"))
// }
// }
val jsTest by getting {
dependencies {
// mingwMain {
// }
// mingwTest {
// }

View File

@ -0,0 +1,35 @@
package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.getAndUpdate
import scientifik.kmath.operations.Space
* An object with a state that accumulates incoming elements
interface Accumulator<in T> {
//PENDING use suspend operations?
fun push(value: T)
fun <T> Accumulator<T>.pushAll(values: Iterable<T>) {
values.forEach { push(it) }
* Generic thread-safe summator
class GenericSum<T : Any>(val context: Space<T>) : Accumulator<T> {
//TODO add guard against overflow
val counter = atomic(0)
val sum = atomic(
val value get() = with(context) { sum.value / counter.value }
override fun push(value: T) {
with(context) {
sum.getAndUpdate { it + value }

View File

@ -0,0 +1,149 @@
* Copyright 2018 Alexander Nozik.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package scientifik.kmath.sequential
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.isActive
* A not-necessary-Markov chain of some type
* @param R - the chain element type
interface Chain<out R> {
* Last value of the chain. Returns null if [next] was not called
val value: R?
* Generate next value, changing state if needed
suspend fun next(): R
* Create a copy of current chain state. Consuming resulting chain does not affect initial chain
fun fork(): Chain<R>
* Chain as a coroutine receive channel
fun <R> Chain<R>.asChannel(scope: CoroutineScope): ReceiveChannel<R> = scope.produce { while (isActive) send(next()) }
fun <T> Iterator<T>.asChain(): Chain<T> = SimpleChain { next() }
fun <T> Sequence<T>.asChain(): Chain<T> = iterator().asChain()
* Map the chain result using suspended transformation. Initial chain result can no longer be safely consumed
* since mapped chain consumes tokens. Accepts regular transformation function
fun <T, R> Chain<T>.map(func: (T) -> R): Chain<R> {
val parent = this;
return object : Chain<R> {
override val value: R? get() = parent.value?.let(func)
override suspend fun next(): R {
return func(
override fun fork(): Chain<R> {
return parent.fork().map(func)
* A simple chain of independent tokens
class SimpleChain<out R>(private val gen: suspend () -> R) : Chain<R> {
private val atomicValue = atomic<R?>(null)
override val value: R? get() = atomicValue.value
override suspend fun next(): R = gen().also { atomicValue.lazySet(it) }
override fun fork(): Chain<R> = this
//TODO force forks on mapping operations?
* A stateless Markov chain
class MarkovChain<out R : Any>(private val seed: () -> R, private val gen: suspend (R) -> R) :
Chain<R> {
constructor(seed: R, gen: suspend (R) -> R) : this({ seed }, gen)
private val atomicValue by lazy { atomic(seed()) }
override val value: R get() = atomicValue.value
override suspend fun next(): R {
return value
override fun fork(): Chain<R> {
return MarkovChain(value, gen)
* A chain with possibly mutable state. The state must not be changed outside the chain. Two chins should never share the state
* @param S - the state of the chain
class StatefulChain<S, out R>(
private val state: S,
private val seed: S.() -> R,
private val gen: suspend S.(R) -> R
) : Chain<R> {
constructor(state: S, seed: R, gen: suspend S.(R) -> R) : this(state, { seed }, gen)
private val atomicValue by lazy { atomic(seed(state)) }
override val value: R get() = atomicValue.value
override suspend fun next(): R {
atomicValue.lazySet(gen(state, value))
return value
override fun fork(): Chain<R> {
throw RuntimeException("Fork not supported for stateful chain")
* A chain that repeats the same value
class ConstantChain<out T>(override val value: T) : Chain<T> {
override suspend fun next(): T {
return value
override fun fork(): Chain<T> {
return this

View File

@ -1,4 +1,4 @@
package scientifik.kmath.misc
package scientifik.kmath.sequential
import kotlin.jvm.JvmName

View File

@ -0,0 +1,21 @@
package scientifik.kmath.sequential
import scientifik.kmath.operations.Space
typealias Reducer<T, C, R> = (C, Iterable<T>) -> R
inline fun <T, C, R> Iterable<T>.reduce(context: C, crossinline reducer: Reducer<T, C, R>) =
reducer(context, this@reduce)
inline fun <T, C, R> Sequence<T>.reduce(context: C, crossinline reducer: Reducer<T, C, R>) =
asIterable().reduce(context, reducer)
inline fun <T, C, R> Array<T>.reduce(context: C, crossinline reducer: Reducer<T, C, R>) =
asIterable().reduce(context, reducer)
object Reducers {
fun <T : Any> mean(): Reducer<T, Space<T>, T> = { context, data ->
data.fold(GenericSum(context)) { sum, value -> sum.apply { push(value) } }.value

View File

@ -0,0 +1,41 @@
package scientifik.kmath.chains
import kotlinx.coroutines.runBlocking
import scientifik.kmath.sequential.Chain
import kotlin.sequences.Sequence
* Represent a chain as regular iterator (uses blocking calls)
operator fun <R> Chain<R>.iterator() = object : Iterator<R> {
override fun hasNext(): Boolean = true
override fun next(): R = runBlocking { next() }
* Represent a chain as a sequence
fun <R> Chain<R>.asSequence(): Sequence<R> = object : Sequence<R> {
override fun iterator(): Iterator<R> = this@asSequence.iterator()
* Map the chain result using suspended transformation. Initial chain result can no longer be safely consumed
* since mapped chain consumes tokens. Accepts suspending transformation function.
fun <T, R> Chain<T>.map(func: suspend (T) -> R): Chain<R> {
val parent = this;
return object : Chain<R> {
override val value: R? get() = runBlocking { parent.value?.let { func(it) } }
override suspend fun next(): R {
return func(
override fun fork(): Chain<R> {
return parent.fork().map(func)

View File

@ -1,9 +1,28 @@
buildscript {
dependencies {
repositories {
pluginManagement {
repositories {
//maven ("")
resolutionStrategy {
eachPlugin {
when ( {
"kotlinx-atomicfu" -> {
@ -15,5 +34,6 @@ include(