From 7e4d1af55fbab1147c1f965c36f06d962c4cc4e4 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 16 Jan 2021 12:44:39 +0300 Subject: [PATCH] [WIP] redo DataSet fill and updates --- .../kotlin/hep/dataforge/data/Action.kt | 8 +- .../hep/dataforge/data/CachingAction.kt | 21 ++-- .../kotlin/hep/dataforge/data/Data.kt | 23 +++- .../kotlin/hep/dataforge/data/DataSet.kt | 4 +- .../hep/dataforge/data/DataSetBuilder.kt | 67 +++++++--- .../kotlin/hep/dataforge/data/DataTree.kt | 8 +- .../kotlin/hep/dataforge/data/GroupRule.kt | 2 +- .../kotlin/hep/dataforge/data/MapAction.kt | 39 +++--- .../hep/dataforge/data/MutableDataTree.kt | 116 +++++++----------- .../kotlin/hep/dataforge/data/SplitAction.kt | 25 ++-- .../hep/dataforge/data/StaticDataTree.kt | 26 ++-- .../kotlin/hep/dataforge/data/dataSetMeta.kt | 4 +- .../kotlin/hep/dataforge/data/ActionsTest.kt | 39 ++++++ .../hep/dataforge/data/DataTreeBuilderTest.kt | 65 +++++++--- .../kotlin/hep/dataforge/names/Name.kt | 2 +- .../kotlin/hep/dataforge/workspace/Task.kt | 3 +- .../hep/dataforge/workspace/TaskModel.kt | 11 +- .../hep/dataforge/workspace/TaskBuilder.kt | 25 ++-- .../dataforge/workspace/WorkspaceBuilder.kt | 4 +- .../workspace/DataPropagationTest.kt | 2 +- .../workspace/SimpleWorkspaceTest.kt | 7 +- 21 files changed, 301 insertions(+), 200 deletions(-) create mode 100644 dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt index 514efe08..3b72c718 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt @@ -5,7 +5,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* /** - * A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [run]. + * A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute]. */ public interface Action { /** @@ -14,7 +14,7 @@ public interface Action { * * [scope] context used to compute the initial result, also it is used for updates propagation */ - public suspend fun run(set: DataSet, meta: Meta, scope: CoroutineScope): DataSet + public suspend fun execute(dataSet: DataSet, meta: Meta = Meta.EMPTY, scope: CoroutineScope? = null): DataSet public companion object } @@ -25,8 +25,8 @@ public interface Action { public infix fun Action.then(action: Action): Action { // TODO introduce composite action and add optimize by adding action to the list return object : Action { - override suspend fun run(set: DataSet, meta: Meta, scope: CoroutineScope): DataSet { - return action.run(this@then.run(set, meta, scope), meta, scope) + override suspend fun execute(dataSet: DataSet, meta: Meta, scope: CoroutineScope?): DataSet { + return action.execute(this@then.execute(dataSet, meta, scope), meta, scope) } } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt index 88d001ad..45d3e888 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt @@ -4,6 +4,7 @@ import hep.dataforge.meta.Meta import hep.dataforge.names.Name import hep.dataforge.names.startsWith import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlin.reflect.KClass @@ -29,20 +30,22 @@ public abstract class CachingAction( key: Name = Name.EMPTY, ): Flow> - override suspend fun run( - set: DataSet, + override suspend fun execute( + dataSet: DataSet, meta: Meta, - scope: CoroutineScope, - ): DataSet = DataTree.dynamic(outputType,scope) { - collectFrom(scope.transform(set, meta)) - scope.let { - set.updates.collect { + scope: CoroutineScope?, + ): DataSet = DataTree.dynamic(outputType) { + coroutineScope { + collectFrom(transform(dataSet, meta)) + } + scope?.let { + dataSet.updates.collect { //clear old nodes remove(it) //collect new items - collectFrom(scope.transform(set, meta, it)) + collectFrom(scope.transform(dataSet, meta, it)) //FIXME if the target is data, updates are fired twice } } } -} \ No newline at end of file +} diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 954df54d..39983534 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -44,13 +44,15 @@ public interface Data : Goal, MetaRepr { /** * An empty data containing only meta */ - public fun empty(meta: Meta): Data = object : Data { + public fun empty(meta: Meta): Data = object : Data { override val type: KClass = Nothing::class override val meta: Meta = meta override val dependencies: Collection> = emptyList() - override val deferred: Deferred get() = GlobalScope.async(start = CoroutineStart.LAZY) { - error("The Data is empty and could not be computed") - } + override val deferred: Deferred + get() = GlobalScope.async(start = CoroutineStart.LAZY) { + error("The Data is empty and could not be computed") + } + override fun async(coroutineScope: CoroutineScope): Deferred = deferred override fun reset() {} } @@ -92,7 +94,18 @@ public inline fun Data( public class NamedData internal constructor( override val name: Name, public val data: Data, -) : Data by data, Named +) : Data by data, Named { + override fun toString(): String = buildString { + append("NamedData(name=\"$name\"") + if(data is StaticData){ + append(", value=${data.value}") + } + if(!data.meta.isEmpty()){ + append(", meta=${data.meta}") + } + append(")") + } +} public fun Data.named(name: Name): NamedData = if (this is NamedData) { NamedData(name, this.data) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt index c8373949..84c053f4 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt @@ -76,4 +76,6 @@ public suspend fun DataSet<*>.toMeta(): Meta = Meta { } } } -} \ No newline at end of file +} + +public val DataSet.updatesWithData: Flow> get() = updates.mapNotNull { getData(it)?.named(it) } \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt index 2ddc3106..17120270 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt @@ -4,50 +4,81 @@ import hep.dataforge.meta.DFExperimental import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaBuilder import hep.dataforge.names.Name +import hep.dataforge.names.plus import hep.dataforge.names.toName import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect public interface DataSetBuilder { - public fun remove(name: Name) + /** + * Remove all data items starting with [name] + */ + public suspend fun remove(name: Name) - public operator fun set(name: Name, data: Data?) + public suspend fun set(name: Name, data: Data?) - public suspend fun set(name: Name, dataSet: DataSet) - - public operator fun set(name: Name, block: DataSetBuilder.() -> Unit) + /** + * Set a current state of given [dataSet] into a branch [name]. Does not propagate updates + */ + public suspend fun set(name: Name, dataSet: DataSet){ + //remove previous items + remove(name) + //Set new items + dataSet.flow().collect { + set(name + it.name, it.data) + } + } /** * Append data to node */ - public infix fun String.put(data: Data): Unit = set(toName(), data) + public suspend infix fun String.put(data: Data): Unit = set(toName(), data) /** * Append node */ - public suspend infix fun String.put(tree: DataSet): Unit = set(toName(), tree) + public suspend infix fun String.put(dataSet: DataSet): Unit = set(toName(), dataSet) /** * Build and append node */ - public infix fun String.put(block: DataSetBuilder.() -> Unit): Unit = set(toName(), block) + public suspend infix fun String.put(block: suspend DataSetBuilder.() -> Unit): Unit = set(toName(), block) +} +private class SubSetBuilder(private val parent: DataSetBuilder, private val branch: Name) : + DataSetBuilder { + override suspend fun remove(name: Name) { + parent.remove(branch + name) + } + + override suspend fun set(name: Name, data: Data?) { + parent.set(branch + name, data) + } + + override suspend fun set(name: Name, dataSet: DataSet) { + parent.set(branch + name, dataSet) + } +} + +public suspend fun DataSetBuilder.set(name: Name, block: suspend DataSetBuilder.() -> Unit){ + SubSetBuilder(this,name).apply { block() } } -public operator fun DataSetBuilder.set(name: String, data: Data) { - this@set[name.toName()] = data +public suspend fun DataSetBuilder.set(name: String, data: Data) { + set(name.toName(), data) } -public fun DataSetBuilder.data(name: Name, data: T, meta: Meta = Meta.EMPTY) { +public suspend fun DataSetBuilder.data(name: Name, data: T, meta: Meta = Meta.EMPTY) { set(name, Data.static(data, meta)) } -public fun DataSetBuilder.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) { +public suspend fun DataSetBuilder.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) { set(name, Data.static(data, Meta(block))) } -public fun DataSetBuilder.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) { +public suspend fun DataSetBuilder.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) { set(name.toName(), Data.static(data, Meta(block))) } @@ -55,7 +86,7 @@ public suspend fun DataSetBuilder.set(name: String, set: DataSet this.set(name.toName(), set) } -public operator fun DataSetBuilder.set(name: String, block: DataSetBuilder.() -> Unit): Unit = +public suspend fun DataSetBuilder.set(name: String, block: suspend DataSetBuilder.() -> Unit): Unit = this@set.set(name.toName(), block) @@ -63,9 +94,15 @@ public operator fun DataSetBuilder.set(name: String, block: DataSet * Update data with given node data and meta with node meta. */ @DFExperimental -public suspend fun DataSetBuilder.update(tree: DataSet): Unit = coroutineScope{ +public suspend fun DataSetBuilder.update(tree: DataSet): Unit = coroutineScope { tree.flow().collect { //TODO check if the place is occupied set(it.name, it.data) } } + +public suspend fun DataSetBuilder.collectFrom(flow: Flow>) { + flow.collect { + set(it.name, it.data) + } +} diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt index 939066a9..ac4d3166 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt @@ -46,9 +46,11 @@ public interface DataTree : DataSet { override fun flow(): Flow> = flow { items().forEach { (token, childItem: DataTreeItem) -> - when (childItem) { - is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName())) - is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(token + it.name) }) + if(!token.body.startsWith("@")) { + when (childItem) { + is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName())) + is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(token + it.name) }) + } } } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt index 0fbff04f..fdf83372 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt @@ -41,7 +41,7 @@ public interface GroupRule { set.flow().collect { data -> val tagValue = data.meta[key]?.string ?: defaultTagValue - map.getOrPut(tagValue) { MutableDataTree(dataType, scope) }.set(data.name, data.data) + map.getOrPut(tagValue) { MutableDataTree(dataType) }.set(data.name, data.data) } return map diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt index e51f5fb0..47e1c411 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt @@ -33,16 +33,16 @@ public class MapActionBuilder(public var name: Name, public var meta: Meta } -public class MapAction( +public class MapAction( public val outputType: KClass, private val block: MapActionBuilder.() -> Unit, ) : Action { - override suspend fun run( - set: DataSet, + override suspend fun execute( + dataSet: DataSet, meta: Meta, - scope: CoroutineScope, - ): DataSet = DataTree.dynamic(outputType, scope) { + scope: CoroutineScope?, + ): DataSet { suspend fun mapOne(data: NamedData): NamedData { // Creating a new environment for action using **old** name, old meta and task meta val env = ActionEnv(data.name, data.meta, meta) @@ -65,23 +65,26 @@ public class MapAction( return newData.named(newName) } - collectFrom(set.flow().map(::mapOne)) - scope.launch { - set.updates.collect { name -> - //clear old nodes - remove(name) - //collect new items - collectFrom(set.flowChildren(name).map(::mapOne)) + val flow = dataSet.flow().map(::mapOne) + + return DataTree.dynamic(outputType) { + collectFrom(flow) + scope?.launch { + dataSet.updates.collect { name -> + //clear old nodes + remove(name) + //collect new items + collectFrom(dataSet.flowChildren(name).map(::mapOne)) + } } } } } -public suspend inline fun DataSet.map( - meta: Meta, - updatesScope: CoroutineScope, - noinline action: MapActionBuilder.() -> Unit, -): DataSet = MapAction(R::class, action).run(this, meta, updatesScope) - + +@Suppress("FunctionName") +public inline fun MapAction( + noinline builder: MapActionBuilder.() -> Unit, +): MapAction = MapAction(R::class, builder) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt index 863dda43..d6c49f59 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt @@ -3,7 +3,7 @@ package hep.dataforge.data import hep.dataforge.meta.* import hep.dataforge.names.* import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex @@ -15,12 +15,13 @@ import kotlin.reflect.KClass */ public class MutableDataTree( override val dataType: KClass, - public val scope: CoroutineScope, ) : DataTree, DataSetBuilder { private val mutex = Mutex() private val treeItems = HashMap>() - override suspend fun items(): Map> = mutex.withLock { treeItems } + override suspend fun items(): Map> = mutex.withLock { + treeItems.filter { !it.key.body.startsWith("@") } + } private val _updates = MutableSharedFlow() @@ -35,36 +36,37 @@ public class MutableDataTree( } } - override fun remove(name: Name) { - scope.launch { - if (name.isEmpty()) error("Can't remove the root node") - (getItem(name.cutLast()).tree as? MutableDataTree)?.remove(name.lastOrNull()!!) - } + override suspend fun remove(name: Name) { + if (name.isEmpty()) error("Can't remove the root node") + (getItem(name.cutLast()).tree as? MutableDataTree)?.remove(name.lastOrNull()!!) } - private suspend fun set(token: NameToken, node: DataSet) { - //if (_map.containsKey(token)) error("Tree entry with name $token is not empty") - mutex.withLock { - treeItems[token] = DataTreeItem.Node(node.toMutableTree(scope)) - coroutineScope { - node.updates.onEach { - _updates.emit(token + it) - }.launchIn(this) - } - _updates.emit(token.asName()) - } - } +// private suspend fun set(token: NameToken, node: DataSet) { +// //if (_map.containsKey(token)) error("Tree entry with name $token is not empty") +// mutex.withLock { +// treeItems[token] = DataTreeItem.Node(node.toMutableTree()) +// coroutineScope { +// node.updates.onEach { +// _updates.emit(token + it) +// }.launchIn(this) +// } +// _updates.emit(token.asName()) +// } +// } private suspend fun set(token: NameToken, data: Data) { mutex.withLock { treeItems[token] = DataTreeItem.Leaf(data) - _updates.emit(token.asName()) } } private suspend fun getOrCreateNode(token: NameToken): MutableDataTree = (treeItems[token] as? DataTreeItem.Node)?.tree as? MutableDataTree - ?: MutableDataTree(dataType, scope).also { set(token, it) } + ?: MutableDataTree(dataType).also { + mutex.withLock { + treeItems[token] = DataTreeItem.Node(it) + } + } private suspend fun getOrCreateNode(name: Name): MutableDataTree { return when (name.length) { @@ -74,83 +76,53 @@ public class MutableDataTree( } } - override fun set(name: Name, data: Data?) { + override suspend fun set(name: Name, data: Data?) { if (data == null) { remove(name) } else { - scope.launch { - when (name.length) { - 0 -> error("Can't add data with empty name") - 1 -> set(name.firstOrNull()!!, data) - 2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data) - } + when (name.length) { + 0 -> error("Can't add data with empty name") + 1 -> set(name.firstOrNull()!!, data) + 2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data) } } + _updates.emit(name) } - private suspend fun setTree(name: Name, node: MutableDataTree) { - when (name.length) { - 0 -> error("Can't add data with empty name") - 1 -> set(name.firstOrNull()!!, node) - 2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, node) + /** + * Copy given data set and mirror its changes to this [MutableDataTree] in [this@setAndObserve]. Returns an update [Job] + */ + public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet): Job = launch { + set(name, dataSet) + dataSet.updates.collect { nameInBranch -> + set(name + nameInBranch, dataSet.getData(nameInBranch)) } } - - override suspend fun set(name: Name, dataSet: DataSet): Unit { - if (dataSet is MutableDataTree) { - setTree(name, dataSet) - } else { - setTree(name, dataSet.toMutableTree(scope)) - } - } - - override fun set(name: Name, block: DataSetBuilder.() -> Unit) { - scope.launch { - setTree(name, MutableDataTree(dataType, scope).apply(block)) - } - } - - public fun collectFrom(flow: Flow>) { - flow.onEach { - set(it.name, it.data) - }.launchIn(scope) - } } +/** + * Create a dynamic tree. Initial data is placed synchronously. Updates are propagated via [updatesScope] + */ public suspend fun DataTree.Companion.dynamic( type: KClass, - updatesScope: CoroutineScope, block: suspend MutableDataTree.() -> Unit, ): DataTree { - val tree = MutableDataTree(type, updatesScope) + val tree = MutableDataTree(type) tree.block() return tree } public suspend inline fun DataTree.Companion.dynamic( - updatesScope: CoroutineScope, crossinline block: suspend MutableDataTree.() -> Unit, -): DataTree = MutableDataTree(T::class, updatesScope).apply { block() } +): DataTree = MutableDataTree(T::class).apply { block() } public suspend inline fun MutableDataTree.set( name: Name, noinline block: suspend MutableDataTree.() -> Unit, -): Unit = set(name, DataTree.dynamic(T::class, scope, block)) +): Unit = set(name, DataTree.dynamic(T::class, block)) public suspend inline fun MutableDataTree.set( name: String, noinline block: suspend MutableDataTree.() -> Unit, -): Unit = set(name.toName(), DataTree.dynamic(T::class, scope, block)) - -/** - * Generate a mutable builder from this node. Node content is not changed - */ -public suspend fun DataSet.toMutableTree( - scope: CoroutineScope, -): MutableDataTree = MutableDataTree(dataType, scope).apply { - flow().collect { set(it.name, it.data) } - this@toMutableTree.updates.onEach { - set(it, getData(it)) - }.launchIn(scope) -} +): Unit = set(name.toName(), DataTree.dynamic(T::class, block)) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt index 851cc82f..f2e35767 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -13,7 +13,6 @@ import kotlin.collections.set import kotlin.reflect.KClass - public class SplitBuilder(public val name: Name, public val meta: Meta) { public class FragmentRule(public val name: Name, public var meta: MetaBuilder) { @@ -44,11 +43,11 @@ public class SplitAction( private val action: SplitBuilder.() -> Unit, ) : Action { - override suspend fun run( - set: DataSet, + override suspend fun execute( + dataSet: DataSet, meta: Meta, - scope: CoroutineScope, - ): DataSet = DataTree.dynamic(outputType, scope) { + scope: CoroutineScope?, + ): DataSet { suspend fun splitOne(data: NamedData): Flow> { val laminate = Laminate(data.meta, meta) @@ -63,13 +62,15 @@ public class SplitAction( } } - collectFrom(set.flow().flatMapConcat(transform = ::splitOne)) - scope.launch { - set.updates.collect { name -> - //clear old nodes - remove(name) - //collect new items - collectFrom(set.flowChildren(name).flatMapConcat(transform = ::splitOne)) + return DataTree.dynamic(outputType) { + collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne)) + scope?.launch { + dataSet.updates.collect { name -> + //clear old nodes + remove(name) + //collect new items + collectFrom(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne)) + } } } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt index 08e989d9..0d26005a 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt @@ -5,7 +5,8 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.* import kotlin.reflect.KClass -private class StaticDataTree( +@PublishedApi +internal class StaticDataTree( override val dataType: KClass, ) : DataSetBuilder, DataTree { @@ -13,9 +14,9 @@ private class StaticDataTree( override val updates: Flow = emptyFlow() - override suspend fun items(): Map> = items + override suspend fun items(): Map> = items.filter { !it.key.body.startsWith("@") } - override fun remove(name: Name) { + override suspend fun remove(name: Name) { when (name.length) { 0 -> error("Can't remove root tree node") 1 -> items.remove(name.firstOrNull()!!) @@ -34,7 +35,7 @@ private class StaticDataTree( else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName()) } - private operator fun set(name: Name, item: DataTreeItem?) { + private suspend fun set(name: Name, item: DataTreeItem?) { if (name.isEmpty()) error("Can't set top level tree node") if (item == null) { remove(name) @@ -43,7 +44,7 @@ private class StaticDataTree( } } - override fun set(name: Name, data: Data?) { + override suspend fun set(name: Name, data: Data?) { set(name, data?.let { DataTreeItem.Leaf(it) }) } @@ -58,20 +59,15 @@ private class StaticDataTree( } } } - - override fun set(name: Name, block: DataSetBuilder.() -> Unit) { - val tree = StaticDataTree(dataType).apply(block) - set(name, DataTreeItem.Node(tree)) - } } -public fun DataTree.Companion.static( +public suspend fun DataTree.Companion.static( dataType: KClass, - block: DataSetBuilder.() -> Unit, -): DataTree = StaticDataTree(dataType).apply(block) + block: suspend DataSetBuilder.() -> Unit, +): DataTree = StaticDataTree(dataType).apply { block() } -public inline fun DataTree.Companion.static( - noinline block: DataSetBuilder.() -> Unit, +public suspend inline fun DataTree.Companion.static( + noinline block: suspend DataSetBuilder.() -> Unit, ): DataTree = static(T::class, block) public suspend fun DataSet.toStaticTree(): DataTree = StaticDataTree(dataType).apply { diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt index 82af17cd..4b833e10 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt @@ -12,9 +12,9 @@ public suspend fun DataSet<*>.getMeta(): Meta? = getData(DataSet.META_KEY)?.meta /** * Add meta-data node to a [DataSet] */ -public fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta)) +public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta)) /** * Add meta-data node to a [DataSet] */ -public fun DataSetBuilder<*>.meta(metaBuilder: MetaBuilder.() -> Unit): Unit = meta(Meta(metaBuilder)) \ No newline at end of file +public suspend fun DataSetBuilder<*>.meta(metaBuilder: MetaBuilder.() -> Unit): Unit = meta(Meta(metaBuilder)) \ No newline at end of file diff --git a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt new file mode 100644 index 00000000..d1944785 --- /dev/null +++ b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt @@ -0,0 +1,39 @@ +package hep.dataforge.data + +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals + +class ActionsTest { + val data: DataTree = runBlocking { + DataTree.static { + repeat(10) { + data(it.toString(), it) + } + } + } + + @Test + fun testStaticMapAction() { + val plusOne = MapAction { + result { it + 1 } + } + runBlocking { + val result = plusOne.execute(data) + assertEquals(2, result.getData("1")?.value()) + } + } + + @Test + fun testDynamicMapAction() { + val plusOne = MapAction { + result { it + 1 } + } + val datum = runBlocking { + val result = plusOne.execute(data, scope = this) + result.getData("1")?.value() + } + assertEquals(2, datum) + } + +} \ No newline at end of file diff --git a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt index f93d6ee7..3770a670 100644 --- a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt @@ -1,34 +1,69 @@ package hep.dataforge.data -import kotlinx.coroutines.runBlocking +import hep.dataforge.names.toName +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.collect import kotlin.test.Test -import kotlin.test.assertTrue +import kotlin.test.assertEquals -internal class DataTreeBuilderTest{ +internal class DataTreeBuilderTest { @Test - fun testDataUpdate(){ - val updateData: DataTree = DataTree.static{ + fun testDataUpdate() = runBlocking { + val updateData: DataTree = DataTree.static { "update" put { "a" put Data.static("a") "b" put Data.static("b") } } - val node = DataTree.static{ - set("primary"){ - data("a","a") - data("b","b") - } - data("root","root") - runBlocking { - update(updateData) + val node = DataTree.static { + set("primary") { + data("a", "a") + data("b", "b") } + data("root", "root") + update(updateData) } - assertTrue { node.branch("update.a") != null } - assertTrue { node.branch("primary.a") != null } + assertEquals("a", node.getData("update.a")?.value()) + assertEquals("a", node.getData("primary.a")?.value()) + } + + @Test + fun testDynamicUpdates() = runBlocking { + try { + supervisorScope { + val subNode = DataTree.dynamic { + launch { + repeat(10) { + delay(10) + data("value", it) + } + } + } + launch { + subNode.updatesWithData.collect { + println(it) + } + } + val rootNode = DataTree.dynamic { + setAndObserve("sub".toName(), subNode) + } + + launch { + rootNode.updatesWithData.collect { + println(it) + } + } + delay(200) + assertEquals(9, rootNode.getData("sub.value")?.value()) + cancel() + } + } catch (t: Throwable) { + if (t !is CancellationException) throw t + } } } \ No newline at end of file diff --git a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/names/Name.kt b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/names/Name.kt index 297cc29f..5c1a6bd3 100644 --- a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/names/Name.kt +++ b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/names/Name.kt @@ -193,7 +193,7 @@ public fun Name.endsWith(name: Name): Boolean = * if [this] starts with given [head] name, returns the reminder of the name (could be empty). Otherwise returns null */ public fun Name.removeHeadOrNull(head: Name): Name? = if (startsWith(head)) { - Name(tokens.subList(head.length, head.length)) + Name(tokens.subList(head.length, length)) } else { null } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt index b17eacbd..4f6c9c17 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt @@ -3,13 +3,12 @@ package hep.dataforge.workspace import hep.dataforge.data.DataSet import hep.dataforge.meta.Meta import hep.dataforge.meta.descriptors.Described -import hep.dataforge.misc.Named import hep.dataforge.misc.Type import hep.dataforge.workspace.Task.Companion.TYPE import kotlin.reflect.KClass @Type(TYPE) -public interface Task : Named, Described { +public interface Task : Described { /** * The explicit type of the node returned by the task diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt index 590d2e2a..51072ed9 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt @@ -104,12 +104,11 @@ public fun TaskDependencyContainer.data( pattern: String? = null, from: String? = null, to: String? = null, -): DataDependency = - data { - pattern?.let { this.pattern = it } - from?.let { this.from = it } - to?.let { this.to = it } - } +): DataDependency = data { + pattern?.let { this.pattern = it } + from?.let { this.from = it } + to?.let { this.to = it } +} ///** // * Add all data as root node diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt index f41cfa49..6c95bb64 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt @@ -35,14 +35,7 @@ public class TaskBuilder(private val name: Name, public val type: KClas public val meta: Meta, public val context: Context, public val data: DataSet, - ) { -// public operator fun DirectTaskDependency.invoke(): DataSet = if (placement.isEmpty()) { -// data.cast(task.type) -// } else { -// data[placement].tree?.cast(task.type) -// ?: error("Could not find results of direct task dependency $this at \"$placement\"") -// } - } + ) /** * Add a transformation on untyped data @@ -86,7 +79,7 @@ public class TaskBuilder(private val name: Name, public val type: KClas crossinline block: TaskEnv.() -> Action, ) { transform { data: DataSet -> - block().run(data, meta, context) + block().execute(data, meta, context) } } @@ -194,15 +187,19 @@ public class TaskBuilder(private val name: Name, public val type: KClas logger.warn { "No transformation present, returning input data" } dataSet.castOrNull(type) ?: error("$type expected, but $type received") } else { - val builder = MutableDataTree(type, workspace.context) - dataTransforms.forEach { transformation -> - val res = transformation(workspace.context, model, dataSet) - builder.update(res) + DataTree.dynamic(type, workspace.context){ + dataTransforms.forEach { transformation -> + val res = transformation(workspace.context, model, dataSet) + update(res) + } } - builder } } } } } +@DFExperimental +public suspend inline fun TaskBuilder.TaskEnv.dataTree( + crossinline block: suspend MutableDataTree.() -> Unit, +): DataTree = DataTree.dynamic(context, block) \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt index 83be2f63..48942ad5 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt @@ -37,6 +37,8 @@ public fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit) targets[name] = Meta(block).seal() } +class WorkspaceTask(val workspace: Workspace, val name: String) + /** * Use existing target as a base updating it with the block */ @@ -52,7 +54,7 @@ public fun WorkspaceBuilder.task( name: String, type: KClass, builder: TaskBuilder.() -> Unit, -): Task = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) } +): WorkspaceTask = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) } public inline fun WorkspaceBuilder.task( name: String, diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt index 323b44de..9e98a276 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt @@ -33,7 +33,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { val testSingleData = task("singleData", Int::class) { model { - data("myData\\[12\\]") + data(pattern = "myData\\[12\\]") } transform { data -> DataTree.dynamic(context) { diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt index c0379594..817f37b6 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt @@ -85,7 +85,7 @@ class SimpleWorkspaceTest { transform { data -> val squareNode = data.branch("square").filterIsInstance() //squareDep() val linearNode = data.branch("linear").filterIsInstance() //linearDep() - DataTree.dynamic(context) { + dataTree { squareNode.flow().collect { val newData: Data = Data { val squareValue = squareNode.getData(it.name)!!.value() @@ -150,6 +150,7 @@ class SimpleWorkspaceTest { } @Test + @Timeout(1) fun testWorkspace() { val node = workspace.runBlocking("sum") val res = node.first() @@ -157,7 +158,7 @@ class SimpleWorkspaceTest { } @Test - @Timeout(400) + @Timeout(1) fun testMetaPropagation() { val node = workspace.runBlocking("sum") { "testFlag" put true } val res = node.first()?.value() @@ -179,7 +180,7 @@ class SimpleWorkspaceTest { } @Test - fun testGather() { + fun testFilter() { val node = workspace.runBlocking("filterOne") runBlocking { assertEquals(12, node.first()?.value())