Replace sequences by iterators in DataSet

This commit is contained in:
Alexander Nozik 2022-05-10 14:45:58 +03:00
parent fe92e8fccf
commit a546552540
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
25 changed files with 242 additions and 229 deletions

View File

@ -16,6 +16,7 @@
- PartialEnvelope uses `Int` instead of `UInt`. - PartialEnvelope uses `Int` instead of `UInt`.
- `ActiveDataSet` renamed to `DataSource` - `ActiveDataSet` renamed to `DataSource`
- `selectOne`->`getByType` - `selectOne`->`getByType`
- Data traversal in `DataSet` is done via an iterator
### Deprecated ### Deprecated

View File

@ -4,7 +4,7 @@ plugins {
allprojects { allprojects {
group = "space.kscience" group = "space.kscience"
version = "0.6.0-dev-6" version = "0.6.0-dev-7"
} }
subprojects { subprojects {

View File

@ -3,6 +3,7 @@ package space.kscience.dataforge.actions
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.startsWith import space.kscience.dataforge.names.startsWith
import kotlin.reflect.KType import kotlin.reflect.KType
@ -18,36 +19,47 @@ internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
/** /**
* An action that caches results on-demand and recalculates them on source push * An action that caches results on-demand and recalculates them on source push
*/ */
public abstract class CachingAction<in T : Any, out R : Any>( public abstract class AbstractAction<in T : Any, R : Any>(
public val outputType: KType, public val outputType: KType,
) : Action<T, R> { ) : Action<T, R> {
protected abstract fun transform( /**
set: DataSet<T>, * Generate initial content of the output
*/
protected abstract fun DataSetBuilder<R>.generate(
data: DataSet<T>,
meta: Meta, meta: Meta,
key: Name = Name.EMPTY, )
): Sequence<NamedData<R>>
/**
* Update part of the data set when given [updateKey] is triggered by the source
*/
protected open fun DataSourceBuilder<R>.update(
dataSet: DataSet<T>,
meta: Meta,
updateKey: Name,
) {
// By default, recalculate the whole dataset
generate(dataSet, meta)
}
@OptIn(DFInternal::class)
override fun execute( override fun execute(
dataSet: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
): DataSet<R> = if (dataSet is DataSource) { ): DataSet<R> = if (dataSet is DataSource) {
DataSourceBuilder<R>(outputType, dataSet.coroutineContext).apply { DataSource(outputType, dataSet){
populateFrom(transform(dataSet, meta)) generate(dataSet, meta)
launch { launch {
dataSet.updates.collect { dataSet.updates.collect { name ->
//clear old nodes update(dataSet, meta, name)
remove(it)
//collect new items
populateFrom(transform(dataSet, meta, it))
//FIXME if the target is data, updates are fired twice
} }
} }
} }
} else { } else {
DataTree<R>(outputType) { DataTree<R>(outputType) {
populateFrom(transform(dataSet, meta)) generate(dataSet, meta)
} }
} }
} }

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
@ -53,23 +52,18 @@ public class MapActionBuilder<T, R>(
} }
@PublishedApi @PublishedApi
internal class MapAction<in T : Any, out R : Any>( internal class MapAction<in T : Any, R : Any>(
private val outputType: KType, outputType: KType,
private val block: MapActionBuilder<T, R>.() -> Unit, private val block: MapActionBuilder<T, R>.() -> Unit,
) : Action<T, R> { ) : AbstractAction<T, R>(outputType) {
override fun execute( private fun DataSetBuilder<R>.mapOne(name: Name, data: Data<T>, meta: Meta) {
dataSet: DataSet<T>,
meta: Meta,
): DataSet<R> {
fun mapOne(data: NamedData<T>): NamedData<R> {
// Creating a new environment for action using **old** name, old meta and task meta // Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(data.name, data.meta, meta) val env = ActionEnv(name, data.meta, meta)
//applying transformation from builder //applying transformation from builder
val builder = MapActionBuilder<T, R>( val builder = MapActionBuilder<T, R>(
data.name, name,
data.meta.toMutableMeta(), // using data meta data.meta.toMutableMeta(), // using data meta
meta, meta,
outputType outputType
@ -86,28 +80,16 @@ internal class MapAction<in T : Any, out R : Any>(
builder.result(env, data.await()) builder.result(env, data.await())
} }
//setting the data node //setting the data node
return newData.named(newName) data(newName, newData)
} }
val sequence = dataSet.traverse().map(::mapOne) override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
data.forEach { mapOne(it.name, it.data, meta) }
}
return if (dataSet is DataSource ) { override fun DataSourceBuilder<R>.update(dataSet: DataSet<T>, meta: Meta, updateKey: Name) {
ActiveDataTree(outputType, dataSet) { remove(updateKey)
populateFrom(sequence) dataSet[updateKey]?.let { mapOne(updateKey, it, meta) }
launch {
dataSet.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
populateFrom(dataSet.children(name).map(::mapOne))
}
}
}
} else {
DataTree(outputType) {
populateFrom(sequence)
}
}
} }
} }

View File

@ -81,13 +81,12 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
internal class ReduceAction<T : Any, R : Any>( internal class ReduceAction<T : Any, R : Any>(
outputType: KType, outputType: KType,
private val action: ReduceGroupBuilder<T, R>.() -> Unit, private val action: ReduceGroupBuilder<T, R>.() -> Unit,
) : CachingAction<T, R>(outputType) { ) : AbstractAction<T, R>(outputType) {
//TODO optimize reduction. Currently, the whole action recalculates on push //TODO optimize reduction. Currently, the whole action recalculates on push
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
override fun transform(set: DataSet<T>, meta: Meta, key: Name): Sequence<NamedData<R>> = sequence { ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(data).forEach { group ->
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(set).forEach { group -> val dataFlow: Map<Name, Data<T>> = group.set.asSequence().fold(HashMap()) { acc, value ->
val dataFlow: Map<Name, Data<T>> = group.set.traverse().fold(HashMap()) { acc, value ->
acc.apply { acc.apply {
acc[value.name] = value.data acc[value.name] = value.data
} }
@ -103,7 +102,7 @@ internal class ReduceAction<T : Any, R : Any>(
meta = groupMeta meta = groupMeta
) { group.result.invoke(env, it) } ) { group.result.invoke(env, it) }
yield(res.named(env.name)) data(env.name, res)
} }
} }
} }

View File

@ -1,13 +1,11 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Laminate import space.kscience.dataforge.meta.Laminate
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import kotlin.collections.set import kotlin.collections.set
import kotlin.reflect.KType import kotlin.reflect.KType
@ -46,52 +44,41 @@ public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val me
*/ */
@PublishedApi @PublishedApi
internal class SplitAction<T : Any, R : Any>( internal class SplitAction<T : Any, R : Any>(
private val outputType: KType, outputType: KType,
private val action: SplitBuilder<T, R>.() -> Unit, private val action: SplitBuilder<T, R>.() -> Unit,
) : Action<T, R> { ) : AbstractAction<T, R>(outputType) {
override fun execute( private fun DataSetBuilder<R>.splitOne(name: Name, data: Data<T>, meta: Meta) {
dataSet: DataSet<T>,
meta: Meta,
): DataSet<R> {
fun splitOne(data: NamedData<T>): Sequence<NamedData<R>> {
val laminate = Laminate(data.meta, meta) val laminate = Laminate(data.meta, meta)
val split = SplitBuilder<T, R>(data.name, data.meta).apply(action) val split = SplitBuilder<T, R>(name, data.meta).apply(action)
// apply individual fragment rules to result // apply individual fragment rules to result
return split.fragments.entries.asSequence().map { (fragmentName, rule) -> split.fragments.forEach { (fragmentName, rule) ->
val env = SplitBuilder.FragmentRule<T, R>( val env = SplitBuilder.FragmentRule<T, R>(
fragmentName, fragmentName,
laminate.toMutableMeta(), laminate.toMutableMeta(),
outputType outputType
).apply(rule) ).apply(rule)
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) //data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
@OptIn(DFInternal::class) Data(outputType, meta = env.meta, dependencies = listOf(data)) {
data(
fragmentName,
@Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) {
env.result(data.await()) env.result(data.await())
}.named(fragmentName) }
)
} }
} }
return if (dataSet is DataSource) { override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
ActiveDataTree<R>(outputType, dataSet) { data.forEach { splitOne(it.name, it.data, meta) }
populateFrom(dataSet.traverse().flatMap(transform = ::splitOne))
launch {
dataSet.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
populateFrom(dataSet.children(name).flatMap(transform = ::splitOne))
}
}
}
} else {
DataTree<R>(outputType) {
populateFrom(dataSet.traverse().flatMap(transform = ::splitOne))
}
} }
override fun DataSourceBuilder<R>.update(dataSet: DataSet<T>, meta: Meta, updateKey: Name) {
remove(updateKey)
dataSet[updateKey]?.let { splitOne(updateKey, it, meta) }
} }
} }

View File

@ -7,7 +7,10 @@ import kotlinx.coroutines.flow.mapNotNull
import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.set import space.kscience.dataforge.meta.set
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.endsWith
import space.kscience.dataforge.names.parseAsName
import kotlin.reflect.KType import kotlin.reflect.KType
public interface DataSet<out T : Any> { public interface DataSet<out T : Any> {
@ -25,7 +28,7 @@ public interface DataSet<out T : Any> {
/** /**
* Traverse this [DataSet] returning named data instances. The order is not guaranteed. * Traverse this [DataSet] returning named data instances. The order is not guaranteed.
*/ */
public fun traverse(): Sequence<NamedData<T>> public operator fun iterator(): Iterator<NamedData<T>>
/** /**
* Get data with given name. * Get data with given name.
@ -42,19 +45,27 @@ public interface DataSet<out T : Any> {
override val dataType: KType = TYPE_OF_NOTHING override val dataType: KType = TYPE_OF_NOTHING
override val meta: Meta get() = Meta.EMPTY override val meta: Meta get() = Meta.EMPTY
override fun traverse(): Sequence<NamedData<Nothing>> = emptySequence() override fun iterator(): Iterator<NamedData<Nothing>> = emptySequence<NamedData<Nothing>>().iterator()
override fun get(name: Name): Data<Nothing>? = null override fun get(name: Name): Data<Nothing>? = null
} }
} }
} }
public fun <T : Any> DataSet<T>.asSequence(): Sequence<NamedData<T>> = object : Sequence<NamedData<T>> {
override fun iterator(): Iterator<NamedData<T>> = this@asSequence.iterator()
}
public fun <T : Any> DataSet<T>.asIterable(): Iterable<NamedData<T>> = object : Iterable<NamedData<T>> {
override fun iterator(): Iterator<NamedData<T>> = this@asIterable.iterator()
}
public operator fun <T : Any> DataSet<T>.get(name: String): Data<T>? = get(name.parseAsName()) public operator fun <T : Any> DataSet<T>.get(name: String): Data<T>? = get(name.parseAsName())
/** /**
* A [DataSet] with propagated updates. * A [DataSet] with propagated updates.
*/ */
public interface DataSource<T : Any> : DataSet<T>, CoroutineScope { public interface DataSource<out T : Any> : DataSet<T>, CoroutineScope {
/** /**
* A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
@ -73,28 +84,28 @@ public interface DataSource<T : Any> : DataSet<T>, CoroutineScope {
} }
public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is DataSource) updates else emptyFlow() public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is DataSource) updates else emptyFlow()
//
/** ///**
* Flow all data nodes with names starting with [branchName] // * Flow all data nodes with names starting with [branchName]
*/ // */
public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> = //public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> =
this@children.traverse().filter { // this@children.asSequence().filter {
it.name.startsWith(branchName) // it.name.startsWith(branchName)
} // }
/** /**
* Start computation for all goals in data node and return a job for the whole node * Start computation for all goals in data node and return a job for the whole node
*/ */
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch { public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
traverse().map { asIterable().map {
it.launch(this@launch) it.launch(this@launch)
}.toList().joinAll() }.joinAll()
} }
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() } public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }
public suspend fun DataSet<*>.toMeta(): Meta = Meta { public fun DataSet<*>.toMeta(): Meta = Meta {
traverse().forEach { forEach {
if (it.name.endsWith(DataSet.META_KEY)) { if (it.name.endsWith(DataSet.META_KEY)) {
set(it.name, it.meta) set(it.name, it.meta)
} else { } else {

View File

@ -28,7 +28,7 @@ public interface DataSetBuilder<in T : Any> {
} }
//Set new items //Set new items
dataSet.traverse().forEach { dataSet.forEach {
data(name + it.name, it.data) data(name + it.name, it.data)
} }
} }
@ -146,7 +146,7 @@ public inline fun <reified T : Any> DataSetBuilder<T>.static(
*/ */
@DFExperimental @DFExperimental
public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit { public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit {
tree.traverse().forEach { tree.forEach {
//TODO check if the place is occupied //TODO check if the place is occupied
data(it.name, it.data) data(it.name, it.data)
} }

View File

@ -39,12 +39,12 @@ public interface DataTree<out T : Any> : DataSet<T> {
override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY
override fun traverse(): Sequence<NamedData<T>> = sequence { override fun iterator(): Iterator<NamedData<T>> = iterator {
items.forEach { (token, childItem: DataTreeItem<T>) -> items.forEach { (token, childItem: DataTreeItem<T>) ->
if (!token.body.startsWith("@")) { if (!token.body.startsWith("@")) {
when (childItem) { when (childItem) {
is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName())) is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName()))
is DataTreeItem.Node -> yieldAll(childItem.tree.traverse().map { it.named(token + it.name) }) is DataTreeItem.Node -> yieldAll(childItem.tree.asSequence().map { it.named(token + it.name) })
} }
} }
} }

View File

@ -3,9 +3,9 @@ package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.*
import kotlin.collections.set import kotlin.collections.set
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -14,13 +14,19 @@ import kotlin.jvm.Synchronized
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
public interface DataSourceBuilder<T : Any> : DataSetBuilder<T>, DataSource<T> {
override val updates: MutableSharedFlow<Name>
}
/** /**
* A mutable [DataTree] that propagates updates * A mutable [DataTree] that propagates updates
*/ */
public class DataSourceBuilder<T : Any>( @PublishedApi
internal class DataTreeBuilder<T : Any>(
override val dataType: KType, override val dataType: KType,
coroutineContext: CoroutineContext, coroutineContext: CoroutineContext,
) : DataTree<T>, DataSetBuilder<T>, DataSource<T> { ) : DataTree<T>, DataSourceBuilder<T> {
override val coroutineContext: CoroutineContext = override val coroutineContext: CoroutineContext =
coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction() coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction()
@ -29,23 +35,20 @@ public class DataSourceBuilder<T : Any>(
override val items: Map<NameToken, DataTreeItem<T>> override val items: Map<NameToken, DataTreeItem<T>>
get() = treeItems.filter { !it.key.body.startsWith("@") } get() = treeItems.filter { !it.key.body.startsWith("@") }
private val _updates = MutableSharedFlow<Name>() override val updates = MutableSharedFlow<Name>()
override val updates: SharedFlow<Name>
get() = _updates
@Synchronized @Synchronized
private fun remove(token: NameToken) { private fun remove(token: NameToken) {
if (treeItems.remove(token) != null) { if (treeItems.remove(token) != null) {
launch { launch {
_updates.emit(token.asName()) updates.emit(token.asName())
} }
} }
} }
override fun remove(name: Name) { override fun remove(name: Name) {
if (name.isEmpty()) error("Can't remove the root node") if (name.isEmpty()) error("Can't remove the root node")
(getItem(name.cutLast()).tree as? DataSourceBuilder)?.remove(name.lastOrNull()!!) (getItem(name.cutLast()).tree as? DataTreeBuilder)?.remove(name.lastOrNull()!!)
} }
@Synchronized @Synchronized
@ -58,11 +61,11 @@ public class DataSourceBuilder<T : Any>(
treeItems[token] = DataTreeItem.Node(node) treeItems[token] = DataTreeItem.Node(node)
} }
private fun getOrCreateNode(token: NameToken): DataSourceBuilder<T> = private fun getOrCreateNode(token: NameToken): DataTreeBuilder<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? DataSourceBuilder<T> (treeItems[token] as? DataTreeItem.Node<T>)?.tree as? DataTreeBuilder<T>
?: DataSourceBuilder<T>(dataType, coroutineContext).also { set(token, it) } ?: DataTreeBuilder<T>(dataType, coroutineContext).also { set(token, it) }
private fun getOrCreateNode(name: Name): DataSourceBuilder<T> = when (name.length) { private fun getOrCreateNode(name: Name): DataTreeBuilder<T> = when (name.length) {
0 -> this 0 -> this
1 -> getOrCreateNode(name.firstOrNull()!!) 1 -> getOrCreateNode(name.firstOrNull()!!)
else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst()) else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
@ -79,7 +82,7 @@ public class DataSourceBuilder<T : Any>(
} }
} }
launch { launch {
_updates.emit(name) updates.emit(name)
} }
} }
@ -91,32 +94,39 @@ public class DataSourceBuilder<T : Any>(
} }
/** /**
* Create a dynamic tree. Initial data is placed synchronously. * Create a dynamic [DataSource]. Initial data is placed synchronously.
*/ */
@DFInternal
@Suppress("FunctionName") @Suppress("FunctionName")
public fun <T : Any> ActiveDataTree( public fun <T : Any> DataSource(
type: KType, type: KType,
parent: CoroutineScope, parent: CoroutineScope,
block: DataSourceBuilder<T>.() -> Unit, block: DataSourceBuilder<T>.() -> Unit,
): DataSourceBuilder<T> { ): DataSource<T> {
val tree = DataSourceBuilder<T>(type, parent.coroutineContext) val tree = DataTreeBuilder<T>(type, parent.coroutineContext)
tree.block() tree.block()
return tree return tree
} }
@Suppress("OPT_IN_USAGE","FunctionName")
public inline fun <reified T : Any> DataSource(
parent: CoroutineScope,
crossinline block: DataSourceBuilder<T>.() -> Unit,
): DataSource<T> = DataSource(typeOf<T>(), parent) { block() }
@Suppress("FunctionName") @Suppress("FunctionName")
public suspend inline fun <reified T : Any> ActiveDataTree( public suspend inline fun <reified T : Any> DataSource(
crossinline block: DataSourceBuilder<T>.() -> Unit = {}, crossinline block: DataSourceBuilder<T>.() -> Unit = {},
): DataSourceBuilder<T> = DataSourceBuilder<T>(typeOf<T>(), coroutineContext).apply { block() } ): DataSourceBuilder<T> = DataTreeBuilder<T>(typeOf<T>(), coroutineContext).apply { block() }
public inline fun <reified T : Any> DataSourceBuilder<T>.emit( public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
name: Name, name: Name,
parent: CoroutineScope, parent: CoroutineScope,
noinline block: DataSourceBuilder<T>.() -> Unit, noinline block: DataSourceBuilder<T>.() -> Unit,
): Unit = node(name, ActiveDataTree(typeOf<T>(), parent, block)) ): Unit = node(name, DataSource(parent, block))
public inline fun <reified T : Any> DataSourceBuilder<T>.emit( public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
name: String, name: String,
parent: CoroutineScope, parent: CoroutineScope,
noinline block: DataSourceBuilder<T>.() -> Unit, noinline block: DataSourceBuilder<T>.() -> Unit,
): Unit = node(Name.parse(name), ActiveDataTree(typeOf<T>(), parent, block)) ): Unit = node(Name.parse(name), DataSource(parent, block))

View File

@ -18,6 +18,7 @@ package space.kscience.dataforge.data
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal
public interface GroupRule { public interface GroupRule {
public fun <T : Any> gather(set: DataSet<T>): Map<String, DataSet<T>> public fun <T : Any> gather(set: DataSet<T>): Map<String, DataSet<T>>
@ -31,6 +32,7 @@ public interface GroupRule {
* @param defaultTagValue * @param defaultTagValue
* @return * @return
*/ */
@OptIn(DFInternal::class)
public fun byMetaValue( public fun byMetaValue(
key: String, key: String,
defaultTagValue: String, defaultTagValue: String,
@ -42,9 +44,9 @@ public interface GroupRule {
val map = HashMap<String, DataSet<T>>() val map = HashMap<String, DataSet<T>>()
if (set is DataSource) { if (set is DataSource) {
set.traverse().forEach { data -> set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder<T>) (map.getOrPut(tagValue) { DataTreeBuilder(set.dataType, set.coroutineContext) } as DataTreeBuilder<T>)
.data(data.name, data.data) .data(data.name, data.data)
set.launch { set.launch {
@ -53,7 +55,7 @@ public interface GroupRule {
val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue
map.getOrPut(updateTagValue) { map.getOrPut(updateTagValue) {
ActiveDataTree(set.dataType, this) { DataSource(set.dataType, this) {
data(name, dataUpdate) data(name, dataUpdate)
} }
} }
@ -61,7 +63,7 @@ public interface GroupRule {
} }
} }
} else { } else {
set.traverse().forEach { data -> set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>) (map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>)
.data(data.name, data.data) .data(data.name, data.data)

View File

@ -52,7 +52,7 @@ internal class StaticDataTree<T : Any>(
if (dataSet is StaticDataTree) { if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet)) set(name, DataTreeItem.Node(dataSet))
} else { } else {
dataSet.traverse().forEach { dataSet.forEach {
data(name + it.name, it.data) data(name + it.name, it.data)
} }
} }

View File

@ -30,8 +30,13 @@ public fun <T : Any> DataSet<T>.filter(
override val meta: Meta get() = this@filter.meta override val meta: Meta get() = this@filter.meta
override fun traverse(): Sequence<NamedData<T>> = override fun iterator(): Iterator<NamedData<T>> = iterator {
this@filter.traverse().filter { predicate(it.name, it.meta) } for(d in this@filter){
if(predicate(d.name, d.meta)){
yield(d)
}
}
}
override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf { override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf {
predicate(name, it.meta) predicate(name, it.meta)
@ -58,8 +63,11 @@ public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (p
override val meta: Meta get() = this@withNamePrefix.meta override val meta: Meta get() = this@withNamePrefix.meta
override fun traverse(): Sequence<NamedData<T>> = override fun iterator(): Iterator<NamedData<T>> = iterator {
this@withNamePrefix.traverse().map { it.data.named(prefix + it.name) } for(d in this@withNamePrefix){
yield(d.data.named(prefix + d.name))
}
}
override fun get(name: Name): Data<T>? = override fun get(name: Name): Data<T>? =
name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) } name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) }
@ -80,9 +88,11 @@ public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branc
override val meta: Meta get() = this@branch.meta override val meta: Meta get() = this@branch.meta
override fun traverse(): Sequence<NamedData<T>> = this@branch.traverse().mapNotNull { override fun iterator(): Iterator<NamedData<T>> = iterator {
it.name.removeHeadOrNull(branchName)?.let { name -> for(d in this@branch){
it.data.named(name) d.name.removeHeadOrNull(branchName)?.let { name ->
yield(d.data.named(name))
}
} }
} }

View File

@ -7,8 +7,6 @@ import space.kscience.dataforge.meta.seal
import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
@ -20,7 +18,8 @@ public suspend fun <T: Any> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWit
public data class NamedValueWithMeta<T>(val name: Name, val meta: Meta, val value: T) public data class NamedValueWithMeta<T>(val name: Name, val meta: Meta, val value: T)
public suspend fun <T: Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> = NamedValueWithMeta(name, meta, await()) public suspend fun <T : Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
NamedValueWithMeta(name, meta, await())
/** /**
@ -187,14 +186,13 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
metaTransform: MutableMeta.() -> Unit = {}, metaTransform: MutableMeta.() -> Unit = {},
block: suspend (NamedValueWithMeta<T>) -> R, block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(outputType) { ): DataTree<R> = DataTree<R>(outputType) {
populateFrom( forEach {
traverse().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
Data(outputType, newMeta, coroutineContext, listOf(it)) { val d = Data(outputType, newMeta, coroutineContext, listOf(it)) {
block(it.awaitWithMeta()) block(it.awaitWithMeta())
}.named(it.name)
} }
) data(it.name, d)
}
} }
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
@ -204,10 +202,9 @@ public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
noinline block: suspend (NamedValueWithMeta<T>) -> R, noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = map(typeOf<R>(), coroutineContext, metaTransform, block) ): DataTree<R> = map(typeOf<R>(), coroutineContext, metaTransform, block)
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) { public inline fun <T : Any> DataSet<T>.forEach(block: (NamedData<T>) -> Unit) {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } for (d in this) {
traverse().forEach { block(d)
block(it)
} }
} }
@ -215,11 +212,11 @@ public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R, crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R,
): Data<R> = traverse().asIterable().reduceNamedToData(coroutineContext, meta, transformation) ): Data<R> = asIterable().reduceNamedToData(coroutineContext, meta, transformation)
public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData( public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
initial: R, initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R, crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = traverse().asIterable().foldNamedToData(initial, coroutineContext, meta, block) ): Data<R> = asIterable().foldNamedToData(initial, coroutineContext, meta, block)

View File

@ -5,7 +5,6 @@ import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.matches
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
@ -29,7 +28,6 @@ private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
/** /**
* Select all data matching given type and filters. Does not modify paths * Select all data matching given type and filters. Does not modify paths
* *
* @param namePattern a name match patter according to [Name.matches]
* @param predicate addition filtering condition based on item name and meta. By default, accepts all * @param predicate addition filtering condition based on item name and meta. By default, accepts all
*/ */
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
@ -47,11 +45,13 @@ public fun <R : Any> DataSet<*>.filterByType(
private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type) private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type)
&& predicate(name, datum.meta) && predicate(name, datum.meta)
override fun traverse(): Sequence<NamedData<R>> = this@filterByType.traverse().filter { override fun iterator(): Iterator<NamedData<R>> = iterator {
checkDatum(it.name, it.data) for(d in this@filterByType){
}.map { if(checkDatum(d.name,d.data)){
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
it as NamedData<R> yield(d as NamedData<R>)
}
}
} }
override fun get(name: Name): Data<R>? = this@filterByType[name]?.let { datum -> override fun get(name: Name): Data<R>? = this@filterByType[name]?.let { datum ->

View File

@ -27,7 +27,7 @@ context(DataSetBuilder<T>) public infix fun <T : Any> String.put(
): Unit = node(Name.parse(this), block) ): Unit = node(Name.parse(this), block)
/** /**
* Copy given data set and mirror its changes to this [DataSourceBuilder] in [this@setAndObserve]. Returns an update [Job] * Copy given data set and mirror its changes to this [DataTreeBuilder] in [this@setAndObserve]. Returns an update [Job]
*/ */
context(DataSetBuilder<T>) public fun <T : Any> CoroutineScope.setAndWatch( context(DataSetBuilder<T>) public fun <T : Any> CoroutineScope.setAndWatch(
name: Name, name: Name,

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -9,7 +10,7 @@ import space.kscience.dataforge.actions.map
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals import kotlin.test.assertEquals
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class, ExperimentalCoroutinesApi::class)
internal class ActionsTest { internal class ActionsTest {
@Test @Test
fun testStaticMapAction() = runTest { fun testStaticMapAction() = runTest {
@ -28,7 +29,7 @@ internal class ActionsTest {
@Test @Test
fun testDynamicMapAction() = runTest { fun testDynamicMapAction() = runTest {
val data: DataSourceBuilder<Int> = ActiveDataTree() val data: DataSourceBuilder<Int> = DataSource()
val plusOne = Action.map<Int, Int> { val plusOne = Action.map<Int, Int> {
result { it + 1 } result { it + 1 }

View File

@ -56,7 +56,7 @@ internal class DataTreeBuilderTest {
try { try {
lateinit var updateJob: Job lateinit var updateJob: Job
supervisorScope { supervisorScope {
val subNode = ActiveDataTree<Int> { val subNode = DataSource<Int> {
updateJob = launch { updateJob = launch {
repeat(10) { repeat(10) {
delay(10) delay(10)
@ -70,7 +70,7 @@ internal class DataTreeBuilderTest {
println(it) println(it)
} }
} }
val rootNode = ActiveDataTree<Int> { val rootNode = DataSource<Int> {
setAndWatch("sub".asName(), subNode) setAndWatch("sub".asName(), subNode)
} }

View File

@ -133,6 +133,7 @@ public fun Meta.getIndexed(name: Name): Map<String?, Meta> {
} }
} }
public fun Meta.getIndexed(name: String): Map<String?, Meta> = getIndexed(name.parseAsName())
/** /**
* A meta node that ensures that all of its descendants has at least the same type. * A meta node that ensures that all of its descendants has at least the same type.

View File

@ -1,6 +1,7 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
@ -23,7 +24,7 @@ public interface TaskResult<out T : Any> : DataSet<T> {
*/ */
public val taskMeta: Meta public val taskMeta: Meta
override fun traverse(): Sequence<TaskData<T>> override fun iterator(): Iterator<TaskData<T>>
override fun get(name: Name): TaskData<T>? override fun get(name: Name): TaskData<T>?
} }
@ -34,8 +35,10 @@ private class TaskResultImpl<out T : Any>(
override val taskMeta: Meta, override val taskMeta: Meta,
) : TaskResult<T>, DataSet<T> by dataSet { ) : TaskResult<T>, DataSet<T> by dataSet {
override fun traverse(): Sequence<TaskData<T>> = dataSet.traverse().map { override fun iterator(): Iterator<TaskData<T>> = iterator {
workspace.wrapData(it, it.name, taskName, taskMeta) dataSet.forEach {
yield(workspace.wrapData(it, it.name, taskName, taskMeta))
}
} }
override fun get(name: Name): TaskData<T>? = dataSet.get(name)?.let { override fun get(name: Name): TaskData<T>? = dataSet.get(name)?.let {

View File

@ -3,6 +3,7 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.data.Data import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.Type import space.kscience.dataforge.misc.Type
@ -35,7 +36,7 @@ public interface Workspace : ContextAware, Provider {
return when (target) { return when (target) {
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)} "target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)}
Task.TYPE -> tasks Task.TYPE -> tasks
Data.TYPE -> data.traverse().associateBy { it.name } Data.TYPE -> data.asSequence().associateBy { it.name }
else -> emptyMap() else -> emptyMap()
} }
} }

View File

@ -15,11 +15,9 @@ import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
import kotlin.collections.HashMap
import kotlin.collections.set import kotlin.collections.set
import kotlin.properties.PropertyDelegateProvider import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.typeOf
public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> { public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
@ -106,8 +104,8 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas
} }
@DFExperimental @DFExperimental
public fun buildActiveData(scope: CoroutineScope, builder: DataSourceBuilder<Any>.() -> Unit) { public fun data(scope: CoroutineScope, builder: DataSourceBuilder<Any>.() -> Unit) {
data = ActiveDataTree(typeOf<Any>(), scope, builder) data = DataSource(scope, builder)
} }
/** /**

View File

@ -3,6 +3,7 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.filterByType import space.kscience.dataforge.data.filterByType
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.matches import space.kscience.dataforge.names.matches
@ -10,6 +11,7 @@ import space.kscience.dataforge.names.matches
// data(builder) // data(builder)
//} //}
@OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> = public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> =
object : DataSelector<T> { object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> = override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> =

View File

@ -1,6 +1,9 @@
@file:OptIn(ExperimentalCoroutinesApi::class)
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.context.PluginTag
@ -15,7 +18,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val allData by task<Int> { val allData by task<Int> {
val selectedData = workspace.data.filterByType<Int>() val selectedData = workspace.data.filterByType<Int>()
val result: Data<Int> = selectedData.traverse().asIterable().foldToData(0) { result, data -> val result: Data<Int> = selectedData.foldToData(0) { result, data ->
result + data.value result + data.value
} }
data("result", result) data("result", result)
@ -44,28 +47,22 @@ class DataPropagationTest {
context { context {
plugin(DataPropagationTestPlugin) plugin(DataPropagationTestPlugin)
} }
runBlocking {
data { data {
repeat(100) { repeat(100) {
static("myData[$it]", it) static("myData[$it]", it)
} }
} }
} }
}
@Test @Test
fun testAllData() { fun testAllData() = runTest {
runBlocking {
val node = testWorkspace.produce("Test.allData") val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.traverse().single().await()) assertEquals(4950, node.asSequence().single().await())
}
} }
@Test @Test
fun testSingleData() { fun testSingleData() = runTest {
runBlocking {
val node = testWorkspace.produce("Test.singleData") val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.traverse().single().await()) assertEquals(12, node.asSequence().single().await())
}
} }
} }

View File

@ -1,8 +1,11 @@
@file:Suppress("UNUSED_VARIABLE") @file:Suppress("UNUSED_VARIABLE")
@file:OptIn(ExperimentalCoroutinesApi::class)
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.Timeout
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
@ -26,7 +29,7 @@ public inline fun <reified P : Plugin> P.toFactory(): PluginFactory<P> = object
override val type: KClass<out P> = P::class override val type: KClass<out P> = P::class
} }
public fun Workspace.runBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet<Any> = runBlocking { public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet<Any> = runBlocking {
produce(task, block) produce(task, block)
} }
@ -156,21 +159,17 @@ class SimpleWorkspaceTest {
@Test @Test
@Timeout(1) @Timeout(1)
fun testWorkspace() { fun testWorkspace() = runTest {
runBlocking { val node = workspace.produce("sum")
val node = workspace.runBlocking("sum") val res = node.asSequence().single()
val res = node.traverse().single()
assertEquals(328350, res.await()) assertEquals(328350, res.await())
} }
}
@Test @Test
@Timeout(1) @Timeout(1)
fun testMetaPropagation() { fun testMetaPropagation() = runTest {
runBlocking {
val node = workspace.produce("sum") { "testFlag" put true } val node = workspace.produce("sum") { "testFlag" put true }
val res = node.traverse().single().await() val res = node.asSequence().single().await()
}
} }
@Test @Test
@ -192,7 +191,7 @@ class SimpleWorkspaceTest {
fun testFilter() { fun testFilter() {
runBlocking { runBlocking {
val node = workspace.produce("filterOne") val node = workspace.produce("filterOne")
assertEquals(12, node.traverse().first().await()) assertEquals(12, node.asSequence().first().await())
} }
} }
} }