diff --git a/CHANGELOG.md b/CHANGELOG.md index 94aafcc1..1d27c79b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - KTor 2.0 - DataTree `items` call is blocking. - DataSet `getData` is no longer suspended and renamed to `get` +- DataSet operates with sequences of data instead of flows ### Deprecated diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt index e7001d07..157564b1 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt @@ -1,7 +1,6 @@ package space.kscience.dataforge.actions import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta @@ -50,7 +49,7 @@ internal class MapAction( meta: Meta, scope: CoroutineScope?, ): DataSet { - suspend fun mapOne(data: NamedData): NamedData { + fun mapOne(data: NamedData): NamedData { // Creating a new environment for action using **old** name, old meta and task meta val env = ActionEnv(data.name, data.meta, meta) @@ -75,16 +74,16 @@ internal class MapAction( return newData.named(newName) } - val flow = dataSet.flowData().map(::mapOne) + val sequence = dataSet.dataSequence().map(::mapOne) return ActiveDataTree(outputType) { - populateWith(flow) + populateWith(sequence) scope?.launch { dataSet.updates.collect { name -> //clear old nodes remove(name) //collect new items - populateWith(dataSet.flowChildren(name).map(::mapOne)) + populateWith(dataSet.children(name).map(::mapOne)) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt index 126f1f46..5bb22bfc 100644 --- 
a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt @@ -3,7 +3,6 @@ package space.kscience.dataforge.actions import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.fold import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta @@ -84,7 +83,7 @@ internal class ReduceAction( override fun CoroutineScope.transform(set: DataSet, meta: Meta, key: Name): Flow> = flow { ReduceGroupBuilder(inputType, this@transform, meta).apply(action).buildGroups(set).forEach { group -> - val dataFlow: Map> = group.set.flowData().fold(HashMap()) { acc, value -> + val dataFlow: Map> = group.set.dataSequence().fold(HashMap()) { acc, value -> acc.apply { acc[value.name] = value.data } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt index b55a0e08..88d047cd 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt @@ -2,10 +2,6 @@ package space.kscience.dataforge.actions import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.flatMapConcat -import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Laminate @@ -58,14 +54,14 @@ internal class SplitAction( scope: CoroutineScope?, ): DataSet { - suspend fun splitOne(data: NamedData): Flow> { + fun splitOne(data: NamedData): Sequence> { val laminate = Laminate(data.meta, meta) val split = 
SplitBuilder(data.name, data.meta).apply(action) // apply individual fragment rules to result - return split.fragments.entries.asFlow().map { (fragmentName, rule) -> + return split.fragments.entries.asSequence().map { (fragmentName, rule) -> val env = SplitBuilder.FragmentRule(fragmentName, laminate.toMutableMeta()).apply(rule) //data.map(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) @OptIn(DFInternal::class) Data(outputType, meta = env.meta, dependencies = listOf(data)) { @@ -75,13 +71,13 @@ internal class SplitAction( } return ActiveDataTree(outputType) { - populateWith(dataSet.flowData().flatMapConcat(transform = ::splitOne)) + populateWith(dataSet.dataSequence().flatMap(transform = ::splitOne)) scope?.launch { dataSet.updates.collect { name -> //clear old nodes remove(name) //collect new items - populateWith(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne)) + populateWith(dataSet.children(name).flatMap(transform = ::splitOne)) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt index f4838c85..d4ca9296 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt @@ -1,7 +1,9 @@ package space.kscience.dataforge.data import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.mapNotNull import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.set @@ -23,7 +25,7 @@ public interface DataSet { /** * Traverse this provider or its child. The order is not guaranteed. */ - public fun flowData(): Flow> + public fun dataSequence(): Sequence> /** * Get data with given name.
@@ -34,8 +36,8 @@ public interface DataSet { /** * Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf. */ - public suspend fun listTop(prefix: Name = Name.EMPTY): List = - flowData().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList() + public fun listTop(prefix: Name = Name.EMPTY): List = + dataSequence().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList() // By default, traverses the whole tree. Could be optimized in descendants public companion object { @@ -50,13 +52,15 @@ public interface DataSet { //private val nothing: Nothing get() = error("this is nothing") - override fun flowData(): Flow> = emptyFlow() + override fun dataSequence(): Sequence> = emptySequence() override fun get(name: Name): Data? = null } } } +public operator fun DataSet.get(name: String): Data? = get(name.parseAsName()) + public interface ActiveDataSet : DataSet { /** * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
@@ -72,8 +76,8 @@ public val DataSet.updates: Flow get() = if (this is ActiveDa /** * Flow all data nodes with names starting with [branchName] */ -public fun DataSet.flowChildren(branchName: Name): Flow> = - this@flowChildren.flowData().filter { +public fun DataSet.children(branchName: Name): Sequence> = + this@children.dataSequence().filter { it.name.startsWith(branchName) } @@ -81,7 +85,7 @@ public fun DataSet.flowChildren(branchName: Name): Flow DataSet.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch { - flowData().map { + dataSequence().map { it.launch(this@launch) }.toList().joinAll() } @@ -89,7 +93,7 @@ public fun DataSet.startAll(coroutineScope: CoroutineScope): Job = public suspend fun DataSet.join(): Unit = coroutineScope { startAll(this).join() } public suspend fun DataSet<*>.toMeta(): Meta = Meta { - flowData().collect { + dataSequence().forEach { if (it.name.endsWith(DataSet.META_KEY)) { set(it.name, it.meta) } else { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt index 0c13f2c6..d279e34c 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt @@ -30,7 +30,7 @@ public interface DataSetBuilder { } //Set new items - dataSet.flowData().collect { + dataSet.dataSequence().forEach { data(name + it.name, it.data) } } @@ -148,7 +148,7 @@ public suspend inline fun DataSetBuilder.static( */ @DFExperimental public suspend fun DataSetBuilder.populateFrom(tree: DataSet): Unit = coroutineScope { - tree.flowData().collect { + tree.dataSequence().forEach { //TODO check if the place is occupied data(it.name, it.data) } @@ -159,3 +159,9 @@ public suspend fun DataSetBuilder.populateWith(flow: Flow DataSetBuilder.populateWith(sequence: Sequence>) { + sequence.forEach { + data(it.name, 
it.data) + } +} diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt index 8eee0113..aab14e25 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataTree.kt @@ -43,18 +43,18 @@ public interface DataTree : DataSet { override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY - override fun flowData(): Flow> = flow { + override fun dataSequence(): Sequence> = sequence { items.forEach { (token, childItem: DataTreeItem) -> if (!token.body.startsWith("@")) { when (childItem) { - is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName())) - is DataTreeItem.Node -> emitAll(childItem.tree.flowData().map { it.named(token + it.name) }) + is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName())) + is DataTreeItem.Node -> yieldAll(childItem.tree.dataSequence().map { it.named(token + it.name) }) } } } } - override suspend fun listTop(prefix: Name): List = + override fun listTop(prefix: Name): List = getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList() override fun get(name: Name): Data? 
= when (name.length) { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt index beaa3da0..5ef8a6d5 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt @@ -43,7 +43,7 @@ public interface GroupRule { ): Map> { val map = HashMap>() - set.flowData().collect { data -> + set.dataSequence().forEach { data -> val tagValue = data.meta[key]?.string ?: defaultTagValue map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(data.name, data.data) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt index f37459dc..06b1ff6f 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt @@ -54,7 +54,7 @@ internal class StaticDataTree( set(name, DataTreeItem.Node(dataSet)) } else { coroutineScope { - dataSet.flowData().collect { + dataSet.dataSequence().forEach { data(name + it.name, it.data) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt index 46f07260..5ee7027d 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt @@ -24,8 +24,8 @@ public fun DataSet.filter( override val meta: Meta get() = this@filter.meta - override fun flowData(): Flow> = - this@filter.flowData().filter { predicate(it.name, it.data) } + override fun dataSequence(): Sequence> = + this@filter.dataSequence().filter { predicate(it.name, it.data) } 
override fun get(name: Name): Data? = this@filter.get(name)?.takeIf { predicate(name, it) @@ -48,7 +48,8 @@ else object : ActiveDataSet { override val meta: Meta get() = this@withNamePrefix.meta - override fun flowData(): Flow> = this@withNamePrefix.flowData().map { it.data.named(prefix + it.name) } + override fun dataSequence(): Sequence> = + this@withNamePrefix.dataSequence().map { it.data.named(prefix + it.name) } override fun get(name: Name): Data? = name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) } @@ -66,7 +67,7 @@ public fun DataSet.branch(branchName: Name): DataSet = if (branc override val meta: Meta get() = this@branch.meta - override fun flowData(): Flow> = this@branch.flowData().mapNotNull { + override fun dataSequence(): Sequence> = this@branch.dataSequence().mapNotNull { it.name.removeHeadOrNull(branchName)?.let { name -> it.data.named(name) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index 8c3332df..c8180eb5 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -1,9 +1,7 @@ package space.kscience.dataforge.data import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.fold import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.seal @@ -100,11 +98,11 @@ public inline fun Map>.reduceToData( * Transform a [Flow] of [NamedData] to a single [Data]. 
*/ @DFInternal -public suspend fun Flow>.reduceToData( +public inline fun Sequence>.reduceToData( outputType: KType, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - transformation: suspend (Flow>) -> R, + crossinline transformation: suspend (Sequence>) -> R, ): Data = Data( outputType, meta, @@ -115,10 +113,10 @@ public suspend fun Flow>.reduceToData( } @OptIn(DFInternal::class) -public suspend inline fun Flow>.reduceToData( +public inline fun Sequence>.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - noinline transformation: suspend (Flow>) -> R, + crossinline transformation: suspend (Sequence>) -> R, ): Data = reduceToData(typeOf(), coroutineContext, meta) { transformation(it) } @@ -126,15 +124,15 @@ public suspend inline fun Flow>.reduceTo /** * Fold a flow of named data into a single [Data] */ -public suspend inline fun Flow>.foldToData( +public inline fun Sequence>.foldToData( initial: R, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - noinline block: suspend (result: R, data: NamedData) -> R, + crossinline block: suspend (result: R, data: NamedData) -> R, ): Data = reduceToData( coroutineContext, meta ) { - it.fold(initial, block) + it.fold(initial) { acc, t -> block(acc, t) } } //DataSet operations @@ -147,7 +145,7 @@ public suspend fun DataSet.map( block: suspend (T) -> R, ): DataTree = DataTree(outputType) { populateWith( - flowData().map { + dataSequence().map { val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() Data(outputType, newMeta, coroutineContext, listOf(it)) { block(it.await()) @@ -165,20 +163,20 @@ public suspend inline fun DataSet.map( public suspend fun DataSet.forEach(block: suspend (NamedData) -> Unit) { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - flowData().collect { + dataSequence().forEach { block(it) } } -public suspend inline fun DataSet.reduceToData( +public inline fun 
DataSet.reduceToData( coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - noinline transformation: suspend (Flow>) -> R, -): Data = flowData().reduceToData(coroutineContext, meta, transformation) + crossinline transformation: suspend (Sequence>) -> R, +): Data = dataSequence().reduceToData(coroutineContext, meta, transformation) -public suspend inline fun DataSet.foldToData( +public inline fun DataSet.foldToData( initial: R, coroutineContext: CoroutineContext = EmptyCoroutineContext, meta: Meta = Meta.EMPTY, - noinline block: suspend (result: R, data: NamedData) -> R, -): Data = flowData().foldToData(initial, coroutineContext, meta, block) \ No newline at end of file + crossinline block: suspend (result: R, data: NamedData) -> R, +): Data = dataSequence().foldToData(initial, coroutineContext, meta, block) \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt index 6c53c147..f32ff3f3 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt @@ -2,7 +2,6 @@ package space.kscience.dataforge.data import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name @@ -45,19 +44,19 @@ public fun DataSet<*>.select( && (namePattern == null || name.matches(namePattern)) && filter(name, datum.meta) - override fun flowData(): Flow> = this@select.flowData().filter { + override fun dataSequence(): Sequence> = this@select.dataSequence().filter { checkDatum(it.name, it.data) }.map { @Suppress("UNCHECKED_CAST") it as NamedData } - override fun get(name: Name): Data? 
= this@select.get(name)?.let { datum -> + override fun get(name: Name): Data? = this@select[name]?.let { datum -> if (checkDatum(name, datum)) datum.castOrNull(type) else null } override val updates: Flow = this@select.updates.filter { - val datum = this@select.get(it) ?: return@filter false + val datum = this@select[it] ?: return@filter false checkDatum(it, datum) } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt index fc8e1e06..a1fb4e84 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt @@ -1,7 +1,5 @@ package space.kscience.dataforge.workspace -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name @@ -25,7 +23,7 @@ public interface TaskResult : DataSet { */ public val taskMeta: Meta - override fun flowData(): Flow> + override fun dataSequence(): Sequence> override fun get(name: Name): TaskData? 
} @@ -36,7 +34,7 @@ private class TaskResultImpl( override val taskMeta: Meta, ) : TaskResult, DataSet by dataSet { - override fun flowData(): Flow> = dataSet.flowData().map { + override fun dataSequence(): Sequence> = dataSet.dataSequence().map { workspace.wrapData(it, it.name, taskName, taskMeta) } diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt index 91a3ffd2..07b8fe36 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.workspace -import kotlinx.coroutines.flow.single import kotlinx.coroutines.runBlocking import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.PluginFactory @@ -16,7 +15,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { val allData by task { val selectedData = workspace.data.select() - val result: Data = selectedData.flowData().foldToData(0) { result, data -> + val result: Data = selectedData.dataSequence().foldToData(0) { result, data -> result + data.await() } data("result", result) @@ -58,7 +57,7 @@ class DataPropagationTest { fun testAllData() { runBlocking { val node = testWorkspace.produce("Test.allData") - assertEquals(4950, node.flowData().single().await()) + assertEquals(4950, node.dataSequence().single().await()) } } @@ -66,7 +65,7 @@ class DataPropagationTest { fun testSingleData() { runBlocking { val node = testWorkspace.produce("Test.singleData") - assertEquals(12, node.flowData().single().await()) + assertEquals(12, node.dataSequence().single().await()) } } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt 
b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index e1a5a466..3bd09251 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -2,8 +2,6 @@ package space.kscience.dataforge.workspace -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.single import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Timeout import space.kscience.dataforge.context.* @@ -161,7 +159,7 @@ class SimpleWorkspaceTest { fun testWorkspace() { runBlocking { val node = workspace.runBlocking("sum") - val res = node.flowData().single() + val res = node.dataSequence().single() assertEquals(328350, res.await()) } } @@ -171,7 +169,7 @@ class SimpleWorkspaceTest { fun testMetaPropagation() { runBlocking { val node = workspace.produce("sum") { "testFlag" put true } - val res = node.flowData().single().await() + val res = node.dataSequence().single().await() } } @@ -194,7 +192,7 @@ class SimpleWorkspaceTest { fun testFilter() { runBlocking { val node = workspace.produce("filterOne") - assertEquals(12, node.flowData().first().await()) + assertEquals(12, node.dataSequence().first().await()) } } } \ No newline at end of file