From a546552540ccdad0607b14540dcb8682832e33c6 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 10 May 2022 14:45:58 +0300 Subject: [PATCH] Replace sequences by iterators in `DataSet` --- CHANGELOG.md | 1 + build.gradle.kts | 2 +- .../{CachingAction.kt => AbstractAction.kt} | 40 ++++++---- .../kscience/dataforge/actions/MapAction.kt | 76 +++++++------------ .../dataforge/actions/ReduceAction.kt | 11 ++- .../kscience/dataforge/actions/SplitAction.kt | 67 +++++++--------- .../space/kscience/dataforge/data/DataSet.kt | 43 +++++++---- .../kscience/dataforge/data/DataSetBuilder.kt | 4 +- .../space/kscience/dataforge/data/DataTree.kt | 4 +- ...ataSourceBuilder.kt => DataTreeBuilder.kt} | 54 +++++++------ .../kscience/dataforge/data/GroupRule.kt | 10 ++- .../kscience/dataforge/data/StaticDataTree.kt | 2 +- .../kscience/dataforge/data/dataFilter.kt | 24 ++++-- .../kscience/dataforge/data/dataTransform.kt | 31 ++++---- .../kscience/dataforge/data/dataFilterJvm.kt | 14 ++-- .../dataforge/data/dataSetBuilderInContext.kt | 2 +- .../kscience/dataforge/data/ActionsTest.kt | 5 +- .../dataforge/data/DataTreeBuilderTest.kt | 4 +- .../space/kscience/dataforge/meta/Meta.kt | 1 + .../dataforge/workspace/TaskResult.kt | 9 ++- .../kscience/dataforge/workspace/Workspace.kt | 3 +- .../dataforge/workspace/WorkspaceBuilder.kt | 6 +- .../dataforge/workspace/workspaceJvm.kt | 2 + .../workspace/DataPropagationTest.kt | 31 ++++---- .../workspace/SimpleWorkspaceTest.kt | 25 +++--- 25 files changed, 242 insertions(+), 229 deletions(-) rename dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/{CachingAction.kt => AbstractAction.kt} (52%) rename dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/{DataSourceBuilder.kt => DataTreeBuilder.kt} (66%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58f82326..17b972cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - PartialEnvelope uses `Int` instead `UInt`. 
- `ActiveDataSet` renamed to `DataSource` - `selectOne`->`getByType` +- Data traversal in `DataSet` is done via iterator ### Deprecated diff --git a/build.gradle.kts b/build.gradle.kts index f8072b6a..a7560698 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,7 +4,7 @@ plugins { allprojects { group = "space.kscience" - version = "0.6.0-dev-6" + version = "0.6.0-dev-7" } subprojects { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt similarity index 52% rename from dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt rename to dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt index 469b6e60..e7bbe6f6 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt @@ -3,6 +3,7 @@ package space.kscience.dataforge.actions import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.startsWith import kotlin.reflect.KType @@ -18,36 +19,47 @@ internal fun MutableMap.removeWhatStartsWith(name: Name) { /** * An action that caches results on-demand and recalculates them on source push */ -public abstract class CachingAction( +public abstract class AbstractAction( public val outputType: KType, ) : Action { - protected abstract fun transform( - set: DataSet, + /** + * Generate initial content of the output + */ + protected abstract fun DataSetBuilder.generate( + data: DataSet, meta: Meta, - key: Name = Name.EMPTY, - ): Sequence> + ) + /** + * Update part of the data set when given [updateKey] is triggered by the source + */ + protected open fun DataSourceBuilder.update( + dataSet: DataSet, + meta: Meta, + updateKey: Name, + ) { + // By default, recalculate the whole dataset + generate(dataSet, meta) + } + + @OptIn(DFInternal::class) override fun execute( dataSet: DataSet, meta: Meta, ): DataSet = if (dataSet is DataSource) { - DataSourceBuilder(outputType, dataSet.coroutineContext).apply { - populateFrom(transform(dataSet, meta)) + DataSource(outputType, dataSet){ + generate(dataSet, meta) launch { - dataSet.updates.collect { - //clear old nodes - remove(it) - //collect new items - populateFrom(transform(dataSet, meta, it)) - //FIXME if the target is data, updates are fired twice + dataSet.updates.collect { name -> + update(dataSet, meta, name) } } } } else { DataTree(outputType) { - populateFrom(transform(dataSet, meta)) + generate(dataSet, meta) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt index 91f58c95..883b3928 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta @@ -53,61 +52,44 @@ public class MapActionBuilder( } @PublishedApi -internal class MapAction( - private val outputType: 
KType, +internal class MapAction( + outputType: KType, private val block: MapActionBuilder.() -> Unit, -) : Action { +) : AbstractAction(outputType) { - override fun execute( - dataSet: DataSet, - meta: Meta, - ): DataSet { + private fun DataSetBuilder.mapOne(name: Name, data: Data, meta: Meta) { + // Creating a new environment for action using **old** name, old meta and task meta + val env = ActionEnv(name, data.meta, meta) - fun mapOne(data: NamedData): NamedData { - // Creating a new environment for action using **old** name, old meta and task meta - val env = ActionEnv(data.name, data.meta, meta) + //applying transformation from builder + val builder = MapActionBuilder( + name, + data.meta.toMutableMeta(), // using data meta + meta, + outputType + ).apply(block) - //applying transformation from builder - val builder = MapActionBuilder( - data.name, - data.meta.toMutableMeta(), // using data meta - meta, - outputType - ).apply(block) + //getting new name + val newName = builder.name - //getting new name - val newName = builder.name + //getting new meta + val newMeta = builder.meta.seal() - //getting new meta - val newMeta = builder.meta.seal() - - @OptIn(DFInternal::class) - val newData = Data(builder.outputType, newMeta, dependencies = listOf(data)) { - builder.result(env, data.await()) - } - //setting the data node - return newData.named(newName) + @OptIn(DFInternal::class) + val newData = Data(builder.outputType, newMeta, dependencies = listOf(data)) { + builder.result(env, data.await()) } + //setting the data node + data(newName, newData) + } - val sequence = dataSet.traverse().map(::mapOne) + override fun DataSetBuilder.generate(data: DataSet, meta: Meta) { + data.forEach { mapOne(it.name, it.data, meta) } + } - return if (dataSet is DataSource ) { - ActiveDataTree(outputType, dataSet) { - populateFrom(sequence) - launch { - dataSet.updates.collect { name -> - //clear old nodes - remove(name) - //collect new items - populateFrom(dataSet.children(name).map(::mapOne)) - } - } - } - } else { - DataTree(outputType) { - populateFrom(sequence) - } - } + override fun DataSourceBuilder.update(dataSet: DataSet, meta: Meta, updateKey: Name) { + remove(updateKey) + dataSet[updateKey]?.let { mapOne(updateKey, it, meta) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt index d3be1ce1..1af3d5ec 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt @@ -81,13 +81,12 @@ public class ReduceGroupBuilder( internal class ReduceAction( outputType: KType, private val action: ReduceGroupBuilder.() -> Unit, -) : CachingAction(outputType) { +) : AbstractAction(outputType) { //TODO optimize reduction. 
Currently, the whole action recalculates on push - - override fun transform(set: DataSet, meta: Meta, key: Name): Sequence> = sequence { - ReduceGroupBuilder(meta, outputType).apply(action).buildGroups(set).forEach { group -> - val dataFlow: Map> = group.set.traverse().fold(HashMap()) { acc, value -> + override fun DataSetBuilder.generate(data: DataSet, meta: Meta) { + ReduceGroupBuilder(meta, outputType).apply(action).buildGroups(data).forEach { group -> + val dataFlow: Map> = group.set.asSequence().fold(HashMap()) { acc, value -> acc.apply { acc[value.name] = value.data } @@ -103,7 +102,7 @@ internal class ReduceAction( meta = groupMeta ) { group.result.invoke(env, it) } - yield(res.named(env.name)) + data(env.name, res) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt index ee778f6d..b71c5fc5 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt @@ -1,13 +1,11 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Laminate import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.misc.DFExperimental -import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.Name import kotlin.collections.set import kotlin.reflect.KType @@ -46,53 +44,42 @@ public class SplitBuilder(public val name: Name, public val me */ @PublishedApi internal class SplitAction( - private val outputType: KType, + outputType: KType, private val action: SplitBuilder.() -> Unit, -) : Action { +) : AbstractAction(outputType) { - override fun execute( - dataSet: DataSet, - meta: Meta, - ): DataSet { + private fun DataSetBuilder.splitOne(name: Name, data: Data, meta: Meta) { + val laminate = Laminate(data.meta, meta) - fun splitOne(data: NamedData): Sequence> { - val laminate = Laminate(data.meta, meta) - - val split = SplitBuilder(data.name, data.meta).apply(action) + val split = SplitBuilder(name, data.meta).apply(action) - // apply individual fragment rules to result - return split.fragments.entries.asSequence().map { (fragmentName, rule) -> - val env = SplitBuilder.FragmentRule( - fragmentName, - laminate.toMutableMeta(), - outputType - ).apply(rule) - //data.map(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) - @OptIn(DFInternal::class) Data(outputType, meta = env.meta, dependencies = listOf(data)) { + // apply individual fragment rules to result + split.fragments.forEach { (fragmentName, rule) -> + val env = SplitBuilder.FragmentRule( + fragmentName, + laminate.toMutableMeta(), + outputType + ).apply(rule) + //data.map(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) + + data( + fragmentName, + @Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) { env.result(data.await()) - }.named(fragmentName) - } - } - - return if (dataSet is DataSource) { - ActiveDataTree(outputType, dataSet) { - populateFrom(dataSet.traverse().flatMap(transform = ::splitOne)) - launch { - dataSet.updates.collect { name -> - //clear old nodes - remove(name) - //collect new items - populateFrom(dataSet.children(name).flatMap(transform = ::splitOne)) - } } - } - } 
else { - DataTree(outputType) { - populateFrom(dataSet.traverse().flatMap(transform = ::splitOne)) - } + ) } } + + override fun DataSetBuilder.generate(data: DataSet, meta: Meta) { + data.forEach { splitOne(it.name, it.data, meta) } + } + + override fun DataSourceBuilder.update(dataSet: DataSet, meta: Meta, updateKey: Name) { + remove(updateKey) + dataSet[updateKey]?.let { splitOne(updateKey, it, meta) } + } } /** diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt index 3a3497b3..cf327c9b 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt @@ -7,7 +7,10 @@ import kotlinx.coroutines.flow.mapNotNull import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.set -import space.kscience.dataforge.names.* +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.endsWith +import space.kscience.dataforge.names.parseAsName import kotlin.reflect.KType public interface DataSet { @@ -25,7 +28,7 @@ public interface DataSet { /** * Traverse this [DataSet] returning named data instances. The order is not guaranteed. */ - public fun traverse(): Sequence> + public operator fun iterator(): Iterator> /** * Get data with given name. @@ -42,19 +45,27 @@ public interface DataSet { override val dataType: KType = TYPE_OF_NOTHING override val meta: Meta get() = Meta.EMPTY - override fun traverse(): Sequence> = emptySequence() + override fun iterator(): Iterator> = emptySequence>().iterator() override fun get(name: Name): Data? = null } } } +public fun DataSet.asSequence(): Sequence> = object : Sequence> { + override fun iterator(): Iterator> = this@asSequence.iterator() +} + +public fun DataSet.asIterable(): Iterable> = object : Iterable> { + override fun iterator(): Iterator> = this@asIterable.iterator() +} + public operator fun DataSet.get(name: String): Data? = get(name.parseAsName()) /** * A [DataSet] with propagated updates. */ -public interface DataSource : DataSet, CoroutineScope { +public interface DataSource : DataSet, CoroutineScope { /** * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. 
@@ -73,28 +84,28 @@ public interface DataSource : DataSet, CoroutineScope { } public val DataSet.updates: Flow get() = if (this is DataSource) updates else emptyFlow() - -/** - * Flow all data nodes with names starting with [branchName] - */ -public fun DataSet.children(branchName: Name): Sequence> = - this@children.traverse().filter { - it.name.startsWith(branchName) - } +// +///** +// * Flow all data nodes with names starting with [branchName] +// */ +//public fun DataSet.children(branchName: Name): Sequence> = +// this@children.asSequence().filter { +// it.name.startsWith(branchName) +// } /** * Start computation for all goals in data node and return a job for the whole node */ public fun DataSet.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch { - traverse().map { + asIterable().map { it.launch(this@launch) - }.toList().joinAll() + }.joinAll() } public suspend fun DataSet.join(): Unit = coroutineScope { startAll(this).join() } -public suspend fun DataSet<*>.toMeta(): Meta = Meta { - traverse().forEach { +public fun DataSet<*>.toMeta(): Meta = Meta { + forEach { if (it.name.endsWith(DataSet.META_KEY)) { set(it.name, it.meta) } else { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt index ca50019a..f9f14f37 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt @@ -28,7 +28,7 @@ public interface DataSetBuilder { } //Set new items - dataSet.traverse().forEach { + dataSet.forEach { data(name + it.name, it.data) } } @@ -146,7 +146,7 @@ public inline fun DataSetBuilder.static( */ @DFExperimental public fun DataSetBuilder.populateFrom(tree: DataSet): Unit { - tree.traverse().forEach { + tree.forEach { //TODO check if the place is occupied data(it.name, it.data) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt index 0540d6f6..79a44b01 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt @@ -39,12 +39,12 @@ public interface DataTree : DataSet { override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY - override fun traverse(): Sequence> = sequence { + override fun iterator(): Iterator> = iterator { items.forEach { (token, childItem: DataTreeItem) -> if (!token.body.startsWith("@")) { when (childItem) { is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName())) - is DataTreeItem.Node -> yieldAll(childItem.tree.traverse().map { it.named(token + it.name) }) + is DataTreeItem.Node -> yieldAll(childItem.tree.asSequence().map { it.named(token + it.name) }) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTreeBuilder.kt similarity index 66% rename from dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt rename to dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTreeBuilder.kt index 23b44aa0..675d6566 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt +++ 
b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTreeBuilder.kt @@ -3,9 +3,9 @@ package space.kscience.dataforge.data import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.launch import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.* import kotlin.collections.set import kotlin.coroutines.CoroutineContext @@ -14,13 +14,19 @@ import kotlin.jvm.Synchronized import kotlin.reflect.KType import kotlin.reflect.typeOf +public interface DataSourceBuilder : DataSetBuilder, DataSource { + override val updates: MutableSharedFlow +} + /** * A mutable [DataTree] that propagates updates */ -public class DataSourceBuilder( +@PublishedApi +internal class DataTreeBuilder( override val dataType: KType, coroutineContext: CoroutineContext, -) : DataTree, DataSetBuilder, DataSource { +) : DataTree, DataSourceBuilder { + override val coroutineContext: CoroutineContext = coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction() @@ -29,23 +35,20 @@ public class DataSourceBuilder( override val items: Map> get() = treeItems.filter { !it.key.body.startsWith("@") } - private val _updates = MutableSharedFlow() - - override val updates: SharedFlow - get() = _updates + override val updates = MutableSharedFlow() @Synchronized private fun remove(token: NameToken) { if (treeItems.remove(token) != null) { launch { - _updates.emit(token.asName()) + updates.emit(token.asName()) } } } override fun remove(name: Name) { if (name.isEmpty()) error("Can't remove the root node") - (getItem(name.cutLast()).tree as? DataSourceBuilder)?.remove(name.lastOrNull()!!) + (getItem(name.cutLast()).tree as? DataTreeBuilder)?.remove(name.lastOrNull()!!) } @Synchronized @@ -58,11 +61,11 @@ public class DataSourceBuilder( treeItems[token] = DataTreeItem.Node(node) } - private fun getOrCreateNode(token: NameToken): DataSourceBuilder = - (treeItems[token] as? DataTreeItem.Node)?.tree as? DataSourceBuilder - ?: DataSourceBuilder(dataType, coroutineContext).also { set(token, it) } + private fun getOrCreateNode(token: NameToken): DataTreeBuilder = + (treeItems[token] as? DataTreeItem.Node)?.tree as? DataTreeBuilder + ?: DataTreeBuilder(dataType, coroutineContext).also { set(token, it) } - private fun getOrCreateNode(name: Name): DataSourceBuilder = when (name.length) { + private fun getOrCreateNode(name: Name): DataTreeBuilder = when (name.length) { 0 -> this 1 -> getOrCreateNode(name.firstOrNull()!!) else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst()) @@ -79,7 +82,7 @@ public class DataSourceBuilder( } } launch { - _updates.emit(name) + updates.emit(name) } } @@ -91,32 +94,39 @@ public class DataSourceBuilder( } /** - * Create a dynamic tree. Initial data is placed synchronously. + * Create a dynamic [DataSource]. Initial data is placed synchronously. 
*/ +@DFInternal @Suppress("FunctionName") -public fun ActiveDataTree( +public fun DataSource( type: KType, parent: CoroutineScope, block: DataSourceBuilder.() -> Unit, -): DataSourceBuilder { - val tree = DataSourceBuilder(type, parent.coroutineContext) +): DataSource { + val tree = DataTreeBuilder(type, parent.coroutineContext) tree.block() return tree } +@Suppress("OPT_IN_USAGE","FunctionName") +public inline fun DataSource( + parent: CoroutineScope, + crossinline block: DataSourceBuilder.() -> Unit, +): DataSource = DataSource(typeOf(), parent) { block() } + @Suppress("FunctionName") -public suspend inline fun ActiveDataTree( +public suspend inline fun DataSource( crossinline block: DataSourceBuilder.() -> Unit = {}, -): DataSourceBuilder = DataSourceBuilder(typeOf(), coroutineContext).apply { block() } +): DataSourceBuilder = DataTreeBuilder(typeOf(), coroutineContext).apply { block() } public inline fun DataSourceBuilder.emit( name: Name, parent: CoroutineScope, noinline block: DataSourceBuilder.() -> Unit, -): Unit = node(name, ActiveDataTree(typeOf(), parent, block)) +): Unit = node(name, DataSource(parent, block)) public inline fun DataSourceBuilder.emit( name: String, parent: CoroutineScope, noinline block: DataSourceBuilder.() -> Unit, -): Unit = node(Name.parse(name), ActiveDataTree(typeOf(), parent, block)) +): Unit = node(Name.parse(name), DataSource(parent, block)) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt index 1c4787f6..189087a3 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt @@ -18,6 +18,7 @@ package space.kscience.dataforge.data import kotlinx.coroutines.launch import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string +import space.kscience.dataforge.misc.DFInternal public interface GroupRule { public fun gather(set: DataSet): Map> @@ -31,6 +32,7 @@ public interface GroupRule { * @param defaultTagValue * @return */ + @OptIn(DFInternal::class) public fun byMetaValue( key: String, defaultTagValue: String, @@ -42,9 +44,9 @@ public interface GroupRule { val map = HashMap>() if (set is DataSource) { - set.traverse().forEach { data -> + set.forEach { data -> val tagValue: String = data.meta[key]?.string ?: defaultTagValue - (map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder) + (map.getOrPut(tagValue) { DataTreeBuilder(set.dataType, set.coroutineContext) } as DataTreeBuilder) .data(data.name, data.data) set.launch { @@ -53,7 +55,7 @@ public interface GroupRule { val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue map.getOrPut(updateTagValue) { - ActiveDataTree(set.dataType, this) { + DataSource(set.dataType, this) { data(name, dataUpdate) } } @@ -61,7 +63,7 @@ public interface GroupRule { } } } else { - set.traverse().forEach { data -> + set.forEach { data -> val tagValue: String = data.meta[key]?.string ?: defaultTagValue (map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree) .data(data.name, data.data) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt index 04a1ebee..4f0f455e 100644 --- 
a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt @@ -52,7 +52,7 @@ internal class StaticDataTree( if (dataSet is StaticDataTree) { set(name, DataTreeItem.Node(dataSet)) } else { - dataSet.traverse().forEach { + dataSet.forEach { data(name + it.name, it.data) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt index d186c636..e405f6c5 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt @@ -30,8 +30,13 @@ public fun DataSet.filter( override val meta: Meta get() = this@filter.meta - override fun traverse(): Sequence> = - this@filter.traverse().filter { predicate(it.name, it.meta) } + override fun iterator(): Iterator> = iterator { + for(d in this@filter){ + if(predicate(d.name, d.meta)){ + yield(d) + } + } + } override fun get(name: Name): Data? = this@filter.get(name)?.takeIf { predicate(name, it.meta) @@ -58,8 +63,11 @@ public fun DataSet.withNamePrefix(prefix: Name): DataSet = if (p override val meta: Meta get() = this@withNamePrefix.meta - override fun traverse(): Sequence> = - this@withNamePrefix.traverse().map { it.data.named(prefix + it.name) } + override fun iterator(): Iterator> = iterator { + for(d in this@withNamePrefix){ + yield(d.data.named(prefix + d.name)) + } + } override fun get(name: Name): Data? = name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) } @@ -80,9 +88,11 @@ public fun DataSet.branch(branchName: Name): DataSet = if (branc override val meta: Meta get() = this@branch.meta - override fun traverse(): Sequence> = this@branch.traverse().mapNotNull { - it.name.removeHeadOrNull(branchName)?.let { name -> - it.data.named(name) + override fun iterator(): Iterator> = iterator { + for(d in this@branch){ + d.name.removeHeadOrNull(branchName)?.let { name -> + yield(d.data.named(name)) + } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index 287d6383..6d683f12 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -7,8 +7,6 @@ import space.kscience.dataforge.meta.seal import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.Name -import kotlin.contracts.InvocationKind -import kotlin.contracts.contract import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KType @@ -16,11 +14,12 @@ import kotlin.reflect.typeOf public data class ValueWithMeta(val meta: Meta, val value: T) -public suspend fun Data.awaitWithMeta(): ValueWithMeta = ValueWithMeta(meta, await()) +public suspend fun Data.awaitWithMeta(): ValueWithMeta = ValueWithMeta(meta, await()) public data class NamedValueWithMeta(val name: Name, val meta: Meta, val value: T) -public suspend fun NamedData.awaitWithMeta(): NamedValueWithMeta = NamedValueWithMeta(name, meta, await()) +public suspend fun NamedData.awaitWithMeta(): NamedValueWithMeta = + NamedValueWithMeta(name, meta, await()) /** @@ -187,14 +186,13 @@ public suspend fun 
DataSet.map( metaTransform: MutableMeta.() -> Unit = {}, block: suspend (NamedValueWithMeta) -> R, ): DataTree = DataTree(outputType) { - populateFrom( - traverse().map { - val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() - Data(outputType, newMeta, coroutineContext, listOf(it)) { - block(it.awaitWithMeta()) - }.named(it.name) + forEach { + val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() + val d = Data(outputType, newMeta, coroutineContext, listOf(it)) { + block(it.awaitWithMeta()) } - ) + data(it.name, d) + } } @OptIn(DFInternal::class) @@ -204,10 +202,9 @@ public suspend inline fun DataSet.map( noinline block: suspend (NamedValueWithMeta) -> R, ): DataTree = map(typeOf(), coroutineContext, metaTransform, block) -public suspend fun DataSet.forEach(block: suspend (NamedData) -> Unit) { - contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - traverse().forEach { - block(it) +public inline fun DataSet.forEach(block: (NamedData) -> Unit) { + for (d in this) { + block(d) } } @@ -215,11 +212,11 @@ public inline fun DataSet.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, crossinline transformation: suspend (Iterable>) -> R, -): Data = traverse().asIterable().reduceNamedToData(coroutineContext, meta, transformation) +): Data = asIterable().reduceNamedToData(coroutineContext, meta, transformation) public inline fun DataSet.foldToData( initial: R, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, crossinline block: suspend (result: R, data: NamedValueWithMeta) -> R, -): Data = traverse().asIterable().foldNamedToData(initial, coroutineContext, meta, block) \ No newline at end of file +): Data = asIterable().foldNamedToData(initial, coroutineContext, meta, block) \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt index d5c5eb56..74d67d9d 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt @@ -5,7 +5,6 @@ import kotlinx.coroutines.flow.filter import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.matches import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KType @@ -29,7 +28,6 @@ private fun Data<*>.castOrNull(type: KType): Data? = /** * Select all data matching given type and filters. Does not modify paths * - * @param namePattern a name match patter according to [Name.matches] * @param predicate addition filtering condition based on item name and meta. By default, accepts all */ @OptIn(DFExperimental::class) @@ -47,11 +45,13 @@ public fun DataSet<*>.filterByType( private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type) && predicate(name, datum.meta) - override fun traverse(): Sequence> = this@filterByType.traverse().filter { - checkDatum(it.name, it.data) - }.map { - @Suppress("UNCHECKED_CAST") - it as NamedData + override fun iterator(): Iterator> = iterator { + for(d in this@filterByType){ + if(checkDatum(d.name,d.data)){ + @Suppress("UNCHECKED_CAST") + yield(d as NamedData) + } + } } override fun get(name: Name): Data? 
= this@filterByType[name]?.let { datum -> diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt index 3fc68141..cb222ea0 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt @@ -27,7 +27,7 @@ context(DataSetBuilder) public infix fun String.put( ): Unit = node(Name.parse(this), block) /** - * Copy given data set and mirror its changes to this [DataSourceBuilder] in [this@setAndObserve]. Returns an update [Job] + * Copy given data set and mirror its changes to this [DataTreeBuilder] in [this@setAndObserve]. Returns an update [Job] */ context(DataSetBuilder) public fun CoroutineScope.setAndWatch( name: Name, diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt index 9fd05357..3987cd19 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt @@ -1,5 +1,6 @@ package space.kscience.dataforge.data +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test @@ -9,7 +10,7 @@ import space.kscience.dataforge.actions.map import space.kscience.dataforge.misc.DFExperimental import kotlin.test.assertEquals -@OptIn(DFExperimental::class) +@OptIn(DFExperimental::class, ExperimentalCoroutinesApi::class) internal class ActionsTest { @Test fun testStaticMapAction() = runTest { @@ -28,7 +29,7 @@ internal class ActionsTest { @Test fun testDynamicMapAction() = runTest { - val data: DataSourceBuilder = ActiveDataTree() + val data: DataSourceBuilder = DataSource() val plusOne = Action.map { result { it + 1 } diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt index 9509092e..b77f7ea2 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt @@ -56,7 +56,7 @@ internal class DataTreeBuilderTest { try { lateinit var updateJob: Job supervisorScope { - val subNode = ActiveDataTree { + val subNode = DataSource { updateJob = launch { repeat(10) { delay(10) @@ -70,7 +70,7 @@ internal class DataTreeBuilderTest { println(it) } } - val rootNode = ActiveDataTree { + val rootNode = DataSource { setAndWatch("sub".asName(), subNode) } diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt index 48a65c6d..e8f392ac 100644 --- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt +++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/Meta.kt @@ -133,6 +133,7 @@ public fun Meta.getIndexed(name: Name): Map { } } +public fun Meta.getIndexed(name: String): Map = getIndexed(name.parseAsName()) /** * A meta node that ensures that all of its descendants has at least the same type. 
diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt index 1c9b59fd..0c676fb7 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt @@ -1,6 +1,7 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.forEach import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name @@ -23,7 +24,7 @@ public interface TaskResult : DataSet { */ public val taskMeta: Meta - override fun traverse(): Sequence> + override fun iterator(): Iterator> override fun get(name: Name): TaskData? } @@ -34,8 +35,10 @@ private class TaskResultImpl( override val taskMeta: Meta, ) : TaskResult, DataSet by dataSet { - override fun traverse(): Sequence> = dataSet.traverse().map { - workspace.wrapData(it, it.name, taskName, taskMeta) + override fun iterator(): Iterator> = iterator { + dataSet.forEach { + yield(workspace.wrapData(it, it.name, taskName, taskMeta)) + } } override fun get(name: Name): TaskData? = dataSet.get(name)?.let { diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt index 6fa04c94..0053850d 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt @@ -3,6 +3,7 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.data.Data import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.asSequence import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.misc.Type @@ -35,7 +36,7 @@ public interface Workspace : ContextAware, Provider { return when (target) { "target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)} Task.TYPE -> tasks - Data.TYPE -> data.traverse().associateBy { it.name } + Data.TYPE -> data.asSequence().associateBy { it.name } else -> emptyMap() } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index 54d027ac..4a847e64 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -15,11 +15,9 @@ import space.kscience.dataforge.misc.DFBuilder import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName -import kotlin.collections.HashMap import kotlin.collections.set import kotlin.properties.PropertyDelegateProvider import kotlin.properties.ReadOnlyProperty -import kotlin.reflect.typeOf public data class TaskReference(public val taskName: Name, public val task: Task) : DataSelector { @@ -106,8 +104,8 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas } @DFExperimental - public fun buildActiveData(scope: CoroutineScope, builder: DataSourceBuilder.() -> 
Unit) { - data = ActiveDataTree(typeOf(), scope, builder) + public fun data(scope: CoroutineScope, builder: DataSourceBuilder.() -> Unit) { + data = DataSource(scope, builder) } /** diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt index 06a97869..02bf9001 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt @@ -3,6 +3,7 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.filterByType import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.matches @@ -10,6 +11,7 @@ import space.kscience.dataforge.names.matches // data(builder) //} +@OptIn(DFExperimental::class) public inline fun TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector = object : DataSelector { override suspend fun select(workspace: Workspace, meta: Meta): DataSet = diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt index a7426d61..b0c2ebf4 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt @@ -1,6 +1,9 @@ +@file:OptIn(ExperimentalCoroutinesApi::class) + package space.kscience.dataforge.workspace -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginTag @@ -15,7 +18,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { val allData by task { val selectedData = workspace.data.filterByType() - val result: Data = selectedData.traverse().asIterable().foldToData(0) { result, data -> + val result: Data = selectedData.foldToData(0) { result, data -> result + data.value } data("result", result) @@ -44,28 +47,22 @@ class DataPropagationTest { context { plugin(DataPropagationTestPlugin) } - runBlocking { - data { - repeat(100) { - static("myData[$it]", it) - } + data { + repeat(100) { + static("myData[$it]", it) } } } @Test - fun testAllData() { - runBlocking { - val node = testWorkspace.produce("Test.allData") - assertEquals(4950, node.traverse().single().await()) - } + fun testAllData() = runTest { + val node = testWorkspace.produce("Test.allData") + assertEquals(4950, node.asSequence().single().await()) } @Test - fun testSingleData() { - runBlocking { - val node = testWorkspace.produce("Test.singleData") - assertEquals(12, node.traverse().single().await()) - } + fun testSingleData() = runTest { + val node = testWorkspace.produce("Test.singleData") + assertEquals(12, node.asSequence().single().await()) } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index 64d13f30..db231c80 100644 --- 
a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -1,8 +1,11 @@ @file:Suppress("UNUSED_VARIABLE") +@file:OptIn(ExperimentalCoroutinesApi::class) package space.kscience.dataforge.workspace +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Timeout import space.kscience.dataforge.context.* import space.kscience.dataforge.data.* @@ -26,7 +29,7 @@ public inline fun P.toFactory(): PluginFactory
<P>
= object override val type: KClass = P::class } -public fun Workspace.runBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet = runBlocking { +public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet = runBlocking { produce(task, block) } @@ -156,21 +159,17 @@ class SimpleWorkspaceTest { @Test @Timeout(1) - fun testWorkspace() { - runBlocking { - val node = workspace.runBlocking("sum") - val res = node.traverse().single() - assertEquals(328350, res.await()) - } + fun testWorkspace() = runTest { + val node = workspace.produce("sum") + val res = node.asSequence().single() + assertEquals(328350, res.await()) } @Test @Timeout(1) - fun testMetaPropagation() { - runBlocking { - val node = workspace.produce("sum") { "testFlag" put true } - val res = node.traverse().single().await() - } + fun testMetaPropagation() = runTest { + val node = workspace.produce("sum") { "testFlag" put true } + val res = node.asSequence().single().await() } @Test @@ -192,7 +191,7 @@ class SimpleWorkspaceTest { fun testFilter() { runBlocking { val node = workspace.produce("filterOne") - assertEquals(12, node.traverse().first().await()) + assertEquals(12, node.asSequence().first().await()) } } } \ No newline at end of file
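
For reference, a minimal usage sketch of the iterator-based traversal introduced by this patch (illustrative only, not part of the diff). It assumes the post-patch API visible above: `operator fun iterator()` on `DataSet`, the `asSequence()` view, the non-suspending `DataTree {}` builder and `DataSetBuilder.static`; the tree contents below are arbitrary.

```kotlin
import kotlinx.coroutines.runBlocking
import space.kscience.dataforge.data.*

fun main() = runBlocking {
    // Build a small static tree of Int data.
    val tree: DataTree<Int> = DataTree {
        static("a", 1)
        static("b", 2)
        static("group.c", 3)
    }

    // `DataSet` now exposes `operator fun iterator()`, so a plain for-loop traverses it.
    var sum = 0
    for (named in tree) {
        sum += named.await() // await the lazily computed value
    }

    // `asSequence()` bridges to the usual collection operators when needed.
    val names = tree.asSequence().map { it.name.toString() }.toList()

    println("sum=$sum, names=$names") // traversal order is not guaranteed
}
```

Traversal order is not guaranteed, matching the contract documented on `DataSet.iterator`; the same loop works unchanged on a `DataSource`, which additionally exposes `updates` for change propagation.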
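Likewise, a sketch of a custom action written against the new `AbstractAction` contract: `generate` produces the initial content, `update` recomputes only the branch named by `updateKey`. The generic parameters (`AbstractAction<T, R>` with `T = R = String`, `DataSetBuilder<R>`, `DataSourceBuilder<R>`) are reconstructed from context and may differ in detail; `SuffixAction` itself is hypothetical, and `Data.map { ... }` is assumed to be the existing single-datum transform from `dataTransform.kt`.

```kotlin
import space.kscience.dataforge.actions.AbstractAction
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.typeOf

class SuffixAction(
    private val suffix: String,
) : AbstractAction<String, String>(typeOf<String>()) {

    // Emit one transformed item under its original name.
    private fun DataSetBuilder<String>.suffixOne(name: Name, datum: Data<String>) {
        data(name, datum.map { it + suffix })
    }

    // Initial pass: walk the whole input with the iterator-based forEach.
    override fun DataSetBuilder<String>.generate(data: DataSet<String>, meta: Meta) {
        data.forEach { suffixOne(it.name, it.data) }
    }

    // Incremental pass: drop the stale result and recompute only the pushed name.
    override fun DataSourceBuilder<String>.update(dataSet: DataSet<String>, meta: Meta, updateKey: Name) {
        remove(updateKey)
        dataSet[updateKey]?.let { suffixOne(updateKey, it) }
    }
}
```

When such an action is executed on a `DataSource`, `AbstractAction.execute` collects `updates` and invokes `update` for each pushed name; on a plain `DataSet` only `generate` runs and the result is a static `DataTree`.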