From 0622bacc4d26cdfdb47a9e1b95910f19aa67c48c Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 4 May 2022 17:27:56 +0300 Subject: [PATCH] Refactor DataSet. Remove suspends where it is possible. --- CHANGELOG.md | 1 + build.gradle.kts | 5 +- .../kscience/dataforge/actions/Action.kt | 23 ++-- .../dataforge/actions/CachingAction.kt | 53 ++++++++ .../kscience/dataforge/actions/MapAction.kt | 27 ++-- .../dataforge/actions/ReduceAction.kt | 22 ++-- .../kscience/dataforge/actions/SplitAction.kt | 26 ++-- .../kscience/dataforge/data/ActiveDataTree.kt | 103 --------------- .../kscience/dataforge/data/CachingAction.kt | 51 -------- .../space/kscience/dataforge/data/DataSet.kt | 15 ++- .../kscience/dataforge/data/DataSetBuilder.kt | 60 +++++---- .../dataforge/data/DataSourceBuilder.kt | 122 ++++++++++++++++++ .../kscience/dataforge/data/GroupRule.kt | 41 +++--- .../kscience/dataforge/data/StaticDataTree.kt | 29 ++--- .../kscience/dataforge/data/dataFilter.kt | 31 +++-- .../kscience/dataforge/data/dataTransform.kt | 2 +- .../dataforge/data/actionInContext.kt | 2 + .../data/{select.kt => dataFilterJvm.kt} | 37 +++--- .../dataforge/data/dataSetBuilderInContext.kt | 10 +- .../kscience/dataforge/data/ActionsTest.kt | 35 ++--- .../dataforge/data/DataTreeBuilderTest.kt | 4 +- .../dataforge/workspace/WorkspaceBuilder.kt | 15 ++- .../dataforge/workspace/workspaceJvm.kt | 23 ++-- .../workspace/DataPropagationTest.kt | 4 +- .../dataforge/workspace/FileDataTest.kt | 19 ++- .../workspace/SimpleWorkspaceTest.kt | 10 +- 26 files changed, 418 insertions(+), 352 deletions(-) create mode 100644 dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt delete mode 100644 dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/ActiveDataTree.kt delete mode 100644 dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/CachingAction.kt create mode 100644 dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt create mode 100644 dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/actionInContext.kt rename dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/{select.kt => dataFilterJvm.kt} (58%) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3c1b1e4..9f73ebf1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - DataSet `getData` is no longer suspended and renamed to `get` - DataSet operates with sequences of data instead of flows - PartialEnvelope uses `Int` instead `UInt`. 
+- `ActiveDataSet` renamed to `DataSource` ### Deprecated diff --git a/build.gradle.kts b/build.gradle.kts index e577b9d2..a7140851 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,10 +4,7 @@ plugins { allprojects { group = "space.kscience" - version = "0.6.0-dev-4" - repositories{ - mavenCentral() - } + version = "0.6.0-dev-5" } subprojects { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt index 50dc56c8..4fed8e51 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.CoroutineScope import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental @@ -9,13 +8,12 @@ import space.kscience.dataforge.misc.DFExperimental * A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute]. */ public interface Action { + /** - * Transform the data in the node, producing a new node. By default it is assumed that all calculations are lazy + * Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy * so not actual computation is started at this moment. - * - * [scope] context used to compute the initial result, also it is used for updates propagation */ - public suspend fun execute(dataSet: DataSet, meta: Meta = Meta.EMPTY, scope: CoroutineScope? = null): DataSet + public fun execute(dataSet: DataSet, meta: Meta = Meta.EMPTY): DataSet public companion object } @@ -26,16 +24,17 @@ public interface Action { public infix fun Action.then(action: Action): Action { // TODO introduce composite action and add optimize by adding action to the list return object : Action { - override suspend fun execute(dataSet: DataSet, meta: Meta, scope: CoroutineScope?): DataSet { - return action.execute(this@then.execute(dataSet, meta, scope), meta, scope) - } + + override fun execute( + dataSet: DataSet, + meta: Meta, + ): DataSet = action.execute(this@then.execute(dataSet, meta), meta) } } @DFExperimental -public suspend fun DataSet.transformWith( - action: Action, +public operator fun Action.invoke( + dataSet: DataSet, meta: Meta = Meta.EMPTY, - scope: CoroutineScope? 
= null, -): DataSet = action.execute(this, meta, scope) +): DataSet = execute(dataSet, meta) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt new file mode 100644 index 00000000..469b6e60 --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/CachingAction.kt @@ -0,0 +1,53 @@ +package space.kscience.dataforge.actions + +import kotlinx.coroutines.launch +import space.kscience.dataforge.data.* +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.startsWith +import kotlin.reflect.KType + +/** + * Remove all values with keys starting with [name] + */ +internal fun MutableMap.removeWhatStartsWith(name: Name) { + val toRemove = keys.filter { it.startsWith(name) } + toRemove.forEach(::remove) +} + +/** + * An action that caches results on-demand and recalculates them on source push + */ +public abstract class CachingAction( + public val outputType: KType, +) : Action { + + protected abstract fun transform( + set: DataSet, + meta: Meta, + key: Name = Name.EMPTY, + ): Sequence> + + override fun execute( + dataSet: DataSet, + meta: Meta, + ): DataSet = if (dataSet is DataSource) { + DataSourceBuilder(outputType, dataSet.coroutineContext).apply { + populateFrom(transform(dataSet, meta)) + + launch { + dataSet.updates.collect { + //clear old nodes + remove(it) + //collect new items + populateFrom(transform(dataSet, meta, it)) + //FIXME if the target is data, updates are fired twice + } + } + } + } else { + DataTree(outputType) { + populateFrom(transform(dataSet, meta)) + } + } +} diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt index 891e99f0..f2165e7d 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta @@ -59,11 +58,11 @@ internal class MapAction( private val block: MapActionBuilder.() -> Unit, ) : Action { - override suspend fun execute( + override fun execute( dataSet: DataSet, meta: Meta, - scope: CoroutineScope?, ): DataSet { + fun mapOne(data: NamedData): NamedData { // Creating a new environment for action using **old** name, old meta and task meta val env = ActionEnv(data.name, data.meta, meta) @@ -92,16 +91,22 @@ internal class MapAction( val sequence = dataSet.dataSequence().map(::mapOne) - return ActiveDataTree(outputType) { - populateWith(sequence) - scope?.launch { - dataSet.updates.collect { name -> - //clear old nodes - remove(name) - //collect new items - populateWith(dataSet.children(name).map(::mapOne)) + return if (dataSet is DataSource ) { + ActiveDataTree(outputType, dataSet) { + populateFrom(sequence) + launch { + dataSet.updates.collect { name -> + //clear old nodes + remove(name) + //collect new items + populateFrom(dataSet.children(name).map(::mapOne)) + } } } + } else { + DataTree(outputType) { + populateFrom(sequence) + } } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt 
b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt index 64605ad1..3e1ec62f 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt @@ -1,8 +1,5 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta @@ -38,18 +35,17 @@ public class JoinGroup( @DFBuilder public class ReduceGroupBuilder( - private val scope: CoroutineScope, public val actionMeta: Meta, - private val outputType: KType + private val outputType: KType, ) { - private val groupRules: MutableList) -> List>> = ArrayList(); + private val groupRules: MutableList<(DataSet) -> List>> = ArrayList(); /** * introduce grouping by meta value */ public fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup.() -> Unit) { groupRules += { node -> - GroupRule.byMetaValue(scope, tag, defaultTag).gather(node).map { + GroupRule.byMetaValue(tag, defaultTag).gather(node).map { JoinGroup(it.key, it.value, outputType).apply(action) } } @@ -57,12 +53,12 @@ public class ReduceGroupBuilder( public fun group( groupName: String, - filter: (Name, Data) -> Boolean, + predicate: (Name, Meta) -> Boolean, action: JoinGroup.() -> Unit, ) { groupRules += { source -> listOf( - JoinGroup(groupName, source.filter(filter), outputType).apply(action) + JoinGroup(groupName, source.filter(predicate), outputType).apply(action) ) } } @@ -76,7 +72,7 @@ public class ReduceGroupBuilder( } } - internal suspend fun buildGroups(input: DataSet): List> = + internal fun buildGroups(input: DataSet): List> = groupRules.flatMap { it.invoke(input) } } @@ -89,8 +85,8 @@ internal class ReduceAction( //TODO optimize reduction. 
Currently the whole action recalculates on push - override fun CoroutineScope.transform(set: DataSet, meta: Meta, key: Name): Flow> = flow { - ReduceGroupBuilder(this@transform, meta, outputType).apply(action).buildGroups(set).forEach { group -> + override fun transform(set: DataSet, meta: Meta, key: Name): Sequence> = sequence { + ReduceGroupBuilder(meta, outputType).apply(action).buildGroups(set).forEach { group -> val dataFlow: Map> = group.set.dataSequence().fold(HashMap()) { acc, value -> acc.apply { acc[value.name] = value.data @@ -107,7 +103,7 @@ internal class ReduceAction( meta = groupMeta ) { group.result.invoke(env, it) } - emit(res.named(env.name)) + yield(res.named(env.name)) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt index 0f7446a5..471a8057 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Laminate @@ -51,10 +50,9 @@ internal class SplitAction( private val action: SplitBuilder.() -> Unit, ) : Action { - override suspend fun execute( + override fun execute( dataSet: DataSet, meta: Meta, - scope: CoroutineScope?, ): DataSet { fun splitOne(data: NamedData): Sequence> { @@ -77,16 +75,22 @@ internal class SplitAction( } } - return ActiveDataTree(outputType) { - populateWith(dataSet.dataSequence().flatMap(transform = ::splitOne)) - scope?.launch { - dataSet.updates.collect { name -> - //clear old nodes - remove(name) - //collect new items - populateWith(dataSet.children(name).flatMap(transform = ::splitOne)) + return if (dataSet is DataSource) { + ActiveDataTree(outputType, dataSet) { + populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne)) + launch { + dataSet.updates.collect { name -> + //clear old nodes + remove(name) + //collect new items + populateFrom(dataSet.children(name).flatMap(transform = ::splitOne)) + } } } + } else { + DataTree(outputType) { + populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne)) + } } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/ActiveDataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/ActiveDataTree.kt deleted file mode 100644 index e403b625..00000000 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/ActiveDataTree.kt +++ /dev/null @@ -1,103 +0,0 @@ -package space.kscience.dataforge.data - -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import space.kscience.dataforge.meta.* -import space.kscience.dataforge.names.* -import kotlin.reflect.KType -import kotlin.reflect.typeOf - -/** - * A mutable [DataTree]. 
- */ -public class ActiveDataTree( - override val dataType: KType, -) : DataTree, DataSetBuilder, ActiveDataSet { - private val mutex = Mutex() - private val treeItems = HashMap>() - - override val items: Map> - get() = treeItems.filter { !it.key.body.startsWith("@") } - - private val _updates = MutableSharedFlow() - - override val updates: Flow - get() = _updates - - private suspend fun remove(token: NameToken) = mutex.withLock { - if (treeItems.remove(token) != null) { - _updates.emit(token.asName()) - } - } - - override suspend fun remove(name: Name) { - if (name.isEmpty()) error("Can't remove the root node") - (getItem(name.cutLast()).tree as? ActiveDataTree)?.remove(name.lastOrNull()!!) - } - - private suspend fun set(token: NameToken, data: Data) = mutex.withLock { - treeItems[token] = DataTreeItem.Leaf(data) - } - - private suspend fun getOrCreateNode(token: NameToken): ActiveDataTree = - (treeItems[token] as? DataTreeItem.Node)?.tree as? ActiveDataTree - ?: ActiveDataTree(dataType).also { - mutex.withLock { - treeItems[token] = DataTreeItem.Node(it) - } - } - - private suspend fun getOrCreateNode(name: Name): ActiveDataTree = when (name.length) { - 0 -> this - 1 -> getOrCreateNode(name.firstOrNull()!!) - else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst()) - } - - override suspend fun data(name: Name, data: Data?) { - if (data == null) { - remove(name) - } else { - when (name.length) { - 0 -> error("Can't add data with empty name") - 1 -> set(name.firstOrNull()!!, data) - 2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data) - } - } - _updates.emit(name) - } - - override suspend fun meta(name: Name, meta: Meta) { - val item = getItem(name) - if(item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.") - data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta)) - } -} - -/** - * Create a dynamic tree. Initial data is placed synchronously. 
Updates are propagated via [updatesScope] - */ -@Suppress("FunctionName") -public suspend fun ActiveDataTree( - type: KType, - block: suspend ActiveDataTree.() -> Unit, -): ActiveDataTree { - val tree = ActiveDataTree(type) - tree.block() - return tree -} - -@Suppress("FunctionName") -public suspend inline fun ActiveDataTree( - crossinline block: suspend ActiveDataTree.() -> Unit, -): ActiveDataTree = ActiveDataTree(typeOf()).apply { block() } - -public suspend inline fun ActiveDataTree.emit( - name: Name, - noinline block: suspend ActiveDataTree.() -> Unit, -): Unit = node(name, ActiveDataTree(typeOf(), block)) - -public suspend inline fun ActiveDataTree.emit( - name: String, - noinline block: suspend ActiveDataTree.() -> Unit, -): Unit = node(Name.parse(name), ActiveDataTree(typeOf(), block)) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/CachingAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/CachingAction.kt deleted file mode 100644 index 5e7b62bf..00000000 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/CachingAction.kt +++ /dev/null @@ -1,51 +0,0 @@ -package space.kscience.dataforge.data - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.Flow -import space.kscience.dataforge.actions.Action -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.startsWith -import kotlin.reflect.KType - -/** - * Remove all values with keys starting with [name] - */ -internal fun MutableMap.removeWhatStartsWith(name: Name) { - val toRemove = keys.filter { it.startsWith(name) } - toRemove.forEach(::remove) -} - -/** - * An action that caches results on-demand and recalculates them on source push - */ -public abstract class CachingAction( - public val outputType: KType, -) : Action { - - protected abstract fun CoroutineScope.transform( - set: DataSet, - meta: Meta, - key: Name = Name.EMPTY, - ): Flow> - - override suspend fun execute( - dataSet: DataSet, - meta: Meta, - scope: CoroutineScope?, - ): DataSet = ActiveDataTree(outputType) { - coroutineScope { - populateWith(transform(dataSet, meta)) - } - scope?.let { - dataSet.updates.collect { - //clear old nodes - remove(it) - //collect new items - populateWith(scope.transform(dataSet, meta, it)) - //FIXME if the target is data, updates are fired twice - } - } - } -} diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt index 45e60bb4..f468632f 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt @@ -62,7 +62,11 @@ DataSet { public operator fun DataSet.get(name:String): Data? = get(name.parseAsName()) -public interface ActiveDataSet : DataSet { +/** + * A [DataSet] with propagated updates. + */ +public interface DataSource : DataSet, CoroutineScope { + /** * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. * Those can include new data items and replacement of existing ones. 
The replaced items could update existing data content @@ -70,9 +74,16 @@ public interface ActiveDataSet : DataSet { * */ public val updates: Flow + + /** + * Stop generating updates from this [DataSource] + */ + public fun close(){ + coroutineContext[Job]?.cancel() + } } -public val DataSet.updates: Flow get() = if (this is ActiveDataSet) updates else emptyFlow() +public val DataSet.updates: Flow get() = if (this is DataSource) updates else emptyFlow() /** * Flow all data nodes with names starting with [branchName] diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt index d279e34c..cfcc1c97 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSetBuilder.kt @@ -1,7 +1,5 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.Flow import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.misc.DFExperimental @@ -16,14 +14,14 @@ public interface DataSetBuilder { /** * Remove all data items starting with [name] */ - public suspend fun remove(name: Name) + public fun remove(name: Name) - public suspend fun data(name: Name, data: Data?) + public fun data(name: Name, data: Data?) /** * Set a current state of given [dataSet] into a branch [name]. Does not propagate updates */ - public suspend fun node(name: Name, dataSet: DataSet) { + public fun node(name: Name, dataSet: DataSet) { //remove previous items if (name != Name.EMPTY) { remove(name) @@ -38,19 +36,19 @@ public interface DataSetBuilder { /** * Set meta for the given node */ - public suspend fun meta(name: Name, meta: Meta) + public fun meta(name: Name, meta: Meta) } /** * Define meta in this [DataSet] */ -public suspend fun DataSetBuilder.meta(value: Meta): Unit = meta(Name.EMPTY, value) +public fun DataSetBuilder.meta(value: Meta): Unit = meta(Name.EMPTY, value) /** * Define meta in this [DataSet] */ -public suspend fun DataSetBuilder.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta)) +public fun DataSetBuilder.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta)) @PublishedApi internal class SubSetBuilder( @@ -59,52 +57,52 @@ internal class SubSetBuilder( ) : DataSetBuilder { override val dataType: KType get() = parent.dataType - override suspend fun remove(name: Name) { + override fun remove(name: Name) { parent.remove(branch + name) } - override suspend fun data(name: Name, data: Data?) { + override fun data(name: Name, data: Data?) 
{ parent.data(branch + name, data) } - override suspend fun node(name: Name, dataSet: DataSet) { + override fun node(name: Name, dataSet: DataSet) { parent.node(branch + name, dataSet) } - override suspend fun meta(name: Name, meta: Meta) { + override fun meta(name: Name, meta: Meta) { parent.meta(branch + name, meta) } } -public suspend inline fun DataSetBuilder.node( +public inline fun DataSetBuilder.node( name: Name, - crossinline block: suspend DataSetBuilder.() -> Unit, + crossinline block: DataSetBuilder.() -> Unit, ) { if (name.isEmpty()) block() else SubSetBuilder(this, name).block() } -public suspend fun DataSetBuilder.data(name: String, value: Data) { +public fun DataSetBuilder.data(name: String, value: Data) { data(Name.parse(name), value) } -public suspend fun DataSetBuilder.node(name: String, set: DataSet) { +public fun DataSetBuilder.node(name: String, set: DataSet) { node(Name.parse(name), set) } -public suspend inline fun DataSetBuilder.node( +public inline fun DataSetBuilder.node( name: String, - crossinline block: suspend DataSetBuilder.() -> Unit, + crossinline block: DataSetBuilder.() -> Unit, ): Unit = node(Name.parse(name), block) -public suspend fun DataSetBuilder.set(value: NamedData) { +public fun DataSetBuilder.set(value: NamedData) { data(value.name, value.data) } /** * Produce lazy [Data] and emit it into the [DataSetBuilder] */ -public suspend inline fun DataSetBuilder.produce( +public inline fun DataSetBuilder.produce( name: String, meta: Meta = Meta.EMPTY, noinline producer: suspend () -> T, @@ -113,7 +111,7 @@ public suspend inline fun DataSetBuilder.produce( data(name, data) } -public suspend inline fun DataSetBuilder.produce( +public inline fun DataSetBuilder.produce( name: Name, meta: Meta = Meta.EMPTY, noinline producer: suspend () -> T, @@ -125,19 +123,19 @@ public suspend inline fun DataSetBuilder.produce( /** * Emit a static data with the fixed value */ -public suspend inline fun DataSetBuilder.static( +public inline fun DataSetBuilder.static( name: String, data: T, meta: Meta = Meta.EMPTY, ): Unit = data(name, Data.static(data, meta)) -public suspend inline fun DataSetBuilder.static( +public inline fun DataSetBuilder.static( name: Name, data: T, meta: Meta = Meta.EMPTY, ): Unit = data(name, Data.static(data, meta)) -public suspend inline fun DataSetBuilder.static( +public inline fun DataSetBuilder.static( name: String, data: T, mutableMeta: MutableMeta.() -> Unit, @@ -147,20 +145,20 @@ public suspend inline fun DataSetBuilder.static( * Update data with given node data and meta with node meta. 
*/ @DFExperimental -public suspend fun DataSetBuilder.populateFrom(tree: DataSet): Unit = coroutineScope { +public fun DataSetBuilder.populateFrom(tree: DataSet): Unit { tree.dataSequence().forEach { //TODO check if the place is occupied data(it.name, it.data) } } -public suspend fun DataSetBuilder.populateWith(flow: Flow>) { - flow.collect { - data(it.name, it.data) - } -} +//public fun DataSetBuilder.populateFrom(flow: Flow>) { +// flow.collect { +// data(it.name, it.data) +// } +//} -public suspend fun DataSetBuilder.populateWith(sequence: Sequence>) { +public fun DataSetBuilder.populateFrom(sequence: Sequence>) { sequence.forEach { data(it.name, it.data) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt new file mode 100644 index 00000000..23b44aa0 --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSourceBuilder.kt @@ -0,0 +1,122 @@ +package space.kscience.dataforge.data + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.launch +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.* +import kotlin.collections.set +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext +import kotlin.jvm.Synchronized +import kotlin.reflect.KType +import kotlin.reflect.typeOf + +/** + * A mutable [DataTree] that propagates updates + */ +public class DataSourceBuilder( + override val dataType: KType, + coroutineContext: CoroutineContext, +) : DataTree, DataSetBuilder, DataSource { + override val coroutineContext: CoroutineContext = + coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction() + + private val treeItems = HashMap>() + + override val items: Map> + get() = treeItems.filter { !it.key.body.startsWith("@") } + + private val _updates = MutableSharedFlow() + + override val updates: SharedFlow + get() = _updates + + @Synchronized + private fun remove(token: NameToken) { + if (treeItems.remove(token) != null) { + launch { + _updates.emit(token.asName()) + } + } + } + + override fun remove(name: Name) { + if (name.isEmpty()) error("Can't remove the root node") + (getItem(name.cutLast()).tree as? DataSourceBuilder)?.remove(name.lastOrNull()!!) + } + + @Synchronized + private fun set(token: NameToken, data: Data) { + treeItems[token] = DataTreeItem.Leaf(data) + } + + @Synchronized + private fun set(token: NameToken, node: DataTree) { + treeItems[token] = DataTreeItem.Node(node) + } + + private fun getOrCreateNode(token: NameToken): DataSourceBuilder = + (treeItems[token] as? DataTreeItem.Node)?.tree as? DataSourceBuilder + ?: DataSourceBuilder(dataType, coroutineContext).also { set(token, it) } + + private fun getOrCreateNode(name: Name): DataSourceBuilder = when (name.length) { + 0 -> this + 1 -> getOrCreateNode(name.firstOrNull()!!) + else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst()) + } + + override fun data(name: Name, data: Data?) 
{ + if (data == null) { + remove(name) + } else { + when (name.length) { + 0 -> error("Can't add data with empty name") + 1 -> set(name.firstOrNull()!!, data) + 2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data) + } + } + launch { + _updates.emit(name) + } + } + + override fun meta(name: Name, meta: Meta) { + val item = getItem(name) + if (item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.") + data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta)) + } +} + +/** + * Create a dynamic tree. Initial data is placed synchronously. + */ +@Suppress("FunctionName") +public fun ActiveDataTree( + type: KType, + parent: CoroutineScope, + block: DataSourceBuilder.() -> Unit, +): DataSourceBuilder { + val tree = DataSourceBuilder(type, parent.coroutineContext) + tree.block() + return tree +} + +@Suppress("FunctionName") +public suspend inline fun ActiveDataTree( + crossinline block: DataSourceBuilder.() -> Unit = {}, +): DataSourceBuilder = DataSourceBuilder(typeOf(), coroutineContext).apply { block() } + +public inline fun DataSourceBuilder.emit( + name: Name, + parent: CoroutineScope, + noinline block: DataSourceBuilder.() -> Unit, +): Unit = node(name, ActiveDataTree(typeOf(), parent, block)) + +public inline fun DataSourceBuilder.emit( + name: String, + parent: CoroutineScope, + noinline block: DataSourceBuilder.() -> Unit, +): Unit = node(Name.parse(name), ActiveDataTree(typeOf(), parent, block)) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt index 5ef8a6d5..c00fac8d 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt @@ -15,13 +15,12 @@ */ package space.kscience.dataforge.data -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string public interface GroupRule { - public suspend fun gather(set: DataSet): Map> + public fun gather(set: DataSet): Map> public companion object { /** @@ -33,31 +32,43 @@ public interface GroupRule { * @return */ public fun byMetaValue( - scope: CoroutineScope, key: String, defaultTagValue: String, ): GroupRule = object : GroupRule { - override suspend fun gather( + override fun gather( set: DataSet, ): Map> { - val map = HashMap>() + val map = HashMap>() - set.dataSequence().forEach { data -> - val tagValue = data.meta[key]?.string ?: defaultTagValue - map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(data.name, data.data) - } + if (set is DataSource) { + set.dataSequence().forEach { data -> + val tagValue: String = data.meta[key]?.string ?: defaultTagValue + (map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder) + .data(data.name, data.data) - scope.launch { - set.updates.collect { name -> - val data = set.get(name) + set.launch { + set.updates.collect { name -> + val dataUpdate = set[name] - @Suppress("NULLABLE_EXTENSION_OPERATOR_WITH_SAFE_CALL_RECEIVER") - val tagValue = data?.meta?.get(key)?.string ?: defaultTagValue - map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(name, data) + val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue + map.getOrPut(updateTagValue) { + ActiveDataTree(set.dataType, this) { + data(name, dataUpdate) + } + } + } + } + } + } else { + 
set.dataSequence().forEach { data -> + val tagValue: String = data.meta[key]?.string ?: defaultTagValue + (map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree) + .data(data.name, data.data) } } + return map } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt index 06b1ff6f..2353f97d 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataTree.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.coroutineScope import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.* @@ -17,7 +16,7 @@ internal class StaticDataTree( override val items: Map> get() = _items.filter { !it.key.body.startsWith("@") } - override suspend fun remove(name: Name) { + override fun remove(name: Name) { when (name.length) { 0 -> error("Can't remove root tree node") 1 -> _items.remove(name.firstOrNull()!!) @@ -36,7 +35,7 @@ internal class StaticDataTree( else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName()) } - private suspend fun set(name: Name, item: DataTreeItem?) { + private fun set(name: Name, item: DataTreeItem?) { if (name.isEmpty()) error("Can't set top level tree node") if (item == null) { remove(name) @@ -45,41 +44,39 @@ internal class StaticDataTree( } } - override suspend fun data(name: Name, data: Data?) { + override fun data(name: Name, data: Data?) { set(name, data?.let { DataTreeItem.Leaf(it) }) } - override suspend fun node(name: Name, dataSet: DataSet) { + override fun node(name: Name, dataSet: DataSet) { if (dataSet is StaticDataTree) { set(name, DataTreeItem.Node(dataSet)) } else { - coroutineScope { - dataSet.dataSequence().forEach { - data(name + it.name, it.data) - } + dataSet.dataSequence().forEach { + data(name + it.name, it.data) } } } - override suspend fun meta(name: Name, meta: Meta) { + override fun meta(name: Name, meta: Meta) { val item = getItem(name) - if(item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.") + if (item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.") data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta)) } } @Suppress("FunctionName") -public suspend fun DataTree( +public inline fun DataTree( dataType: KType, - block: suspend DataSetBuilder.() -> Unit, + block: DataSetBuilder.() -> Unit, ): DataTree = StaticDataTree(dataType).apply { block() } @Suppress("FunctionName") -public suspend inline fun DataTree( - noinline block: suspend DataSetBuilder.() -> Unit, +public inline fun DataTree( + noinline block: DataSetBuilder.() -> Unit, ): DataTree = DataTree(typeOf(), block) @OptIn(DFExperimental::class) -public suspend fun DataSet.seal(): DataTree = DataTree(dataType) { +public fun DataSet.seal(): DataTree = DataTree(dataType) { populateFrom(this@seal) } \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt index 5ee7027d..f5037918 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataFilter.kt @@ -10,6 +10,8 @@ import 
space.kscience.dataforge.names.Name import space.kscience.dataforge.names.isEmpty import space.kscience.dataforge.names.plus import space.kscience.dataforge.names.removeHeadOrNull +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KType @@ -17,34 +19,42 @@ import kotlin.reflect.KType * A stateless filtered [DataSet] */ public fun DataSet.filter( - predicate: (Name, Data) -> Boolean, -): ActiveDataSet = object : ActiveDataSet { + predicate: (Name, Meta) -> Boolean, +): DataSource = object : DataSource { override val dataType: KType get() = this@filter.dataType + override val coroutineContext: CoroutineContext + get() = (this@filter as? DataSource)?.coroutineContext ?: EmptyCoroutineContext + + override val meta: Meta get() = this@filter.meta override fun dataSequence(): Sequence> = - this@filter.dataSequence().filter { predicate(it.name, it.data) } + this@filter.dataSequence().filter { predicate(it.name, it.meta) } override fun get(name: Name): Data? = this@filter.get(name)?.takeIf { - predicate(name, it) + predicate(name, it.meta) } override val updates: Flow = this@filter.updates.filter flowFilter@{ name -> - val theData = this@filter.get(name) ?: return@flowFilter false - predicate(name, theData) + val theData = this@filter[name] ?: return@flowFilter false + predicate(name, theData.meta) } } /** * Generate a wrapper data set with a given name prefix appended to all names */ -public fun DataSet.withNamePrefix(prefix: Name): DataSet = if (prefix.isEmpty()) this -else object : ActiveDataSet { +public fun DataSet.withNamePrefix(prefix: Name): DataSet = if (prefix.isEmpty()) { + this +} else object : DataSource { override val dataType: KType get() = this@withNamePrefix.dataType + override val coroutineContext: CoroutineContext + get() = (this@withNamePrefix as? DataSource)?.coroutineContext ?: EmptyCoroutineContext + override val meta: Meta get() = this@withNamePrefix.meta @@ -62,9 +72,12 @@ else object : ActiveDataSet { */ public fun DataSet.branch(branchName: Name): DataSet = if (branchName.isEmpty()) { this -} else object : ActiveDataSet { +} else object : DataSource { override val dataType: KType get() = this@branch.dataType + override val coroutineContext: CoroutineContext + get() = (this@branch as? 
DataSource)?.coroutineContext ?: EmptyCoroutineContext + override val meta: Meta get() = this@branch.meta override fun dataSequence(): Sequence> = this@branch.dataSequence().mapNotNull { diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index c8180eb5..b4e58c2d 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -144,7 +144,7 @@ public suspend fun DataSet.map( metaTransform: MutableMeta.() -> Unit = {}, block: suspend (T) -> R, ): DataTree = DataTree(outputType) { - populateWith( + populateFrom( dataSequence().map { val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() Data(outputType, newMeta, coroutineContext, listOf(it)) { diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/actionInContext.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/actionInContext.kt new file mode 100644 index 00000000..33731a95 --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/actionInContext.kt @@ -0,0 +1,2 @@ +package space.kscience.dataforge.data + diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt similarity index 58% rename from dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt rename to dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt index f32ff3f3..5efe6d2a 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt @@ -6,6 +6,8 @@ import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.matches +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KType import kotlin.reflect.full.isSubtypeOf import kotlin.reflect.typeOf @@ -28,46 +30,47 @@ private fun Data<*>.castOrNull(type: KType): Data? = * Select all data matching given type and filters. Does not modify paths * * @param namePattern a name match patter according to [Name.matches] - * @param filter addition filtering condition based on item name and meta. By default, accepts all + * @param predicate addition filtering condition based on item name and meta. By default, accepts all */ @OptIn(DFExperimental::class) -public fun DataSet<*>.select( +public fun DataSet<*>.filterIsInstance( type: KType, - namePattern: Name? = null, - filter: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, -): ActiveDataSet = object : ActiveDataSet { + predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, +): DataSource = object : DataSource { override val dataType = type - override val meta: Meta get() = this@select.meta + override val coroutineContext: CoroutineContext + get() = (this@filterIsInstance as? 
DataSource)?.coroutineContext ?: EmptyCoroutineContext + + override val meta: Meta get() = this@filterIsInstance.meta private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type) - && (namePattern == null || name.matches(namePattern)) - && filter(name, datum.meta) + && predicate(name, datum.meta) - override fun dataSequence(): Sequence> = this@select.dataSequence().filter { + override fun dataSequence(): Sequence> = this@filterIsInstance.dataSequence().filter { checkDatum(it.name, it.data) }.map { @Suppress("UNCHECKED_CAST") it as NamedData } - override fun get(name: Name): Data? = this@select[name]?.let { datum -> + override fun get(name: Name): Data? = this@filterIsInstance[name]?.let { datum -> if (checkDatum(name, datum)) datum.castOrNull(type) else null } - override val updates: Flow = this@select.updates.filter { - val datum = this@select[it] ?: return@filter false - checkDatum(it, datum) + override val updates: Flow = this@filterIsInstance.updates.filter { name -> + get(name)?.let { datum -> + checkDatum(name, datum) + } ?: false } } /** * Select a single datum of the appropriate type */ -public inline fun DataSet<*>.select( - namePattern: Name? = null, - noinline filter: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, -): DataSet = select(typeOf(), namePattern, filter) +public inline fun DataSet<*>.filterIsInstance( + noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, +): DataSet = filterIsInstance(typeOf(), predicate) /** * Select a single datum if it is present and of given [type] diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt index 51bfa187..3fc68141 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt @@ -10,24 +10,24 @@ import space.kscience.dataforge.names.plus /** * Append data to node */ -context(DataSetBuilder) public suspend infix fun String.put(data: Data): Unit = +context(DataSetBuilder) public infix fun String.put(data: Data): Unit = data(Name.parse(this), data) /** * Append node */ -context(DataSetBuilder) public suspend infix fun String.put(dataSet: DataSet): Unit = +context(DataSetBuilder) public infix fun String.put(dataSet: DataSet): Unit = node(Name.parse(this), dataSet) /** * Build and append node */ -context(DataSetBuilder) public suspend infix fun String.put( - block: suspend DataSetBuilder.() -> Unit, +context(DataSetBuilder) public infix fun String.put( + block: DataSetBuilder.() -> Unit, ): Unit = node(Name.parse(this), block) /** - * Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job] + * Copy given data set and mirror its changes to this [DataSourceBuilder] in [this@setAndObserve]. 
Returns an update [Job] */ context(DataSetBuilder) public fun CoroutineScope.setAndWatch( name: Name, diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt index c80f00c4..9fd05357 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt @@ -1,44 +1,49 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test import space.kscience.dataforge.actions.Action +import space.kscience.dataforge.actions.invoke import space.kscience.dataforge.actions.map import space.kscience.dataforge.misc.DFExperimental import kotlin.test.assertEquals @OptIn(DFExperimental::class) internal class ActionsTest { - private val data: DataTree = runBlocking { - DataTree { + @Test + fun testStaticMapAction() = runTest { + val data: DataTree = DataTree { repeat(10) { static(it.toString(), it) } } - } - @Test - fun testStaticMapAction() { val plusOne = Action.map { result { it + 1 } } - runBlocking { - val result = plusOne.execute(data) - assertEquals(2, result["1"]?.await()) - } + val result = plusOne(data) + assertEquals(2, result["1"]?.await()) } @Test - fun testDynamicMapAction() { + fun testDynamicMapAction() = runTest { + val data: DataSourceBuilder = ActiveDataTree() + val plusOne = Action.map { result { it + 1 } } - val datum = runBlocking { - val result = plusOne.execute(data, scope = this) - result["1"]?.await() + val result = plusOne(data) + + repeat(10) { + data.static(it.toString(), it) } - assertEquals(2, datum) + + delay(20) + + assertEquals(2, result["1"]?.await()) + data.close() } } \ No newline at end of file diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt index 65b43a6f..9509092e 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt @@ -46,8 +46,8 @@ internal class DataTreeBuilderTest { } runBlocking { - assertEquals("a", node.get("update.a")?.await()) - assertEquals("a", node.get("primary.a")?.await()) + assertEquals("a", node["update.a"]?.await()) + assertEquals("a", node["primary.a"]?.await()) } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index 6a3608bb..54d027ac 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -1,12 +1,10 @@ package space.kscience.dataforge.workspace +import kotlinx.coroutines.CoroutineScope import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.Global -import space.kscience.dataforge.data.ActiveDataTree -import space.kscience.dataforge.data.DataSet -import space.kscience.dataforge.data.DataSetBuilder -import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta 
import space.kscience.dataforge.meta.MetaRepr import space.kscience.dataforge.meta.MutableMeta @@ -17,8 +15,11 @@ import space.kscience.dataforge.misc.DFBuilder import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName +import kotlin.collections.HashMap +import kotlin.collections.set import kotlin.properties.PropertyDelegateProvider import kotlin.properties.ReadOnlyProperty +import kotlin.reflect.typeOf public data class TaskReference(public val taskName: Name, public val task: Task) : DataSelector { @@ -100,13 +101,13 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas /** * Define intrinsic data for the workspace */ - public suspend fun buildData(builder: suspend DataSetBuilder.() -> Unit) { + public fun data(builder: DataSetBuilder.() -> Unit) { data = DataTree(builder) } @DFExperimental - public suspend fun buildActiveData(builder: suspend ActiveDataTree.() -> Unit) { - data = ActiveDataTree(builder) + public fun buildActiveData(scope: CoroutineScope, builder: DataSourceBuilder.() -> Unit) { + data = ActiveDataTree(typeOf(), scope, builder) } /** diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt index ffa397d3..6634a2a0 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt @@ -1,21 +1,24 @@ package space.kscience.dataforge.workspace -import kotlinx.coroutines.runBlocking import space.kscience.dataforge.data.DataSet -import space.kscience.dataforge.data.DataSetBuilder -import space.kscience.dataforge.data.select +import space.kscience.dataforge.data.filterIsInstance import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.matches -public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder.() -> Unit): Unit = runBlocking { - buildData(builder) -} +//public fun WorkspaceBuilder.data(builder: DataSetBuilder.() -> Unit): Unit = runBlocking { +// data(builder) +//} -public inline fun TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector = object : DataSelector { - override suspend fun select(workspace: Workspace, meta: Meta): DataSet = workspace.data.select(namePattern) -} +public inline fun TaskResultBuilder<*>.data(namePattern: Name? 
= null): DataSelector = + object : DataSelector { + override suspend fun select(workspace: Workspace, meta: Meta): DataSet = + workspace.data.filterIsInstance { name, _ -> + namePattern == null || name.matches(namePattern) + } + } public suspend inline fun TaskResultBuilder<*>.fromTask( task: Name, taskMeta: Meta = Meta.EMPTY, -): DataSet = workspace.produce(task, taskMeta).select() \ No newline at end of file +): DataSet = workspace.produce(task, taskMeta).filterIsInstance() \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt index 1c1a849e..f4534bc7 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt @@ -14,7 +14,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { override val tag: PluginTag = Companion.tag val allData by task { - val selectedData = workspace.data.select() + val selectedData = workspace.data.filterIsInstance() val result: Data = selectedData.dataSequence().foldToData(0) { result, data -> result + data.await() } @@ -23,7 +23,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { val singleData by task { - workspace.data.select()["myData[12]"]?.let { + workspace.data.filterIsInstance()["myData[12]"]?.let { data("result", it) } } diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt index 612dad13..d1bbb606 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt @@ -20,20 +20,19 @@ import kotlin.test.assertEquals class FileDataTest { - val dataNode = runBlocking { - DataTree { - node("dir") { - static("a", "Some string") { - "content" put "Some string" - } - } - static("b", "root data") - meta { - "content" put "This is root meta node" + val dataNode = DataTree { + node("dir") { + static("a", "Some string") { + "content" put "Some string" } } + static("b", "root data") + meta { + "content" put "This is root meta node" + } } + object StringIOFormat : IOFormat { override val type: KType = typeOf() diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index 2187cbe2..367e8489 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -117,16 +117,16 @@ class SimpleWorkspaceTest { } val averageByGroup by task { - val evenSum = workspace.data.filter { name, _ -> + val evenSum = workspace.data.filterIsInstance { name, _ -> name.toString().toInt() % 2 == 0 - }.select().foldToData(0) { l, r -> + }.foldToData(0) { l, r -> l + r.await() } data("even", evenSum) - val oddSum = workspace.data.filter { name, _ -> + val oddSum = workspace.data.filterIsInstance { name, _ -> name.toString().toInt() % 2 == 1 - }.select().foldToData(0) { l, r -> + }.foldToData(0) { l, r -> l + r.await() } data("odd", oddSum) @@ -143,7 +143,7 @@ class SimpleWorkspaceTest { } 
val customPipe by task { - workspace.data.select().forEach { data -> + workspace.data.filterIsInstance().forEach { data -> val meta = data.meta.toMutableMeta().apply { "newValue" put 22 }
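
---

For orientation, the non-suspending `Action` API introduced by this patch is exercised in `ActionsTest` above. The following is a minimal usage sketch, not part of the patch itself; the generic type arguments and import paths are inferred, since the rendered diff drops angle-bracket parameters.

```kotlin
// Hedged sketch based on ActionsTest in this patch; generic parameters and
// import paths are assumptions reconstructed from the surrounding code.
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.invoke
import space.kscience.dataforge.actions.map
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.get
import space.kscience.dataforge.data.static

suspend fun demo() {
    // After this refactor the static tree is built synchronously: no suspend needed
    val data: DataTree<Int> = DataTree {
        repeat(10) { static(it.toString(), it) }
    }

    // Define a lazy mapping action and apply it through the new invoke operator
    // (replacing the old suspending execute(dataSet, meta, scope) signature)
    val plusOne = Action.map<Int, Int> { result { it + 1 } }
    val result = plusOne(data)

    // Reading a computed value is still suspending via Data.await
    println(result["1"]?.await()) // prints 2
}
```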