Chain implements Flow

This commit is contained in:
Alexander Nozik 2020-04-26 20:04:15 +03:00
parent c6640b4d31
commit 28ef591524
12 changed files with 38 additions and 25 deletions

View File

@ -1,8 +1,8 @@
plugins {
id("scientifik.publish") version "0.4.1" apply false
id("scientifik.publish") version "0.4.2" apply false
}
val kmathVersion by extra("0.1.4-dev-2")
val kmathVersion by extra("0.1.4-dev-3")
val bintrayRepo by extra("scientifik")
val githubProject by extra("kmath")

Binary file not shown.

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

3
gradlew.bat vendored
View File

@ -29,6 +29,9 @@ if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"

View File

@ -1,7 +1,7 @@
package scientifik.kmath.commons.expressions
import org.junit.Test
import scientifik.kmath.expressions.invoke
import kotlin.test.Test
import kotlin.test.assertEquals
inline fun <R> diff(order: Int, vararg parameters: Pair<String, Double>, block: DerivativeStructureField.() -> R) =

View File

@ -41,16 +41,6 @@ fun <T : Any> Structure2D.Companion.square(vararg elements: T): FeaturedMatrix<T
return BufferMatrix(size, size, buffer)
}
fun <T : Any> Structure2D.Companion.build(rows: Int, columns: Int): MatrixBuilder<T> = MatrixBuilder(rows, columns)
class MatrixBuilder<T : Any>(val rows: Int, val columns: Int) {
operator fun invoke(vararg elements: T): FeaturedMatrix<T> {
if (rows * columns != elements.size) error("The number of elements ${elements.size} is not equal $rows * $columns")
val buffer = elements.asBuffer()
return BufferMatrix(rows, columns, buffer)
}
}
val Matrix<*>.features get() = (this as? FeaturedMatrix)?.features?: emptySet()
/**

View File

@ -0,0 +1,14 @@
package scientifik.kmath.linear
import scientifik.kmath.structures.Structure2D
import scientifik.kmath.structures.asBuffer
class MatrixBuilder<T : Any>(val rows: Int, val columns: Int) {
operator fun invoke(vararg elements: T): FeaturedMatrix<T> {
if (rows * columns != elements.size) error("The number of elements ${elements.size} is not equal $rows * $columns")
val buffer = elements.asBuffer()
return BufferMatrix(rows, columns, buffer)
}
}
fun <T : Any> Structure2D.Companion.build(rows: Int, columns: Int): MatrixBuilder<T> = MatrixBuilder(rows, columns)

View File

@ -16,7 +16,9 @@
package scientifik.kmath.chains
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -25,7 +27,7 @@ import kotlinx.coroutines.sync.withLock
* A not-necessary-Markov chain of some type
* @param R - the chain element type
*/
interface Chain<out R> {
interface Chain<out R>: Flow<R> {
/**
* Generate next value, changing state if needed
*/
@ -36,14 +38,15 @@ interface Chain<out R> {
*/
fun fork(): Chain<R>
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<R>) {
kotlinx.coroutines.flow.flow { while (true) emit(next()) }.collect(collector)
}
companion object
}
/**
* Chain as a coroutine flow. The flow emit affects chain state and vice versa
*/
fun <R> Chain<R>.flow(): Flow<R> = kotlinx.coroutines.flow.flow { while (true) emit(next()) }
fun <T> Iterator<T>.asChain(): Chain<T> = SimpleChain { next() }
fun <T> Sequence<T>.asChain(): Chain<T> = iterator().asChain()

View File

@ -3,11 +3,12 @@ package scientifik.kmath.streaming
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import org.junit.Test
import org.junit.jupiter.api.Timeout
import scientifik.kmath.coroutines.async
import scientifik.kmath.coroutines.collect
import scientifik.kmath.coroutines.mapParallel
import java.util.concurrent.Executors
import kotlin.test.Test
@ExperimentalCoroutinesApi
@ -17,7 +18,8 @@ class BufferFlowTest {
val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
@Test(timeout = 2000)
@Test
@Timeout(2000)
fun map() {
runBlocking {
(1..20).asFlow().mapParallel( dispatcher) {
@ -31,7 +33,8 @@ class BufferFlowTest {
}
}
@Test(timeout = 2000)
@Test
@Timeout(2000)
fun async() {
runBlocking {
(1..20).asFlow().async(dispatcher) {

View File

@ -2,8 +2,8 @@ package scientifik.kmath.streaming
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.junit.Test
import scientifik.kmath.structures.asSequence
import kotlin.test.Test
import kotlin.test.assertEquals
class RingBufferTest {

View File

@ -3,7 +3,7 @@ package scientifik.kmath.prob
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import scientifik.kmath.chains.flow
import scientifik.kmath.streaming.chunked
import kotlin.test.Test
@ -13,7 +13,7 @@ class StatisticTest {
//Create a stateless chain from generator.
val data = generator.chain { nextDouble() }
//Convert a chaint to Flow and break it into chunks.
val chunked = data.flow().chunked(1000)
val chunked = data.chunked(1000)
@Test
fun testParallelMean() {