Data traversal refactoring

This commit is contained in:
Alexander Nozik 2022-05-08 20:56:14 +03:00
parent 7d9189e15c
commit fe92e8fccf
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
18 changed files with 148 additions and 119 deletions

View File

@ -15,6 +15,7 @@
- DataSet operates with sequences of data instead of flows - DataSet operates with sequences of data instead of flows
- PartialEnvelope uses `Int` instead `UInt`. - PartialEnvelope uses `Int` instead `UInt`.
- `ActiveDataSet` renamed to `DataSource` - `ActiveDataSet` renamed to `DataSource`
- `selectOne`->`getByType`
### Deprecated ### Deprecated

View File

@ -4,7 +4,7 @@ plugins {
allprojects { allprojects {
group = "space.kscience" group = "space.kscience"
version = "0.6.0-dev-5" version = "0.6.0-dev-6"
} }
subprojects { subprojects {

View File

@ -89,7 +89,7 @@ internal class MapAction<in T : Any, out R : Any>(
return newData.named(newName) return newData.named(newName)
} }
val sequence = dataSet.dataSequence().map(::mapOne) val sequence = dataSet.traverse().map(::mapOne)
return if (dataSet is DataSource ) { return if (dataSet is DataSource ) {
ActiveDataTree(outputType, dataSet) { ActiveDataTree(outputType, dataSet) {

View File

@ -19,14 +19,14 @@ public class JoinGroup<T : Any, R : Any>(
public var meta: MutableMeta = MutableMeta() public var meta: MutableMeta = MutableMeta()
public lateinit var result: suspend ActionEnv.(Map<Name, Pair<Meta, T>>) -> R public lateinit var result: suspend ActionEnv.(Map<Name, ValueWithMeta<T>>) -> R
internal fun <R1 : R> result(outputType: KType, f: suspend ActionEnv.(Map<Name, Pair<Meta, T>>) -> R1) { internal fun <R1 : R> result(outputType: KType, f: suspend ActionEnv.(Map<Name, ValueWithMeta<T>>) -> R1) {
this.outputType = outputType this.outputType = outputType
this.result = f; this.result = f;
} }
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(Map<Name, Pair<Meta, T>>) -> R1) { public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(Map<Name, ValueWithMeta<T>>) -> R1) {
outputType = typeOf<R1>() outputType = typeOf<R1>()
this.result = f; this.result = f;
} }
@ -66,7 +66,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
/** /**
* Apply transformation to the whole node * Apply transformation to the whole node
*/ */
public fun result(resultName: String, f: suspend ActionEnv.(Map<Name, Pair<Meta, T>>) -> R) { public fun result(resultName: String, f: suspend ActionEnv.(Map<Name, ValueWithMeta<T>>) -> R) {
groupRules += { node -> groupRules += { node ->
listOf(JoinGroup<T, R>(resultName, node, outputType).apply { result(outputType, f) }) listOf(JoinGroup<T, R>(resultName, node, outputType).apply { result(outputType, f) })
} }
@ -87,7 +87,7 @@ internal class ReduceAction<T : Any, R : Any>(
override fun transform(set: DataSet<T>, meta: Meta, key: Name): Sequence<NamedData<R>> = sequence { override fun transform(set: DataSet<T>, meta: Meta, key: Name): Sequence<NamedData<R>> = sequence {
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(set).forEach { group -> ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(set).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.dataSequence().fold(HashMap()) { acc, value -> val dataFlow: Map<Name, Data<T>> = group.set.traverse().fold(HashMap()) { acc, value ->
acc.apply { acc.apply {
acc[value.name] = value.data acc[value.name] = value.data
} }

View File

@ -77,7 +77,7 @@ internal class SplitAction<T : Any, R : Any>(
return if (dataSet is DataSource) { return if (dataSet is DataSource) {
ActiveDataTree<R>(outputType, dataSet) { ActiveDataTree<R>(outputType, dataSet) {
populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne)) populateFrom(dataSet.traverse().flatMap(transform = ::splitOne))
launch { launch {
dataSet.updates.collect { name -> dataSet.updates.collect { name ->
//clear old nodes //clear old nodes
@ -89,7 +89,7 @@ internal class SplitAction<T : Any, R : Any>(
} }
} else { } else {
DataTree<R>(outputType) { DataTree<R>(outputType) {
populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne)) populateFrom(dataSet.traverse().flatMap(transform = ::splitOne))
} }
} }
} }

View File

@ -10,8 +10,7 @@ import space.kscience.dataforge.meta.set
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.*
import kotlin.reflect.KType import kotlin.reflect.KType
public interface public interface DataSet<out T : Any> {
DataSet<out T : Any> {
/** /**
* The minimal common ancestor to all data in the node * The minimal common ancestor to all data in the node
@ -24,23 +23,15 @@ DataSet<out T : Any> {
public val meta: Meta public val meta: Meta
/** /**
* Traverse this provider or its child. The order is not guaranteed. * Traverse this [DataSet] returning named data instances. The order is not guaranteed.
*/ */
public fun dataSequence(): Sequence<NamedData<T>> public fun traverse(): Sequence<NamedData<T>>
/** /**
* Get data with given name. * Get data with given name.
*/ */
public operator fun get(name: Name): Data<T>? public operator fun get(name: Name): Data<T>?
/**
* Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf.
*/
public fun listTop(prefix: Name = Name.EMPTY): List<Name> =
dataSequence().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()
// By default, traverses the whole tree. Could be optimized in descendants
public companion object { public companion object {
public val META_KEY: Name = "@meta".asName() public val META_KEY: Name = "@meta".asName()
@ -51,16 +42,14 @@ DataSet<out T : Any> {
override val dataType: KType = TYPE_OF_NOTHING override val dataType: KType = TYPE_OF_NOTHING
override val meta: Meta get() = Meta.EMPTY override val meta: Meta get() = Meta.EMPTY
//private val nothing: Nothing get() = error("this is nothing") override fun traverse(): Sequence<NamedData<Nothing>> = emptySequence()
override fun dataSequence(): Sequence<NamedData<Nothing>> = emptySequence()
override fun get(name: Name): Data<Nothing>? = null override fun get(name: Name): Data<Nothing>? = null
} }
} }
} }
public operator fun <T: Any> DataSet<T>.get(name:String): Data<T>? = get(name.parseAsName()) public operator fun <T : Any> DataSet<T>.get(name: String): Data<T>? = get(name.parseAsName())
/** /**
* A [DataSet] with propagated updates. * A [DataSet] with propagated updates.
@ -78,7 +67,7 @@ public interface DataSource<T : Any> : DataSet<T>, CoroutineScope {
/** /**
* Stop generating updates from this [DataSource] * Stop generating updates from this [DataSource]
*/ */
public fun close(){ public fun close() {
coroutineContext[Job]?.cancel() coroutineContext[Job]?.cancel()
} }
} }
@ -89,7 +78,7 @@ public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is DataSour
* Flow all data nodes with names starting with [branchName] * Flow all data nodes with names starting with [branchName]
*/ */
public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> = public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> =
this@children.dataSequence().filter { this@children.traverse().filter {
it.name.startsWith(branchName) it.name.startsWith(branchName)
} }
@ -97,7 +86,7 @@ public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T
* Start computation for all goals in data node and return a job for the whole node * Start computation for all goals in data node and return a job for the whole node
*/ */
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch { public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
dataSequence().map { traverse().map {
it.launch(this@launch) it.launch(this@launch)
}.toList().joinAll() }.toList().joinAll()
} }
@ -105,7 +94,7 @@ public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job =
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() } public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }
public suspend fun DataSet<*>.toMeta(): Meta = Meta { public suspend fun DataSet<*>.toMeta(): Meta = Meta {
dataSequence().forEach { traverse().forEach {
if (it.name.endsWith(DataSet.META_KEY)) { if (it.name.endsWith(DataSet.META_KEY)) {
set(it.name, it.meta) set(it.name, it.meta)
} else { } else {

View File

@ -28,7 +28,7 @@ public interface DataSetBuilder<in T : Any> {
} }
//Set new items //Set new items
dataSet.dataSequence().forEach { dataSet.traverse().forEach {
data(name + it.name, it.data) data(name + it.name, it.data)
} }
} }
@ -146,7 +146,7 @@ public inline fun <reified T : Any> DataSetBuilder<T>.static(
*/ */
@DFExperimental @DFExperimental
public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit { public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit {
tree.dataSequence().forEach { tree.traverse().forEach {
//TODO check if the place is occupied //TODO check if the place is occupied
data(it.name, it.data) data(it.name, it.data)
} }

View File

@ -1,9 +1,5 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.Type import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.*
@ -43,20 +39,17 @@ public interface DataTree<out T : Any> : DataSet<T> {
override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY
override fun dataSequence(): Sequence<NamedData<T>> = sequence { override fun traverse(): Sequence<NamedData<T>> = sequence {
items.forEach { (token, childItem: DataTreeItem<T>) -> items.forEach { (token, childItem: DataTreeItem<T>) ->
if (!token.body.startsWith("@")) { if (!token.body.startsWith("@")) {
when (childItem) { when (childItem) {
is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName())) is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName()))
is DataTreeItem.Node -> yieldAll(childItem.tree.dataSequence().map { it.named(token + it.name) }) is DataTreeItem.Node -> yieldAll(childItem.tree.traverse().map { it.named(token + it.name) })
} }
} }
} }
} }
override fun listTop(prefix: Name): List<Name> =
getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList()
override fun get(name: Name): Data<T>? = when (name.length) { override fun get(name: Name): Data<T>? = when (name.length) {
0 -> null 0 -> null
1 -> items[name.firstOrNull()!!].data 1 -> items[name.firstOrNull()!!].data
@ -73,6 +66,9 @@ public interface DataTree<out T : Any> : DataSet<T> {
} }
} }
public fun <T : Any> DataTree<T>.listChildren(prefix: Name): List<Name> =
getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList()
/** /**
* Get a [DataTreeItem] with given [name] or null if the item does not exist * Get a [DataTreeItem] with given [name] or null if the item does not exist
*/ */
@ -86,15 +82,15 @@ public val <T : Any> DataTreeItem<T>?.tree: DataTree<T>? get() = (this as? DataT
public val <T : Any> DataTreeItem<T>?.data: Data<T>? get() = (this as? DataTreeItem.Leaf<T>)?.data public val <T : Any> DataTreeItem<T>?.data: Data<T>? get() = (this as? DataTreeItem.Leaf<T>)?.data
/** /**
* Flow of all children including nodes * A [Sequence] of all children including nodes
*/ */
public fun <T : Any> DataTree<T>.itemFlow(): Flow<Pair<Name, DataTreeItem<T>>> = flow { public fun <T : Any> DataTree<T>.traverseItems(): Sequence<Pair<Name, DataTreeItem<T>>> = sequence {
items.forEach { (head, item) -> items.forEach { (head, item) ->
emit(head.asName() to item) yield(head.asName() to item)
if (item is DataTreeItem.Node) { if (item is DataTreeItem.Node) {
val subSequence = item.tree.itemFlow() val subSequence = item.tree.traverseItems()
.map { (name, data) -> (head.asName() + name) to data } .map { (name, data) -> (head.asName() + name) to data }
emitAll(subSequence) yieldAll(subSequence)
} }
} }
} }

View File

@ -42,7 +42,7 @@ public interface GroupRule {
val map = HashMap<String, DataSet<T>>() val map = HashMap<String, DataSet<T>>()
if (set is DataSource) { if (set is DataSource) {
set.dataSequence().forEach { data -> set.traverse().forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder<T>) (map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder<T>)
.data(data.name, data.data) .data(data.name, data.data)
@ -61,7 +61,7 @@ public interface GroupRule {
} }
} }
} else { } else {
set.dataSequence().forEach { data -> set.traverse().forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>) (map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>)
.data(data.name, data.data) .data(data.name, data.data)

View File

@ -52,7 +52,7 @@ internal class StaticDataTree<T : Any>(
if (dataSet is StaticDataTree) { if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet)) set(name, DataTreeItem.Node(dataSet))
} else { } else {
dataSet.dataSequence().forEach { dataSet.traverse().forEach {
data(name + it.name, it.data) data(name + it.name, it.data)
} }
} }

View File

@ -30,8 +30,8 @@ public fun <T : Any> DataSet<T>.filter(
override val meta: Meta get() = this@filter.meta override val meta: Meta get() = this@filter.meta
override fun dataSequence(): Sequence<NamedData<T>> = override fun traverse(): Sequence<NamedData<T>> =
this@filter.dataSequence().filter { predicate(it.name, it.meta) } this@filter.traverse().filter { predicate(it.name, it.meta) }
override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf { override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf {
predicate(name, it.meta) predicate(name, it.meta)
@ -58,8 +58,8 @@ public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (p
override val meta: Meta get() = this@withNamePrefix.meta override val meta: Meta get() = this@withNamePrefix.meta
override fun dataSequence(): Sequence<NamedData<T>> = override fun traverse(): Sequence<NamedData<T>> =
this@withNamePrefix.dataSequence().map { it.data.named(prefix + it.name) } this@withNamePrefix.traverse().map { it.data.named(prefix + it.name) }
override fun get(name: Name): Data<T>? = override fun get(name: Name): Data<T>? =
name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) } name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) }
@ -80,7 +80,7 @@ public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branc
override val meta: Meta get() = this@branch.meta override val meta: Meta get() = this@branch.meta
override fun dataSequence(): Sequence<NamedData<T>> = this@branch.dataSequence().mapNotNull { override fun traverse(): Sequence<NamedData<T>> = this@branch.traverse().mapNotNull {
it.name.removeHeadOrNull(branchName)?.let { name -> it.name.removeHeadOrNull(branchName)?.let { name ->
it.data.named(name) it.data.named(name)
} }

View File

@ -1,12 +1,12 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.seal import space.kscience.dataforge.meta.seal
import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import kotlin.contracts.InvocationKind import kotlin.contracts.InvocationKind
import kotlin.contracts.contract import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -14,6 +14,15 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
public data class ValueWithMeta<T>(val meta: Meta, val value: T)
public suspend fun <T: Any> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(meta, await())
public data class NamedValueWithMeta<T>(val name: Name, val meta: Meta, val value: T)
public suspend fun <T: Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> = NamedValueWithMeta(name, meta, await())
/** /**
* Lazily transform this data to another data. By convention [block] should not use external data (be pure). * Lazily transform this data to another data. By convention [block] should not use external data (be pure).
* @param coroutineContext additional [CoroutineContext] elements used for data computation. * @param coroutineContext additional [CoroutineContext] elements used for data computation.
@ -49,13 +58,13 @@ public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData( public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline block: suspend (List<Pair<Meta, T>>) -> R, crossinline block: suspend (List<ValueWithMeta<T>>) -> R,
): Data<R> = Data( ): Data<R> = Data(
meta, meta,
coroutineContext, coroutineContext,
this this
) { ) {
block(map { it.meta to it.await() }) block(map { it.awaitWithMeta() })
} }
@DFInternal @DFInternal
@ -63,17 +72,16 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
outputType: KType, outputType: KType,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
block: suspend (Map<K, Pair<Meta, T>>) -> R, block: suspend (Map<K, ValueWithMeta<T>>) -> R,
): Data<R> = Data( ): Data<R> = Data(
outputType, outputType,
meta, meta,
coroutineContext, coroutineContext,
this.values this.values
) { ) {
block(mapValues { it.value.meta to it.value.await() }) block(mapValues { it.value.awaitWithMeta() })
} }
/** /**
* Lazily reduce a [Map] of [Data] with any static key. * Lazily reduce a [Map] of [Data] with any static key.
* @param K type of the map key * @param K type of the map key
@ -83,58 +91,93 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData( public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline block: suspend (Map<K, T>) -> R, crossinline block: suspend (Map<K, ValueWithMeta<T>>) -> R,
): Data<R> = Data( ): Data<R> = Data(
meta, meta,
coroutineContext, coroutineContext,
this.values this.values
) { ) {
block(mapValues { it.value.await() }) block(mapValues { it.value.awaitWithMeta() })
} }
//flow operations //Iterable operations
/**
* Transform a [Flow] of [NamedData] to a single [Data].
*/
@DFInternal @DFInternal
public inline fun <T : Any, R : Any> Sequence<NamedData<T>>.reduceToData( public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
outputType: KType, outputType: KType,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Sequence<NamedData<T>>) -> R, crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
): Data<R> = Data( ): Data<R> = Data(
outputType, outputType,
meta, meta,
coroutineContext, coroutineContext,
toList() toList()
) { ) {
transformation(this) transformation(map { it.awaitWithMeta() })
} }
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Sequence<NamedData<T>>.reduceToData( public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Sequence<NamedData<T>>) -> R, crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
): Data<R> = reduceToData(typeOf<R>(), coroutineContext, meta) { ): Data<R> = reduceToData(typeOf<R>(), coroutineContext, meta) {
transformation(it) transformation(it)
} }
/** public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
* Fold a flow of named data into a single [Data]
*/
public inline fun <T : Any, reified R : Any> Sequence<NamedData<T>>.foldToData(
initial: R, initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: NamedData<T>) -> R, crossinline block: suspend (result: R, data: ValueWithMeta<T>) -> R,
): Data<R> = reduceToData( ): Data<R> = reduceToData(
coroutineContext, meta coroutineContext, meta
) { ) {
it.fold(initial) { acc, t -> block(acc, t) } it.fold(initial) { acc, t -> block(acc, t) }
} }
/**
* Transform an [Iterable] of [NamedData] to a single [Data].
*/
@DFInternal
public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
outputType: KType,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
): Data<R> = Data(
outputType,
meta,
coroutineContext,
toList()
) {
transformation(map { it.awaitWithMeta() })
}
@OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
): Data<R> = reduceNamedToData(typeOf<R>(), coroutineContext, meta) {
transformation(it)
}
/**
* Fold a [Iterable] of named data into a single [Data]
*/
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToData(
initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = reduceNamedToData(
coroutineContext, meta
) {
it.fold(initial) { acc, t -> block(acc, t) }
}
//DataSet operations //DataSet operations
@DFInternal @DFInternal
@ -142,13 +185,13 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
outputType: KType, outputType: KType,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
metaTransform: MutableMeta.() -> Unit = {}, metaTransform: MutableMeta.() -> Unit = {},
block: suspend (T) -> R, block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(outputType) { ): DataTree<R> = DataTree<R>(outputType) {
populateFrom( populateFrom(
dataSequence().map { traverse().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
Data(outputType, newMeta, coroutineContext, listOf(it)) { Data(outputType, newMeta, coroutineContext, listOf(it)) {
block(it.await()) block(it.awaitWithMeta())
}.named(it.name) }.named(it.name)
} }
) )
@ -158,12 +201,12 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map( public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline metaTransform: MutableMeta.() -> Unit = {}, noinline metaTransform: MutableMeta.() -> Unit = {},
noinline block: suspend (T) -> R, noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = map(typeOf<R>(), coroutineContext, metaTransform, block) ): DataTree<R> = map(typeOf<R>(), coroutineContext, metaTransform, block)
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) { public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
dataSequence().forEach { traverse().forEach {
block(it) block(it)
} }
} }
@ -171,12 +214,12 @@ public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) ->
public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData( public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Sequence<NamedData<T>>) -> R, crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R,
): Data<R> = dataSequence().reduceToData(coroutineContext, meta, transformation) ): Data<R> = traverse().asIterable().reduceNamedToData(coroutineContext, meta, transformation)
public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData( public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
initial: R, initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: NamedData<T>) -> R, crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = dataSequence().foldToData(initial, coroutineContext, meta, block) ): Data<R> = traverse().asIterable().foldNamedToData(initial, coroutineContext, meta, block)

View File

@ -33,32 +33,32 @@ private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
* @param predicate addition filtering condition based on item name and meta. By default, accepts all * @param predicate addition filtering condition based on item name and meta. By default, accepts all
*/ */
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
public fun <R : Any> DataSet<*>.filterIsInstance( public fun <R : Any> DataSet<*>.filterByType(
type: KType, type: KType,
predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
): DataSource<R> = object : DataSource<R> { ): DataSource<R> = object : DataSource<R> {
override val dataType = type override val dataType = type
override val coroutineContext: CoroutineContext override val coroutineContext: CoroutineContext
get() = (this@filterIsInstance as? DataSource)?.coroutineContext ?: EmptyCoroutineContext get() = (this@filterByType as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@filterIsInstance.meta override val meta: Meta get() = this@filterByType.meta
private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type) private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type)
&& predicate(name, datum.meta) && predicate(name, datum.meta)
override fun dataSequence(): Sequence<NamedData<R>> = this@filterIsInstance.dataSequence().filter { override fun traverse(): Sequence<NamedData<R>> = this@filterByType.traverse().filter {
checkDatum(it.name, it.data) checkDatum(it.name, it.data)
}.map { }.map {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
it as NamedData<R> it as NamedData<R>
} }
override fun get(name: Name): Data<R>? = this@filterIsInstance[name]?.let { datum -> override fun get(name: Name): Data<R>? = this@filterByType[name]?.let { datum ->
if (checkDatum(name, datum)) datum.castOrNull(type) else null if (checkDatum(name, datum)) datum.castOrNull(type) else null
} }
override val updates: Flow<Name> = this@filterIsInstance.updates.filter { name -> override val updates: Flow<Name> = this@filterByType.updates.filter { name ->
get(name)?.let { datum -> get(name)?.let { datum ->
checkDatum(name, datum) checkDatum(name, datum)
} ?: false } ?: false
@ -68,18 +68,18 @@ public fun <R : Any> DataSet<*>.filterIsInstance(
/** /**
* Select a single datum of the appropriate type * Select a single datum of the appropriate type
*/ */
public inline fun <reified R : Any> DataSet<*>.filterIsInstance( public inline fun <reified R : Any> DataSet<*>.filterByType(
noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
): DataSet<R> = filterIsInstance(typeOf<R>(), predicate) ): DataSet<R> = filterByType(typeOf<R>(), predicate)
/** /**
* Select a single datum if it is present and of given [type] * Select a single datum if it is present and of given [type]
*/ */
public fun <R : Any> DataSet<*>.selectOne(type: KType, name: Name): NamedData<R>? = public fun <R : Any> DataSet<*>.getByType(type: KType, name: Name): NamedData<R>? =
get(name)?.castOrNull<R>(type)?.named(name) get(name)?.castOrNull<R>(type)?.named(name)
public inline fun <reified R : Any> DataSet<*>.selectOne(name: Name): NamedData<R>? = public inline fun <reified R : Any> DataSet<*>.getByType(name: Name): NamedData<R>? =
selectOne(typeOf<R>(), name) this@getByType.getByType(typeOf<R>(), name)
public inline fun <reified R : Any> DataSet<*>.selectOne(name: String): NamedData<R>? = public inline fun <reified R : Any> DataSet<*>.getByType(name: String): NamedData<R>? =
selectOne(typeOf<R>(), Name.parse(name)) this@getByType.getByType(typeOf<R>(), Name.parse(name))

View File

@ -23,7 +23,7 @@ public interface TaskResult<out T : Any> : DataSet<T> {
*/ */
public val taskMeta: Meta public val taskMeta: Meta
override fun dataSequence(): Sequence<TaskData<T>> override fun traverse(): Sequence<TaskData<T>>
override fun get(name: Name): TaskData<T>? override fun get(name: Name): TaskData<T>?
} }
@ -34,7 +34,7 @@ private class TaskResultImpl<out T : Any>(
override val taskMeta: Meta, override val taskMeta: Meta,
) : TaskResult<T>, DataSet<T> by dataSet { ) : TaskResult<T>, DataSet<T> by dataSet {
override fun dataSequence(): Sequence<TaskData<T>> = dataSet.dataSequence().map { override fun traverse(): Sequence<TaskData<T>> = dataSet.traverse().map {
workspace.wrapData(it, it.name, taskName, taskMeta) workspace.wrapData(it, it.name, taskName, taskMeta)
} }

View File

@ -35,7 +35,7 @@ public interface Workspace : ContextAware, Provider {
return when (target) { return when (target) {
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)} "target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)}
Task.TYPE -> tasks Task.TYPE -> tasks
Data.TYPE -> data.dataSequence().associateBy { it.name } Data.TYPE -> data.traverse().associateBy { it.name }
else -> emptyMap() else -> emptyMap()
} }
} }

View File

@ -1,7 +1,7 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.filterIsInstance import space.kscience.dataforge.data.filterByType
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.matches import space.kscience.dataforge.names.matches
@ -13,7 +13,7 @@ import space.kscience.dataforge.names.matches
public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> = public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> =
object : DataSelector<T> { object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> = override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> =
workspace.data.filterIsInstance { name, _ -> workspace.data.filterByType { name, _ ->
namePattern == null || name.matches(namePattern) namePattern == null || name.matches(namePattern)
} }
} }
@ -21,4 +21,4 @@ public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name?
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask( public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
task: Name, task: Name,
taskMeta: Meta = Meta.EMPTY, taskMeta: Meta = Meta.EMPTY,
): DataSet<T> = workspace.produce(task, taskMeta).filterIsInstance() ): DataSet<T> = workspace.produce(task, taskMeta).filterByType()

View File

@ -14,16 +14,16 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
override val tag: PluginTag = Companion.tag override val tag: PluginTag = Companion.tag
val allData by task<Int> { val allData by task<Int> {
val selectedData = workspace.data.filterIsInstance<Int>() val selectedData = workspace.data.filterByType<Int>()
val result: Data<Int> = selectedData.dataSequence().foldToData(0) { result, data -> val result: Data<Int> = selectedData.traverse().asIterable().foldToData(0) { result, data ->
result + data.await() result + data.value
} }
data("result", result) data("result", result)
} }
val singleData by task<Int> { val singleData by task<Int> {
workspace.data.filterIsInstance<Int>()["myData[12]"]?.let { workspace.data.filterByType<Int>()["myData[12]"]?.let {
data("result", it) data("result", it)
} }
} }
@ -57,7 +57,7 @@ class DataPropagationTest {
fun testAllData() { fun testAllData() {
runBlocking { runBlocking {
val node = testWorkspace.produce("Test.allData") val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.dataSequence().single().await()) assertEquals(4950, node.traverse().single().await())
} }
} }
@ -65,7 +65,7 @@ class DataPropagationTest {
fun testSingleData() { fun testSingleData() {
runBlocking { runBlocking {
val node = testWorkspace.produce("Test.singleData") val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.dataSequence().single().await()) assertEquals(12, node.traverse().single().await())
} }
} }
} }

View File

@ -63,7 +63,7 @@ class SimpleWorkspaceTest {
} }
val filterOne by task<Int> { val filterOne by task<Int> {
workspace.data.selectOne<Int>("myData[12]")?.let { source -> workspace.data.getByType<Int>("myData[12]")?.let { source ->
data(source.name, source.map { it }) data(source.name, source.map { it })
} }
} }
@ -111,23 +111,23 @@ class SimpleWorkspaceTest {
val sum by task<Int> { val sum by task<Int> {
workspace.logger.info { "Starting sum" } workspace.logger.info { "Starting sum" }
val res = from(square).foldToData(0) { l, r -> val res = from(square).foldToData(0) { l, r ->
l + r.await() l + r.value
} }
data("sum", res) data("sum", res)
} }
val averageByGroup by task<Int> { val averageByGroup by task<Int> {
val evenSum = workspace.data.filterIsInstance<Int> { name, _ -> val evenSum = workspace.data.filterByType<Int> { name, _ ->
name.toString().toInt() % 2 == 0 name.toString().toInt() % 2 == 0
}.foldToData(0) { l, r -> }.foldToData(0) { l, r ->
l + r.await() l + r.value
} }
data("even", evenSum) data("even", evenSum)
val oddSum = workspace.data.filterIsInstance<Int> { name, _ -> val oddSum = workspace.data.filterByType<Int> { name, _ ->
name.toString().toInt() % 2 == 1 name.toString().toInt() % 2 == 1
}.foldToData(0) { l, r -> }.foldToData(0) { l, r ->
l + r.await() l + r.value
} }
data("odd", oddSum) data("odd", oddSum)
} }
@ -143,7 +143,7 @@ class SimpleWorkspaceTest {
} }
val customPipe by task<Int> { val customPipe by task<Int> {
workspace.data.filterIsInstance<Int>().forEach { data -> workspace.data.filterByType<Int>().forEach { data ->
val meta = data.meta.toMutableMeta().apply { val meta = data.meta.toMutableMeta().apply {
"newValue" put 22 "newValue" put 22
} }
@ -159,7 +159,7 @@ class SimpleWorkspaceTest {
fun testWorkspace() { fun testWorkspace() {
runBlocking { runBlocking {
val node = workspace.runBlocking("sum") val node = workspace.runBlocking("sum")
val res = node.dataSequence().single() val res = node.traverse().single()
assertEquals(328350, res.await()) assertEquals(328350, res.await())
} }
} }
@ -169,7 +169,7 @@ class SimpleWorkspaceTest {
fun testMetaPropagation() { fun testMetaPropagation() {
runBlocking { runBlocking {
val node = workspace.produce("sum") { "testFlag" put true } val node = workspace.produce("sum") { "testFlag" put true }
val res = node.dataSequence().single().await() val res = node.traverse().single().await()
} }
} }
@ -192,7 +192,7 @@ class SimpleWorkspaceTest {
fun testFilter() { fun testFilter() {
runBlocking { runBlocking {
val node = workspace.produce("filterOne") val node = workspace.produce("filterOne")
assertEquals(12, node.dataSequence().first().await()) assertEquals(12, node.traverse().first().await())
} }
} }
} }