From fe92e8fccf796ce5c4d4fffe29987c048d403b16 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 8 May 2022 20:56:14 +0300 Subject: [PATCH] Data traversal refactoring --- CHANGELOG.md | 1 + build.gradle.kts | 2 +- .../kscience/dataforge/actions/MapAction.kt | 2 +- .../dataforge/actions/ReduceAction.kt | 10 +- .../kscience/dataforge/actions/SplitAction.kt | 4 +- .../space/kscience/dataforge/data/DataSet.kt | 29 ++--- .../kscience/dataforge/data/DataSetBuilder.kt | 4 +- .../space/kscience/dataforge/data/DataTree.kt | 24 ++-- .../kscience/dataforge/data/GroupRule.kt | 4 +- .../kscience/dataforge/data/StaticDataTree.kt | 2 +- .../kscience/dataforge/data/dataFilter.kt | 10 +- .../kscience/dataforge/data/dataTransform.kt | 105 ++++++++++++------ .../kscience/dataforge/data/dataFilterJvm.kt | 26 ++--- .../dataforge/workspace/TaskResult.kt | 4 +- .../kscience/dataforge/workspace/Workspace.kt | 2 +- .../dataforge/workspace/workspaceJvm.kt | 6 +- .../workspace/DataPropagationTest.kt | 12 +- .../workspace/SimpleWorkspaceTest.kt | 20 ++-- 18 files changed, 148 insertions(+), 119 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f73ebf1..58f82326 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - DataSet operates with sequences of data instead of flows - PartialEnvelope uses `Int` instead `UInt`. - `ActiveDataSet` renamed to `DataSource` +- `selectOne`->`getByType` ### Deprecated diff --git a/build.gradle.kts b/build.gradle.kts index a7140851..f8072b6a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,7 +4,7 @@ plugins { allprojects { group = "space.kscience" - version = "0.6.0-dev-5" + version = "0.6.0-dev-6" } subprojects { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt index f2165e7d..91f58c95 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt @@ -89,7 +89,7 @@ internal class MapAction( return newData.named(newName) } - val sequence = dataSet.dataSequence().map(::mapOne) + val sequence = dataSet.traverse().map(::mapOne) return if (dataSet is DataSource ) { ActiveDataTree(outputType, dataSet) { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt index 99da6cd2..d3be1ce1 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt @@ -19,14 +19,14 @@ public class JoinGroup( public var meta: MutableMeta = MutableMeta() - public lateinit var result: suspend ActionEnv.(Map>) -> R + public lateinit var result: suspend ActionEnv.(Map>) -> R - internal fun result(outputType: KType, f: suspend ActionEnv.(Map>) -> R1) { + internal fun result(outputType: KType, f: suspend ActionEnv.(Map>) -> R1) { this.outputType = outputType this.result = f; } - public inline fun result(noinline f: suspend ActionEnv.(Map>) -> R1) { + public inline fun result(noinline f: suspend ActionEnv.(Map>) -> R1) { outputType = typeOf() this.result = f; } @@ -66,7 +66,7 @@ public class ReduceGroupBuilder( /** * Apply transformation to the whole node */ - public fun result(resultName: String, f: suspend ActionEnv.(Map>) -> R) { + public fun result(resultName: String, f: suspend ActionEnv.(Map>) -> R) { groupRules += { node -> listOf(JoinGroup(resultName, node, outputType).apply { result(outputType, f) }) } @@ -87,7 +87,7 @@ internal class ReduceAction( override fun transform(set: DataSet, meta: Meta, key: Name): Sequence> = sequence { ReduceGroupBuilder(meta, outputType).apply(action).buildGroups(set).forEach { group -> - val dataFlow: Map> = group.set.dataSequence().fold(HashMap()) { acc, value -> + val dataFlow: Map> = group.set.traverse().fold(HashMap()) { acc, value -> acc.apply { acc[value.name] = value.data } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt index 471a8057..ee778f6d 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt @@ -77,7 +77,7 @@ internal class SplitAction( return if (dataSet is DataSource) { ActiveDataTree(outputType, dataSet) { - populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne)) + populateFrom(dataSet.traverse().flatMap(transform = ::splitOne)) launch { dataSet.updates.collect { name -> //clear old nodes @@ -89,7 +89,7 @@ internal class SplitAction( } } else { DataTree(outputType) { - populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne)) + populateFrom(dataSet.traverse().flatMap(transform = ::splitOne)) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt index f468632f..3a3497b3 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt @@ -10,8 +10,7 @@ import space.kscience.dataforge.meta.set import space.kscience.dataforge.names.* import kotlin.reflect.KType -public interface -DataSet { +public interface DataSet { /** * The minimal common ancestor to all data in the node @@ -24,23 +23,15 @@ DataSet { public val meta: Meta /** - * Traverse this provider or its child. The order is not guaranteed. + * Traverse this [DataSet] returning named data instances. The order is not guaranteed. */ - public fun dataSequence(): Sequence> + public fun traverse(): Sequence> /** * Get data with given name. */ public operator fun get(name: Name): Data? - - /** - * Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf. - */ - public fun listTop(prefix: Name = Name.EMPTY): List = - dataSequence().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList() - // By default, traverses the whole tree. Could be optimized in descendants - public companion object { public val META_KEY: Name = "@meta".asName() @@ -51,16 +42,14 @@ DataSet { override val dataType: KType = TYPE_OF_NOTHING override val meta: Meta get() = Meta.EMPTY - //private val nothing: Nothing get() = error("this is nothing") - - override fun dataSequence(): Sequence> = emptySequence() + override fun traverse(): Sequence> = emptySequence() override fun get(name: Name): Data? = null } } } -public operator fun DataSet.get(name:String): Data? = get(name.parseAsName()) +public operator fun DataSet.get(name: String): Data? = get(name.parseAsName()) /** * A [DataSet] with propagated updates. @@ -78,7 +67,7 @@ public interface DataSource : DataSet, CoroutineScope { /** * Stop generating updates from this [DataSource] */ - public fun close(){ + public fun close() { coroutineContext[Job]?.cancel() } } @@ -89,7 +78,7 @@ public val DataSet.updates: Flow get() = if (this is DataSour * Flow all data nodes with names starting with [branchName] */ public fun DataSet.children(branchName: Name): Sequence> = - this@children.dataSequence().filter { + this@children.traverse().filter { it.name.startsWith(branchName) } @@ -97,7 +86,7 @@ public fun DataSet.children(branchName: Name): Sequence DataSet.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch { - dataSequence().map { + traverse().map { it.launch(this@launch) }.toList().joinAll() } @@ -105,7 +94,7 @@ public fun DataSet.startAll(coroutineScope: CoroutineScope): Job = public suspend fun DataSet.join(): Unit = coroutineScope { startAll(this).join() } public suspend fun DataSet<*>.toMeta(): Meta = Meta { - dataSequence().forEach { + traverse().forEach { if (it.name.endsWith(DataSet.META_KEY)) { set(it.name, it.meta) } else { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt index cfcc1c97..ca50019a 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt @@ -28,7 +28,7 @@ public interface DataSetBuilder { } //Set new items - dataSet.dataSequence().forEach { + dataSet.traverse().forEach { data(name + it.name, it.data) } } @@ -146,7 +146,7 @@ public inline fun DataSetBuilder.static( */ @DFExperimental public fun DataSetBuilder.populateFrom(tree: DataSet): Unit { - tree.dataSequence().forEach { + tree.traverse().forEach { //TODO check if the place is occupied data(it.name, it.data) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt index 3ef065e1..0540d6f6 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt @@ -1,9 +1,5 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.Type import space.kscience.dataforge.names.* @@ -43,20 +39,17 @@ public interface DataTree : DataSet { override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY - override fun dataSequence(): Sequence> = sequence { + override fun traverse(): Sequence> = sequence { items.forEach { (token, childItem: DataTreeItem) -> if (!token.body.startsWith("@")) { when (childItem) { is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName())) - is DataTreeItem.Node -> yieldAll(childItem.tree.dataSequence().map { it.named(token + it.name) }) + is DataTreeItem.Node -> yieldAll(childItem.tree.traverse().map { it.named(token + it.name) }) } } } } - override fun listTop(prefix: Name): List = - getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList() - override fun get(name: Name): Data? = when (name.length) { 0 -> null 1 -> items[name.firstOrNull()!!].data @@ -73,6 +66,9 @@ public interface DataTree : DataSet { } } +public fun DataTree.listChildren(prefix: Name): List = + getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList() + /** * Get a [DataTreeItem] with given [name] or null if the item does not exist */ @@ -86,15 +82,15 @@ public val DataTreeItem?.tree: DataTree? get() = (this as? DataT public val DataTreeItem?.data: Data? get() = (this as? DataTreeItem.Leaf)?.data /** - * Flow of all children including nodes + * A [Sequence] of all children including nodes */ -public fun DataTree.itemFlow(): Flow>> = flow { +public fun DataTree.traverseItems(): Sequence>> = sequence { items.forEach { (head, item) -> - emit(head.asName() to item) + yield(head.asName() to item) if (item is DataTreeItem.Node) { - val subSequence = item.tree.itemFlow() + val subSequence = item.tree.traverseItems() .map { (name, data) -> (head.asName() + name) to data } - emitAll(subSequence) + yieldAll(subSequence) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt index c00fac8d..1c4787f6 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt @@ -42,7 +42,7 @@ public interface GroupRule { val map = HashMap>() if (set is DataSource) { - set.dataSequence().forEach { data -> + set.traverse().forEach { data -> val tagValue: String = data.meta[key]?.string ?: defaultTagValue (map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder) .data(data.name, data.data) @@ -61,7 +61,7 @@ public interface GroupRule { } } } else { - set.dataSequence().forEach { data -> + set.traverse().forEach { data -> val tagValue: String = data.meta[key]?.string ?: defaultTagValue (map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree) .data(data.name, data.data) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt index 2353f97d..04a1ebee 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt @@ -52,7 +52,7 @@ internal class StaticDataTree( if (dataSet is StaticDataTree) { set(name, DataTreeItem.Node(dataSet)) } else { - dataSet.dataSequence().forEach { + dataSet.traverse().forEach { data(name + it.name, it.data) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt index f5037918..d186c636 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt @@ -30,8 +30,8 @@ public fun DataSet.filter( override val meta: Meta get() = this@filter.meta - override fun dataSequence(): Sequence> = - this@filter.dataSequence().filter { predicate(it.name, it.meta) } + override fun traverse(): Sequence> = + this@filter.traverse().filter { predicate(it.name, it.meta) } override fun get(name: Name): Data? = this@filter.get(name)?.takeIf { predicate(name, it.meta) @@ -58,8 +58,8 @@ public fun DataSet.withNamePrefix(prefix: Name): DataSet = if (p override val meta: Meta get() = this@withNamePrefix.meta - override fun dataSequence(): Sequence> = - this@withNamePrefix.dataSequence().map { it.data.named(prefix + it.name) } + override fun traverse(): Sequence> = + this@withNamePrefix.traverse().map { it.data.named(prefix + it.name) } override fun get(name: Name): Data? = name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) } @@ -80,7 +80,7 @@ public fun DataSet.branch(branchName: Name): DataSet = if (branc override val meta: Meta get() = this@branch.meta - override fun dataSequence(): Sequence> = this@branch.dataSequence().mapNotNull { + override fun traverse(): Sequence> = this@branch.traverse().mapNotNull { it.name.removeHeadOrNull(branchName)?.let { name -> it.data.named(name) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index aba33561..287d6383 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -1,12 +1,12 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.seal import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.misc.DFInternal +import space.kscience.dataforge.names.Name import kotlin.contracts.InvocationKind import kotlin.contracts.contract import kotlin.coroutines.CoroutineContext @@ -14,6 +14,15 @@ import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KType import kotlin.reflect.typeOf +public data class ValueWithMeta(val meta: Meta, val value: T) + +public suspend fun Data.awaitWithMeta(): ValueWithMeta = ValueWithMeta(meta, await()) + +public data class NamedValueWithMeta(val name: Name, val meta: Meta, val value: T) + +public suspend fun NamedData.awaitWithMeta(): NamedValueWithMeta = NamedValueWithMeta(name, meta, await()) + + /** * Lazily transform this data to another data. By convention [block] should not use external data (be pure). * @param coroutineContext additional [CoroutineContext] elements used for data computation. @@ -49,13 +58,13 @@ public inline fun Data.combine( public inline fun Collection>.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - crossinline block: suspend (List>) -> R, + crossinline block: suspend (List>) -> R, ): Data = Data( meta, coroutineContext, this ) { - block(map { it.meta to it.await() }) + block(map { it.awaitWithMeta() }) } @DFInternal @@ -63,17 +72,16 @@ public fun Map>.reduceToData( outputType: KType, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - block: suspend (Map>) -> R, + block: suspend (Map>) -> R, ): Data = Data( outputType, meta, coroutineContext, this.values ) { - block(mapValues { it.value.meta to it.value.await() }) + block(mapValues { it.value.awaitWithMeta() }) } - /** * Lazily reduce a [Map] of [Data] with any static key. * @param K type of the map key @@ -83,58 +91,93 @@ public fun Map>.reduceToData( public inline fun Map>.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - noinline block: suspend (Map) -> R, + crossinline block: suspend (Map>) -> R, ): Data = Data( meta, coroutineContext, this.values ) { - block(mapValues { it.value.await() }) + block(mapValues { it.value.awaitWithMeta() }) } -//flow operations +//Iterable operations -/** - * Transform a [Flow] of [NamedData] to a single [Data]. - */ @DFInternal -public inline fun Sequence>.reduceToData( +public inline fun Iterable>.reduceToData( outputType: KType, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - crossinline transformation: suspend (Sequence>) -> R, + crossinline transformation: suspend (Collection>) -> R, ): Data = Data( outputType, meta, coroutineContext, toList() ) { - transformation(this) + transformation(map { it.awaitWithMeta() }) } @OptIn(DFInternal::class) -public inline fun Sequence>.reduceToData( +public inline fun Iterable>.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - crossinline transformation: suspend (Sequence>) -> R, + crossinline transformation: suspend (Collection>) -> R, ): Data = reduceToData(typeOf(), coroutineContext, meta) { transformation(it) } -/** - * Fold a flow of named data into a single [Data] - */ -public inline fun Sequence>.foldToData( +public inline fun Iterable>.foldToData( initial: R, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - crossinline block: suspend (result: R, data: NamedData) -> R, + crossinline block: suspend (result: R, data: ValueWithMeta) -> R, ): Data = reduceToData( coroutineContext, meta ) { it.fold(initial) { acc, t -> block(acc, t) } } +/** + * Transform an [Iterable] of [NamedData] to a single [Data]. + */ +@DFInternal +public inline fun Iterable>.reduceNamedToData( + outputType: KType, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + crossinline transformation: suspend (Collection>) -> R, +): Data = Data( + outputType, + meta, + coroutineContext, + toList() +) { + transformation(map { it.awaitWithMeta() }) +} + +@OptIn(DFInternal::class) +public inline fun Iterable>.reduceNamedToData( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + crossinline transformation: suspend (Collection>) -> R, +): Data = reduceNamedToData(typeOf(), coroutineContext, meta) { + transformation(it) +} + +/** + * Fold a [Iterable] of named data into a single [Data] + */ +public inline fun Iterable>.foldNamedToData( + initial: R, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + crossinline block: suspend (result: R, data: NamedValueWithMeta) -> R, +): Data = reduceNamedToData( + coroutineContext, meta +) { + it.fold(initial) { acc, t -> block(acc, t) } +} + //DataSet operations @DFInternal @@ -142,13 +185,13 @@ public suspend fun DataSet.map( outputType: KType, coroutineContext: CoroutineContext = EmptyCoroutineContext, metaTransform: MutableMeta.() -> Unit = {}, - block: suspend (T) -> R, + block: suspend (NamedValueWithMeta) -> R, ): DataTree = DataTree(outputType) { populateFrom( - dataSequence().map { + traverse().map { val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() Data(outputType, newMeta, coroutineContext, listOf(it)) { - block(it.await()) + block(it.awaitWithMeta()) }.named(it.name) } ) @@ -158,12 +201,12 @@ public suspend fun DataSet.map( public suspend inline fun DataSet.map( coroutineContext: CoroutineContext = EmptyCoroutineContext, noinline metaTransform: MutableMeta.() -> Unit = {}, - noinline block: suspend (T) -> R, + noinline block: suspend (NamedValueWithMeta) -> R, ): DataTree = map(typeOf(), coroutineContext, metaTransform, block) public suspend fun DataSet.forEach(block: suspend (NamedData) -> Unit) { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - dataSequence().forEach { + traverse().forEach { block(it) } } @@ -171,12 +214,12 @@ public suspend fun DataSet.forEach(block: suspend (NamedData) -> public inline fun DataSet.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - crossinline transformation: suspend (Sequence>) -> R, -): Data = dataSequence().reduceToData(coroutineContext, meta, transformation) + crossinline transformation: suspend (Iterable>) -> R, +): Data = traverse().asIterable().reduceNamedToData(coroutineContext, meta, transformation) public inline fun DataSet.foldToData( initial: R, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - crossinline block: suspend (result: R, data: NamedData) -> R, -): Data = dataSequence().foldToData(initial, coroutineContext, meta, block) \ No newline at end of file + crossinline block: suspend (result: R, data: NamedValueWithMeta) -> R, +): Data = traverse().asIterable().foldNamedToData(initial, coroutineContext, meta, block) \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt index 5efe6d2a..d5c5eb56 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt @@ -33,32 +33,32 @@ private fun Data<*>.castOrNull(type: KType): Data? = * @param predicate addition filtering condition based on item name and meta. By default, accepts all */ @OptIn(DFExperimental::class) -public fun DataSet<*>.filterIsInstance( +public fun DataSet<*>.filterByType( type: KType, predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, ): DataSource = object : DataSource { override val dataType = type override val coroutineContext: CoroutineContext - get() = (this@filterIsInstance as? DataSource)?.coroutineContext ?: EmptyCoroutineContext + get() = (this@filterByType as? DataSource)?.coroutineContext ?: EmptyCoroutineContext - override val meta: Meta get() = this@filterIsInstance.meta + override val meta: Meta get() = this@filterByType.meta private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type) && predicate(name, datum.meta) - override fun dataSequence(): Sequence> = this@filterIsInstance.dataSequence().filter { + override fun traverse(): Sequence> = this@filterByType.traverse().filter { checkDatum(it.name, it.data) }.map { @Suppress("UNCHECKED_CAST") it as NamedData } - override fun get(name: Name): Data? = this@filterIsInstance[name]?.let { datum -> + override fun get(name: Name): Data? = this@filterByType[name]?.let { datum -> if (checkDatum(name, datum)) datum.castOrNull(type) else null } - override val updates: Flow = this@filterIsInstance.updates.filter { name -> + override val updates: Flow = this@filterByType.updates.filter { name -> get(name)?.let { datum -> checkDatum(name, datum) } ?: false @@ -68,18 +68,18 @@ public fun DataSet<*>.filterIsInstance( /** * Select a single datum of the appropriate type */ -public inline fun DataSet<*>.filterIsInstance( +public inline fun DataSet<*>.filterByType( noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, -): DataSet = filterIsInstance(typeOf(), predicate) +): DataSet = filterByType(typeOf(), predicate) /** * Select a single datum if it is present and of given [type] */ -public fun DataSet<*>.selectOne(type: KType, name: Name): NamedData? = +public fun DataSet<*>.getByType(type: KType, name: Name): NamedData? = get(name)?.castOrNull(type)?.named(name) -public inline fun DataSet<*>.selectOne(name: Name): NamedData? = - selectOne(typeOf(), name) +public inline fun DataSet<*>.getByType(name: Name): NamedData? = + this@getByType.getByType(typeOf(), name) -public inline fun DataSet<*>.selectOne(name: String): NamedData? = - selectOne(typeOf(), Name.parse(name)) \ No newline at end of file +public inline fun DataSet<*>.getByType(name: String): NamedData? = + this@getByType.getByType(typeOf(), Name.parse(name)) \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt index a1fb4e84..1c9b59fd 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt @@ -23,7 +23,7 @@ public interface TaskResult : DataSet { */ public val taskMeta: Meta - override fun dataSequence(): Sequence> + override fun traverse(): Sequence> override fun get(name: Name): TaskData? } @@ -34,7 +34,7 @@ private class TaskResultImpl( override val taskMeta: Meta, ) : TaskResult, DataSet by dataSet { - override fun dataSequence(): Sequence> = dataSet.dataSequence().map { + override fun traverse(): Sequence> = dataSet.traverse().map { workspace.wrapData(it, it.name, taskName, taskMeta) } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt index 6b120c76..6fa04c94 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt @@ -35,7 +35,7 @@ public interface Workspace : ContextAware, Provider { return when (target) { "target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)} Task.TYPE -> tasks - Data.TYPE -> data.dataSequence().associateBy { it.name } + Data.TYPE -> data.traverse().associateBy { it.name } else -> emptyMap() } } diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt index 6634a2a0..06a97869 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt @@ -1,7 +1,7 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.data.DataSet -import space.kscience.dataforge.data.filterIsInstance +import space.kscience.dataforge.data.filterByType import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.matches @@ -13,7 +13,7 @@ import space.kscience.dataforge.names.matches public inline fun TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector = object : DataSelector { override suspend fun select(workspace: Workspace, meta: Meta): DataSet = - workspace.data.filterIsInstance { name, _ -> + workspace.data.filterByType { name, _ -> namePattern == null || name.matches(namePattern) } } @@ -21,4 +21,4 @@ public inline fun TaskResultBuilder<*>.data(namePattern: Name? public suspend inline fun TaskResultBuilder<*>.fromTask( task: Name, taskMeta: Meta = Meta.EMPTY, -): DataSet = workspace.produce(task, taskMeta).filterIsInstance() \ No newline at end of file +): DataSet = workspace.produce(task, taskMeta).filterByType() \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt index f4534bc7..a7426d61 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt @@ -14,16 +14,16 @@ class DataPropagationTestPlugin : WorkspacePlugin() { override val tag: PluginTag = Companion.tag val allData by task { - val selectedData = workspace.data.filterIsInstance() - val result: Data = selectedData.dataSequence().foldToData(0) { result, data -> - result + data.await() + val selectedData = workspace.data.filterByType() + val result: Data = selectedData.traverse().asIterable().foldToData(0) { result, data -> + result + data.value } data("result", result) } val singleData by task { - workspace.data.filterIsInstance()["myData[12]"]?.let { + workspace.data.filterByType()["myData[12]"]?.let { data("result", it) } } @@ -57,7 +57,7 @@ class DataPropagationTest { fun testAllData() { runBlocking { val node = testWorkspace.produce("Test.allData") - assertEquals(4950, node.dataSequence().single().await()) + assertEquals(4950, node.traverse().single().await()) } } @@ -65,7 +65,7 @@ class DataPropagationTest { fun testSingleData() { runBlocking { val node = testWorkspace.produce("Test.singleData") - assertEquals(12, node.dataSequence().single().await()) + assertEquals(12, node.traverse().single().await()) } } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index 367e8489..64d13f30 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -63,7 +63,7 @@ class SimpleWorkspaceTest { } val filterOne by task { - workspace.data.selectOne("myData[12]")?.let { source -> + workspace.data.getByType("myData[12]")?.let { source -> data(source.name, source.map { it }) } } @@ -111,23 +111,23 @@ class SimpleWorkspaceTest { val sum by task { workspace.logger.info { "Starting sum" } val res = from(square).foldToData(0) { l, r -> - l + r.await() + l + r.value } data("sum", res) } val averageByGroup by task { - val evenSum = workspace.data.filterIsInstance { name, _ -> + val evenSum = workspace.data.filterByType { name, _ -> name.toString().toInt() % 2 == 0 }.foldToData(0) { l, r -> - l + r.await() + l + r.value } data("even", evenSum) - val oddSum = workspace.data.filterIsInstance { name, _ -> + val oddSum = workspace.data.filterByType { name, _ -> name.toString().toInt() % 2 == 1 }.foldToData(0) { l, r -> - l + r.await() + l + r.value } data("odd", oddSum) } @@ -143,7 +143,7 @@ class SimpleWorkspaceTest { } val customPipe by task { - workspace.data.filterIsInstance().forEach { data -> + workspace.data.filterByType().forEach { data -> val meta = data.meta.toMutableMeta().apply { "newValue" put 22 } @@ -159,7 +159,7 @@ class SimpleWorkspaceTest { fun testWorkspace() { runBlocking { val node = workspace.runBlocking("sum") - val res = node.dataSequence().single() + val res = node.traverse().single() assertEquals(328350, res.await()) } } @@ -169,7 +169,7 @@ class SimpleWorkspaceTest { fun testMetaPropagation() { runBlocking { val node = workspace.produce("sum") { "testFlag" put true } - val res = node.dataSequence().single().await() + val res = node.traverse().single().await() } } @@ -192,7 +192,7 @@ class SimpleWorkspaceTest { fun testFilter() { runBlocking { val node = workspace.produce("filterOne") - assertEquals(12, node.dataSequence().first().await()) + assertEquals(12, node.traverse().first().await()) } } } \ No newline at end of file