diff --git a/build.gradle.kts b/build.gradle.kts index 119ed11b9..f078c1de8 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,4 @@ -val kmathVersion by extra("0.1.2-dev-2") +val kmathVersion by extra("0.1.2-dev-3") allprojects { // apply(plugin = "maven") diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 5133cba48..8279b081a 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -3,7 +3,7 @@ // Also dependencies itself can be moved here object Versions { val ioVersion = "0.1.8" - val coroutinesVersion = "1.2.0" + val coroutinesVersion = "1.2.1" val atomicfuVersion = "0.12.6" // This version is not used and IDEA shows this property as unused val dokkaVersion = "0.9.18" diff --git a/kmath-commons/build.gradle.kts b/kmath-commons/build.gradle.kts index 6e3a62ef7..c610749be 100644 --- a/kmath-commons/build.gradle.kts +++ b/kmath-commons/build.gradle.kts @@ -7,7 +7,7 @@ description = "Commons math binding for kmath" dependencies { api(project(":kmath-core")) - api(project(":kmath-streaming")) + api(project(":kmath-coroutines")) api("org.apache.commons:commons-math3:3.6.1") testImplementation("org.jetbrains.kotlin:kotlin-test") testImplementation("org.jetbrains.kotlin:kotlin-test-junit") diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/structures/CoroutinesExtra.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt similarity index 87% rename from kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/structures/CoroutinesExtra.kt rename to kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt index f6f21af09..7580ef6ef 100644 --- a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/structures/CoroutinesExtra.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/CoroutinesExtra.kt @@ -1,4 +1,4 @@ -package scientifik.kmath.structures +package scientifik.kmath import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope diff --git a/kmath-coroutines/src/commonMain/kotlin/sicentifik/kmath/chains/Chain.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt similarity index 97% rename from kmath-coroutines/src/commonMain/kotlin/sicentifik/kmath/chains/Chain.kt rename to kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt index 21c2c9eb3..4d2f604e1 100644 --- a/kmath-coroutines/src/commonMain/kotlin/sicentifik/kmath/chains/Chain.kt +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/chains/Chain.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package sicentifik.kmath.chains +package scientifik.kmath.chains import kotlinx.atomicfu.atomic import kotlinx.coroutines.FlowPreview @@ -26,7 +26,7 @@ import kotlinx.coroutines.FlowPreview */ interface Chain { /** - * Last value of the chain. Returns null if [next] was not called + * Last cached value of the chain. Returns null if [next] was not called */ val value: R? diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt new file mode 100644 index 000000000..f6c38e153 --- /dev/null +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferFlow.kt @@ -0,0 +1,95 @@ +package scientifik.kmath.streaming + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.flow.* +import scientifik.kmath.structures.Buffer +import scientifik.kmath.structures.BufferFactory +import scientifik.kmath.structures.DoubleBuffer +import kotlin.coroutines.coroutineContext + +/** + * Create a [Flow] from buffer + */ +@FlowPreview +fun Buffer.asFlow() = iterator().asFlow() + +/** + * Flat map a [Flow] of [Buffer] into continuous [Flow] of elements + */ +@FlowPreview +fun Flow>.spread(): Flow = flatMapConcat { it.asFlow() } + +/** + * Collect incoming flow into fixed size chunks + */ +@FlowPreview +fun Flow.chunked(bufferSize: Int, bufferFactory: BufferFactory) = flow { + require(bufferSize > 0) { "Resulting chunk size must be more than zero" } + val list = ArrayList(bufferSize) + var counter = 0 + + this@chunked.collect { element -> + list.add(element) + counter++ + if (counter == bufferSize) { + val buffer = bufferFactory(bufferSize) { list[it] } + emit(buffer) + list.clear() + counter = 0 + } + } +} + +/** + * Specialized flow chunker for real buffer + */ +@FlowPreview +fun Flow.chunked(bufferSize: Int) = flow { + require(bufferSize > 0) { "Resulting chunk size must be more than zero" } + val array = DoubleArray(bufferSize) + var counter = 0 + + this@chunked.collect { element -> + array[counter] = element + counter++ + if (counter == bufferSize) { + val buffer = DoubleBuffer(array) + emit(buffer) + } + } +} + +/** + * Perform parallel mapping of flow elements + */ +@InternalCoroutinesApi +@ExperimentalCoroutinesApi +@FlowPreview +fun Flow.mapParallel(dispatcher: CoroutineDispatcher = Dispatchers.Default, bufferSize: Int = 16, transform: suspend (T) -> R) : Flow{ + require(bufferSize >= 0) { + "Buffer size should be positive, but was $bufferSize" + } + return flow { + coroutineScope { + val channel: ReceiveChannel> = produce(capacity = bufferSize) { + collect { value -> + send(async(dispatcher) { transform(value) }) + } + } + + // TODO semantics doesn't play well here and we pay for that with additional object + (channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() } + for (element in channel) { + emit(element.await()) + } + + val producer = channel as Job + if (producer.isCancelled) { + producer.join() + throw producer.getCancellationException() + } + } + } +} \ No newline at end of file diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt new file mode 100644 index 000000000..023199452 --- /dev/null +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/BufferStreaming.kt @@ -0,0 +1,82 @@ +package scientifik.kmath.streaming + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.isActive +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import scientifik.kmath.structures.Buffer +import scientifik.kmath.structures.BufferFactory + +/** + * A processor that collects incoming elements into fixed size buffers + */ +@ExperimentalCoroutinesApi +class JoinProcessor( + scope: CoroutineScope, + bufferSize: Int, + bufferFactory: BufferFactory = Buffer.Companion::boxing +) : AbstractProcessor>(scope) { + + private val input = Channel(bufferSize) + + private val output = produce(coroutineContext) { + val list = ArrayList(bufferSize) + while (isActive) { + list.clear() + repeat(bufferSize) { + list.add(input.receive()) + } + val buffer = bufferFactory(bufferSize) { list[it] } + send(buffer) + } + } + + override suspend fun receive(): Buffer = output.receive() + + override suspend fun send(value: T) { + input.send(value) + } +} + +/** + * A processor that splits incoming buffers into individual elements + */ +class SplitProcessor(scope: CoroutineScope) : AbstractProcessor, T>(scope) { + + private val input = Channel>() + + private val mutex = Mutex() + + private var currentBuffer: Buffer? = null + + private var pos = 0 + + + override suspend fun receive(): T { + mutex.withLock { + while (currentBuffer == null || pos == currentBuffer!!.size) { + currentBuffer = input.receive() + pos = 0 + } + return currentBuffer!![pos].also { pos++ } + } + } + + override suspend fun send(value: Buffer) { + input.send(value) + } +} + +@ExperimentalCoroutinesApi +fun Producer.chunked(chunkSize: Int, bufferFactory: BufferFactory) = + JoinProcessor(this, chunkSize, bufferFactory).also { connect(it) } + +@ExperimentalCoroutinesApi +inline fun Producer.chunked(chunkSize: Int) = + JoinProcessor(this, chunkSize, Buffer.Companion::auto).also { connect(it) } + + + diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt new file mode 100644 index 000000000..ac64f921f --- /dev/null +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/RingBuffer.kt @@ -0,0 +1,89 @@ +package scientifik.kmath.streaming + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import scientifik.kmath.structures.Buffer +import scientifik.kmath.structures.MutableBuffer +import scientifik.kmath.structures.VirtualBuffer + +/** + * Thread-safe ring buffer + */ +internal class RingBuffer( + private val buffer: MutableBuffer, + private var startIndex: Int = 0, + size: Int = 0 +) : Buffer { + + private val mutex = Mutex() + + override var size: Int = size + private set + + override fun get(index: Int): T { + require(index >= 0) { "Index must be positive" } + require(index < size) { "Index $index is out of circular buffer size $size" } + return buffer[startIndex.forward(index)] + } + + fun isFull() = size == buffer.size + + /** + * Iterator could provide wrong results if buffer is changed in initialization (iteration is safe) + */ + override fun iterator(): Iterator = object : AbstractIterator() { + private var count = size + private var index = startIndex + val copy = buffer.copy() + + override fun computeNext() { + if (count == 0) { + done() + } else { + setNext(copy[index]) + index = index.forward(1) + count-- + } + } + } + + /** + * A safe snapshot operation + */ + suspend fun snapshot(): Buffer { + mutex.withLock { + val copy = buffer.copy() + return VirtualBuffer(size) { i -> copy[startIndex.forward(i)] } + } + } + + suspend fun push(element: T) { + mutex.withLock { + buffer[startIndex.forward(size)] = element + if (isFull()) { + startIndex++ + } else { + size++ + } + } + } + + + @Suppress("NOTHING_TO_INLINE") + private inline fun Int.forward(n: Int): Int = (this + n) % (buffer.size) + + companion object { + inline fun build(size: Int, empty: T): RingBuffer { + val buffer = MutableBuffer.auto(size) { empty } + return RingBuffer(buffer) + } + + /** + * Slow yet universal buffer + */ + fun boxing(size: Int): RingBuffer { + val buffer: MutableBuffer = MutableBuffer.boxing(size) { null } + return RingBuffer(buffer) + } + } +} \ No newline at end of file diff --git a/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt new file mode 100644 index 000000000..0382b6a89 --- /dev/null +++ b/kmath-coroutines/src/commonMain/kotlin/scientifik/kmath/streaming/Streaming.kt @@ -0,0 +1,273 @@ +package scientifik.kmath.streaming + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import scientifik.kmath.structures.Buffer +import kotlin.coroutines.CoroutineContext + +/** + * Initial chain block. Could produce an element sequence and be connected to single [Consumer] + * + * The general rule is that channel is created on first call. Also each element is responsible for its connection so + * while the connections are symmetric, the scope, used for making the connection is responsible for cancelation. + * + * Also connections are not reversible. Once connected block stays faithful until it finishes processing. + * Manually putting elements to connected block could lead to undetermined behavior and must be avoided. + */ +interface Producer : CoroutineScope { + fun connect(consumer: Consumer) + + suspend fun receive(): T + + val consumer: Consumer? + + val outputIsConnected: Boolean get() = consumer != null + + //fun close() +} + +/** + * Terminal chain block. Could consume an element sequence and be connected to signle [Producer] + */ +interface Consumer : CoroutineScope { + fun connect(producer: Producer) + + suspend fun send(value: T) + + val producer: Producer? + + val inputIsConnected: Boolean get() = producer != null + + //fun close() +} + +interface Processor : Consumer, Producer + +abstract class AbstractProducer(scope: CoroutineScope) : Producer { + override val coroutineContext: CoroutineContext = scope.coroutineContext + + override var consumer: Consumer? = null + protected set + + override fun connect(consumer: Consumer) { + //Ignore if already connected to specific consumer + if (consumer != this.consumer) { + if (outputIsConnected) error("The output slot of producer is occupied") + if (consumer.inputIsConnected) error("The input slot of consumer is occupied") + this.consumer = consumer + if (consumer.producer != null) { + //No need to save the job, it will be canceled on scope cancel + connectOutput(consumer) + // connect back, consumer is already set so no circular reference + consumer.connect(this) + } else error("Unreachable statement") + } + } + + protected open fun connectOutput(consumer: Consumer) { + launch { + while (this.isActive) { + consumer.send(receive()) + } + } + } +} + +abstract class AbstractConsumer(scope: CoroutineScope) : Consumer { + override val coroutineContext: CoroutineContext = scope.coroutineContext + + override var producer: Producer? = null + protected set + + override fun connect(producer: Producer) { + //Ignore if already connected to specific consumer + if (producer != this.producer) { + if (inputIsConnected) error("The input slot of consumer is occupied") + if (producer.outputIsConnected) error("The input slot of producer is occupied") + this.producer = producer + //No need to save the job, it will be canceled on scope cancel + if (producer.consumer != null) { + connectInput(producer) + // connect back + producer.connect(this) + } else error("Unreachable statement") + } + } + + protected open fun connectInput(producer: Producer) { + launch { + while (isActive) { + send(producer.receive()) + } + } + } +} + +abstract class AbstractProcessor(scope: CoroutineScope) : Processor, AbstractProducer(scope) { + + override var producer: Producer? = null + protected set + + override fun connect(producer: Producer) { + //Ignore if already connected to specific consumer + if (producer != this.producer) { + if (inputIsConnected) error("The input slot of consumer is occupied") + if (producer.outputIsConnected) error("The input slot of producer is occupied") + this.producer = producer + //No need to save the job, it will be canceled on scope cancel + if (producer.consumer != null) { + connectInput(producer) + // connect back + producer.connect(this) + } else error("Unreachable statement") + } + } + + protected open fun connectInput(producer: Producer) { + launch { + while (isActive) { + send(producer.receive()) + } + } + } +} + +/** + * A simple [produce]-based producer + */ +@ExperimentalCoroutinesApi +class GenericProducer( + scope: CoroutineScope, + capacity: Int = Channel.UNLIMITED, + block: suspend ProducerScope.() -> Unit +) : AbstractProducer(scope) { + + private val channel: ReceiveChannel by lazy { produce(capacity = capacity, block = block) } + + override suspend fun receive(): T = channel.receive() +} + +/** + * A simple pipeline [Processor] block + */ +class PipeProcessor( + scope: CoroutineScope, + capacity: Int = Channel.RENDEZVOUS, + process: suspend (T) -> R +) : AbstractProcessor(scope) { + + private val input = Channel(capacity) + private val output: ReceiveChannel = input.map(coroutineContext, process) + + override suspend fun receive(): R = output.receive() + + override suspend fun send(value: T) { + input.send(value) + } +} + + +/** + * A moving window [Processor] with circular buffer + */ +class WindowedProcessor( + scope: CoroutineScope, + window: Int, + val process: suspend (Buffer) -> R +) : AbstractProcessor(scope) { + + private val ringBuffer = RingBuffer.boxing(window) + + private val channel = Channel(Channel.RENDEZVOUS) + + override suspend fun receive(): R { + return channel.receive() + } + + override suspend fun send(value: T) { + ringBuffer.push(value) + channel.send(process(ringBuffer.snapshot())) + } +} + +/** + * Thread-safe aggregator of values from input. The aggregator does not store all incoming values, it uses fold procedure + * to incorporate them into state on-arrival. + * The current aggregated state could be accessed by [state]. The input channel is inactive unless requested + * @param T - the type of the input element + * @param S - the type of the aggregator + */ +class Reducer( + scope: CoroutineScope, + initialState: S, + val fold: suspend (S, T) -> S +) : AbstractConsumer(scope) { + + var state: S = initialState + private set + + private val mutex = Mutex() + + override suspend fun send(value: T) = mutex.withLock { + state = fold(state, value) + } +} + +/** + * Collector that accumulates all values in a list. List could be accessed from non-suspending environment via [list] value. + */ +class Collector(scope: CoroutineScope) : AbstractConsumer(scope) { + + private val _list = ArrayList() + private val mutex = Mutex() + val list: List get() = _list + + override suspend fun send(value: T) { + mutex.withLock { + _list.add(value) + } + } +} + +/** + * Convert a sequence to [Producer] + */ +fun Sequence.produce(scope: CoroutineScope = GlobalScope) = + GenericProducer(scope) { forEach { send(it) } } + +/** + * Convert a [ReceiveChannel] to [Producer] + */ +fun ReceiveChannel.produce(scope: CoroutineScope = GlobalScope) = + GenericProducer(scope) { for (e in this@produce) send(e) } + + +fun > Producer.consumer(consumerFactory: () -> C): C = + consumerFactory().also { connect(it) } + +fun Producer.map(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = + PipeProcessor(this, capacity, process).also { connect(it) } + +/** + * Create a reducer and connect this producer to reducer + */ +fun Producer.reduce(initialState: S, fold: suspend (S, T) -> S) = + Reducer(this, initialState, fold).also { connect(it) } + +/** + * Create a [Collector] and attach it to this [Producer] + */ +fun Producer.collect() = + Collector(this).also { connect(it) } + +fun > Producer.process(processorBuilder: () -> P): P = + processorBuilder().also { connect(it) } + +fun Producer.process(capacity: Int = Channel.RENDEZVOUS, process: suspend (T) -> R) = + PipeProcessor(this, capacity, process).also { connect(it) } + + +fun Producer.windowed(window: Int, process: suspend (Buffer) -> R) = + WindowedProcessor(this, window, process).also { connect(it) } \ No newline at end of file diff --git a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt index e23748bdb..4f2c4cb8b 100644 --- a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt +++ b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/chains/ChainExt.kt @@ -1,7 +1,6 @@ package scientifik.kmath.chains import kotlinx.coroutines.runBlocking -import sicentifik.kmath.chains.Chain import kotlin.sequences.Sequence /** diff --git a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt index b4832827d..fa81fc67b 100644 --- a/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt +++ b/kmath-coroutines/src/jvmMain/kotlin/scientifik/kmath/structures/LazyNDStructure.kt @@ -1,6 +1,7 @@ package scientifik.kmath.structures import kotlinx.coroutines.* +import scientifik.kmath.Math class LazyNDStructure( val scope: CoroutineScope, diff --git a/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt new file mode 100644 index 000000000..4dd0700d1 --- /dev/null +++ b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/BufferFlowTest.kt @@ -0,0 +1,27 @@ +package scientifik.kmath.streaming + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.runBlocking +import org.junit.Test + +@InternalCoroutinesApi +class BufferFlowTest { + + @Test(timeout = 2000) + fun mapParallel() { + runBlocking { + (1..20).asFlow().mapParallel { + Thread.sleep(200) + it + }.collect { + println("Completed $it") + } + } + } + +} \ No newline at end of file diff --git a/kmath-streaming/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt b/kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt similarity index 100% rename from kmath-streaming/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt rename to kmath-coroutines/src/jvmTest/kotlin/scientifik/kmath/streaming/RingBufferTest.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index 32ac77e48..690d80b59 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -26,7 +26,6 @@ include( ":kmath-histograms", ":kmath-commons", ":kmath-koma", - ":kmath-streaming", ":kmath-prob", ":benchmarks" )