From ac8631e3a09e27a531846c4bc4ca44e5c2a12cee Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 17 Jan 2021 19:11:49 +0300 Subject: [PATCH] [WIP] redo Workspace --- .../{MutableDataTree.kt => ActiveDataTree.kt} | 55 +++---- .../hep/dataforge/data/CachingAction.kt | 2 +- .../kotlin/hep/dataforge/data/Data.kt | 22 --- .../kotlin/hep/dataforge/data/DataSet.kt | 12 +- .../kotlin/hep/dataforge/data/DataTree.kt | 15 -- .../kotlin/hep/dataforge/data/GroupRule.kt | 4 +- .../kotlin/hep/dataforge/data/MapAction.kt | 2 +- .../kotlin/hep/dataforge/data/NamedData.kt | 32 ++++ .../kotlin/hep/dataforge/data/SplitAction.kt | 2 +- .../hep/dataforge/data/StaticDataTree.kt | 12 +- .../hep/dataforge/data/DataTreeBuilderTest.kt | 4 +- .../hep/dataforge/scripting/BuildersKtTest.kt | 1 - .../hep/dataforge/workspace/Dependency.kt | 8 +- .../hep/dataforge/workspace/GenericTask.kt | 55 ------- .../dataforge/workspace/SimpleWorkspace.kt | 11 +- .../hep/dataforge/workspace/StageData.kt | 47 ++++++ .../hep/dataforge/workspace/StageDataSet.kt | 46 ++++++ .../kotlin/hep/dataforge/workspace/Task.kt | 49 ------ .../hep/dataforge/workspace/TaskModel.kt | 141 ------------------ .../hep/dataforge/workspace/WorkStage.kt | 23 +++ .../hep/dataforge/workspace/Workspace.kt | 55 ++++--- .../hep/dataforge/workspace/TaskBuilder.kt | 11 +- .../dataforge/workspace/WorkspaceBuilder.kt | 16 +- .../dataforge/workspace/WorkspacePlugin.kt | 10 +- .../workspace/DataPropagationTest.kt | 13 +- .../workspace/SimpleWorkspaceTest.kt | 8 +- 26 files changed, 256 insertions(+), 400 deletions(-) rename dataforge-data/src/commonMain/kotlin/hep/dataforge/data/{MutableDataTree.kt => ActiveDataTree.kt} (57%) create mode 100644 dataforge-data/src/commonMain/kotlin/hep/dataforge/data/NamedData.kt delete mode 100644 dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/GenericTask.kt create mode 100644 dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt create mode 100644 dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt delete mode 100644 dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt delete mode 100644 dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt create mode 100644 dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt similarity index 57% rename from dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt rename to dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt index d6c49f59..82563eac 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MutableDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt @@ -11,11 +11,11 @@ import kotlinx.coroutines.sync.withLock import kotlin.reflect.KClass /** - * A mutable [DataTree.Companion.dynamic]. It + * A mutable [DataTree.Companion.active]. It */ -public class MutableDataTree( +public class ActiveDataTree( override val dataType: KClass, -) : DataTree, DataSetBuilder { +) : DataTree, DataSetBuilder, ActiveDataSet { private val mutex = Mutex() private val treeItems = HashMap>() @@ -38,37 +38,24 @@ public class MutableDataTree( override suspend fun remove(name: Name) { if (name.isEmpty()) error("Can't remove the root node") - (getItem(name.cutLast()).tree as? 
MutableDataTree)?.remove(name.lastOrNull()!!) + (getItem(name.cutLast()).tree as? ActiveDataTree)?.remove(name.lastOrNull()!!) } -// private suspend fun set(token: NameToken, node: DataSet) { -// //if (_map.containsKey(token)) error("Tree entry with name $token is not empty") -// mutex.withLock { -// treeItems[token] = DataTreeItem.Node(node.toMutableTree()) -// coroutineScope { -// node.updates.onEach { -// _updates.emit(token + it) -// }.launchIn(this) -// } -// _updates.emit(token.asName()) -// } -// } - private suspend fun set(token: NameToken, data: Data) { mutex.withLock { treeItems[token] = DataTreeItem.Leaf(data) } } - private suspend fun getOrCreateNode(token: NameToken): MutableDataTree = - (treeItems[token] as? DataTreeItem.Node)?.tree as? MutableDataTree - ?: MutableDataTree(dataType).also { + private suspend fun getOrCreateNode(token: NameToken): ActiveDataTree = + (treeItems[token] as? DataTreeItem.Node)?.tree as? ActiveDataTree + ?: ActiveDataTree(dataType).also { mutex.withLock { treeItems[token] = DataTreeItem.Node(it) } } - private suspend fun getOrCreateNode(name: Name): MutableDataTree { + private suspend fun getOrCreateNode(name: Name): ActiveDataTree { return when (name.length) { 0 -> this 1 -> getOrCreateNode(name.firstOrNull()!!) @@ -90,7 +77,7 @@ public class MutableDataTree( } /** - * Copy given data set and mirror its changes to this [MutableDataTree] in [this@setAndObserve]. Returns an update [Job] + * Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job] */ public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet): Job = launch { set(name, dataSet) @@ -103,26 +90,26 @@ public class MutableDataTree( /** * Create a dynamic tree. Initial data is placed synchronously. 
Updates are propagated via [updatesScope] */ -public suspend fun DataTree.Companion.dynamic( +public suspend fun DataTree.Companion.active( type: KClass, - block: suspend MutableDataTree.() -> Unit, + block: suspend ActiveDataTree.() -> Unit, ): DataTree { - val tree = MutableDataTree(type) + val tree = ActiveDataTree(type) tree.block() return tree } -public suspend inline fun DataTree.Companion.dynamic( - crossinline block: suspend MutableDataTree.() -> Unit, -): DataTree = MutableDataTree(T::class).apply { block() } +public suspend inline fun DataTree.Companion.active( + crossinline block: suspend ActiveDataTree.() -> Unit, +): DataTree = ActiveDataTree(T::class).apply { block() } -public suspend inline fun MutableDataTree.set( +public suspend inline fun ActiveDataTree.set( name: Name, - noinline block: suspend MutableDataTree.() -> Unit, -): Unit = set(name, DataTree.dynamic(T::class, block)) + noinline block: suspend ActiveDataTree.() -> Unit, +): Unit = set(name, DataTree.active(T::class, block)) -public suspend inline fun MutableDataTree.set( +public suspend inline fun ActiveDataTree.set( name: String, - noinline block: suspend MutableDataTree.() -> Unit, -): Unit = set(name.toName(), DataTree.dynamic(T::class, block)) + noinline block: suspend ActiveDataTree.() -> Unit, +): Unit = set(name.toName(), DataTree.active(T::class, block)) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt index 45d3e888..e440a765 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt @@ -34,7 +34,7 @@ public abstract class CachingAction( dataSet: DataSet, meta: Meta, scope: CoroutineScope?, - ): DataSet = DataTree.dynamic(outputType) { + ): DataSet = DataTree.active(outputType) { coroutineScope { collectFrom(transform(dataSet, meta)) } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 39983534..57647c03 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -3,9 +3,7 @@ package hep.dataforge.data import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaRepr import hep.dataforge.meta.isEmpty -import hep.dataforge.misc.Named import hep.dataforge.misc.Type -import hep.dataforge.names.Name import kotlinx.coroutines.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -91,27 +89,7 @@ public inline fun Data( noinline block: suspend CoroutineScope.() -> T, ): Data = Data(T::class, meta, context, dependencies, block) -public class NamedData internal constructor( - override val name: Name, - public val data: Data, -) : Data by data, Named { - override fun toString(): String = buildString { - append("NamedData(name=\"$name\"") - if(data is StaticData){ - append(", value=${data.value}") - } - if(!data.meta.isEmpty()){ - append(", meta=${data.meta}") - } - append(")") - } -} -public fun Data.named(name: Name): NamedData = if (this is NamedData) { - NamedData(name, this.data) -} else { - NamedData(name, this) -} public fun Data.map( outputType: KClass, diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt index 84c053f4..1c68c02a 100644 --- 
a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt @@ -34,6 +34,12 @@ public interface DataSet { public suspend fun listChildren(prefix: Name = Name.EMPTY): List = flow().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList() + public companion object { + public val META_KEY: Name = "@meta".asName() + } +} + +public interface ActiveDataSet: DataSet{ /** * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. * Those can include new data items and replacement of existing ones. The replaced items could update existing data content @@ -41,12 +47,10 @@ public interface DataSet { * */ public val updates: Flow - - public companion object { - public val META_KEY: Name = "@meta".asName() - } } +public val DataSet.updates: Flow get() = if(this is ActiveDataSet) updates else emptyFlow() + /** * Flow all data nodes with names starting with [branchName] */ diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt index ac4d3166..659ff029 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataTree.kt @@ -31,19 +31,6 @@ public interface DataTree : DataSet { */ public suspend fun items(): Map> -// override fun flow(): Flow> = flow flowBuilder@{ -// val item = getItem(root) ?: return@flowBuilder -// when (item) { -// is DataTreeItem.Leaf -> emit(item.data.named(root)) -// is DataTreeItem.Node -> item.tree.items().forEach { (token, childItem: DataTreeItem) -> -// when (childItem) { -// is DataTreeItem.Leaf -> emit(childItem.data.named(root + token)) -// is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(root + token + it.name) }) -// } -// } -// } -// } - override fun flow(): Flow> = flow { items().forEach { (token, childItem: DataTreeItem) -> if(!token.body.startsWith("@")) { @@ -104,7 +91,5 @@ public fun DataTree.itemFlow(): Flow>> = public fun DataTree.branch(branchName: Name): DataTree = object : DataTree { override val dataType: KClass get() = this@branch.dataType - override val updates: Flow = this@branch.updates.mapNotNull { it.removeHeadOrNull(branchName) } - override suspend fun items(): Map> = getItem(branchName).tree?.items() ?: emptyMap() } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt index fdf83372..fcd22eae 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt @@ -37,11 +37,11 @@ public interface GroupRule { object : GroupRule { override suspend fun gather(dataType: KClass, set: DataSet): Map> { - val map = HashMap>() + val map = HashMap>() set.flow().collect { data -> val tagValue = data.meta[key]?.string ?: defaultTagValue - map.getOrPut(tagValue) { MutableDataTree(dataType) }.set(data.name, data.data) + map.getOrPut(tagValue) { ActiveDataTree(dataType) }.set(data.name, data.data) } return map diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt index 47e1c411..ec6442d9 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt +++ 
b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt @@ -67,7 +67,7 @@ public class MapAction( val flow = dataSet.flow().map(::mapOne) - return DataTree.dynamic(outputType) { + return DataTree.active(outputType) { collectFrom(flow) scope?.launch { dataSet.updates.collect { name -> diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/NamedData.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/NamedData.kt new file mode 100644 index 00000000..aa5afcdc --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/NamedData.kt @@ -0,0 +1,32 @@ +package hep.dataforge.data + +import hep.dataforge.meta.isEmpty +import hep.dataforge.misc.Named +import hep.dataforge.names.Name + +public interface NamedData : Named, Data { + override val name: Name + public val data: Data +} + +private class NamedDataImpl( + override val name: Name, + override val data: Data, +) : Data by data, NamedData { + override fun toString(): String = buildString { + append("NamedData(name=\"$name\"") + if (data is StaticData) { + append(", value=${data.value}") + } + if (!data.meta.isEmpty()) { + append(", meta=${data.meta}") + } + append(")") + } +} + +public fun Data.named(name: Name): NamedData = if (this is NamedData) { + NamedDataImpl(name, this.data) +} else { + NamedDataImpl(name, this) +} \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt index f2e35767..48f4fc93 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -62,7 +62,7 @@ public class SplitAction( } } - return DataTree.dynamic(outputType) { + return DataTree.active(outputType) { collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne)) scope?.launch { dataSet.updates.collect { name -> diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt index 0d26005a..c3191ef9 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt @@ -12,8 +12,6 @@ internal class StaticDataTree( private val items: MutableMap> = HashMap() - override val updates: Flow = emptyFlow() - override suspend fun items(): Map> = items.filter { !it.key.body.startsWith("@") } override suspend fun remove(name: Name) { @@ -61,15 +59,15 @@ internal class StaticDataTree( } } -public suspend fun DataTree.Companion.static( +public suspend fun DataTree( dataType: KClass, block: suspend DataSetBuilder.() -> Unit, ): DataTree = StaticDataTree(dataType).apply { block() } -public suspend inline fun DataTree.Companion.static( +public suspend inline fun DataTree( noinline block: suspend DataSetBuilder.() -> Unit, -): DataTree = static(T::class, block) +): DataTree = DataTree(T::class, block) -public suspend fun DataSet.toStaticTree(): DataTree = StaticDataTree(dataType).apply { - update(this@toStaticTree) +public suspend fun DataSet.seal(): DataTree = DataTree(dataType){ + update(this@seal) } \ No newline at end of file diff --git a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt index 3770a670..98feae57 100644 --- 
a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt @@ -35,7 +35,7 @@ internal class DataTreeBuilderTest { fun testDynamicUpdates() = runBlocking { try { supervisorScope { - val subNode = DataTree.dynamic { + val subNode = DataTree.active { launch { repeat(10) { delay(10) @@ -48,7 +48,7 @@ internal class DataTreeBuilderTest { println(it) } } - val rootNode = DataTree.dynamic { + val rootNode = DataTree.active { setAndObserve("sub".toName(), subNode) } diff --git a/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt b/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt index 6dd61105..7d96c168 100644 --- a/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt +++ b/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt @@ -5,7 +5,6 @@ import hep.dataforge.meta.get import hep.dataforge.meta.int import hep.dataforge.workspace.SimpleWorkspaceBuilder import hep.dataforge.workspace.context -import hep.dataforge.workspace.target import kotlin.test.Test import kotlin.test.assertEquals diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt index 83b2027c..3c342e90 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt @@ -25,7 +25,7 @@ public abstract class TaskDependency( public val meta: Meta, protected val placement: DataPlacement, ) : Dependency() { - public abstract fun resolveTask(workspace: Workspace): Task + public abstract fun resolveTask(workspace: Workspace): WorkStage /** * A name of the dependency for logging and serialization @@ -40,11 +40,11 @@ public abstract class TaskDependency( } public class ExternalTaskDependency( - public val task: Task, + public val task: WorkStage, meta: Meta, placement: DataPlacement, ) : TaskDependency(meta, placement) { - override fun resolveTask(workspace: Workspace): Task = task + override fun resolveTask(workspace: Workspace): WorkStage = task override val name: Name get() = EXTERNAL_TASK_NAME + task.name @@ -64,7 +64,7 @@ public class WorkspaceTaskDependency( meta: Meta, placement: DataPlacement, ) : TaskDependency(meta, placement) { - override fun resolveTask(workspace: Workspace): Task<*> = workspace.tasks[name] + override fun resolveTask(workspace: Workspace): WorkStage<*> = workspace.stages[name] ?: error("Task with name $name is not found in the workspace") override fun toMeta(): Meta { diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/GenericTask.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/GenericTask.kt deleted file mode 100644 index 7c5b039f..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/GenericTask.kt +++ /dev/null @@ -1,55 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.context.logger -import hep.dataforge.data.DataSet -import hep.dataforge.meta.Meta -import hep.dataforge.meta.descriptors.NodeDescriptor -import hep.dataforge.meta.get -import hep.dataforge.meta.node -import hep.dataforge.names.Name -import kotlin.reflect.KClass - -//data class TaskEnv(val workspace: Workspace, val model: TaskModel) - - -public class GenericTask( - override val name: Name, - override val type: 
KClass, - override val descriptor: NodeDescriptor, - private val modelTransform: TaskModelBuilder.(Meta) -> Unit, - private val dataTransform: Workspace.() -> suspend TaskModel.(DataSet) -> DataSet -) : Task { - - override suspend fun run(workspace: Workspace, model: TaskModel): DataSet { - //validate model - validate(model) - - // gather data - val input = model.buildInput(workspace)// gather(workspace, model) - - //execute - workspace.logger.info{"Starting task '$name' on ${model.target} with meta: \n${model.meta}"} - val output = dataTransform(workspace).invoke(model, input) - - //handle result - //output.handle(model.context.dispatcher) { this.handle(it) } - - return output - } - - /** - * Build new TaskModel and apply specific model transformation for this - * task. By default model uses the meta node with the same node as the name of the task. - * - * @param workspace - * @param taskMeta - * @return - */ - override fun build(workspace: Workspace, taskMeta: Meta): TaskModel { - val taskMeta = taskMeta[name]?.node ?: taskMeta - val builder = TaskModelBuilder(name, taskMeta) - builder.modelTransform(taskMeta) - return builder.build() - } - //TODO add validation -} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt index 4da2f694..442e52bf 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt @@ -2,8 +2,7 @@ package hep.dataforge.workspace import hep.dataforge.context.Context import hep.dataforge.context.gather -import hep.dataforge.context.toMap -import hep.dataforge.data.DataTree +import hep.dataforge.data.DataSet import hep.dataforge.meta.Meta import hep.dataforge.names.Name @@ -13,13 +12,13 @@ import hep.dataforge.names.Name */ public class SimpleWorkspace( override val context: Context, - override val data: DataTree, + override val data: DataSet, override val targets: Map, - tasks: Collection> + stages: Map> ) : Workspace { - override val tasks: Map> by lazy { - context.gather>(Task.TYPE) + tasks.toMap() + override val stages: Map> by lazy { + context.gather>(WorkStage.TYPE) + stages } public companion object { diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt new file mode 100644 index 00000000..f17cb09a --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt @@ -0,0 +1,47 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.Data +import hep.dataforge.data.NamedData +import hep.dataforge.meta.Meta +import hep.dataforge.names.Name + +/** + * A [Workspace]-locked [NamedData], that serves as a computation model. + */ +public interface StageData : NamedData { + /** + * The [Workspace] this data belongs to + */ + public val workspace: Workspace + + /** + * The name of the stage that produced this data. [Name.EMPTY] if the workspace intrinsic data is used. + */ + public val stage: Name + + /** + * Stage configuration used to produce this data. + */ + public val stageMeta: Meta + + /** + * Dependencies that allow to compute transitive dependencies as well. 
+ */ + override val dependencies: Collection> +} + +private class StageDataImpl( + override val workspace: Workspace, + override val data: Data, + override val name: Name, + override val stage: Name, + override val stageMeta: Meta, +) : StageData, Data by data { + override val dependencies: Collection> = data.dependencies.map { + it as? StageData<*> ?: error("StageData can't depend on external data") + } +} + +internal fun Workspace.internalize(data: Data, name: Name, stage: Name, stageMeta: Meta): StageData = + StageDataImpl(this, data, name, stage, stageMeta) + diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt new file mode 100644 index 00000000..c0bf3784 --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt @@ -0,0 +1,46 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.DataSet +import hep.dataforge.meta.Meta +import hep.dataforge.names.Name +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map + +/** + * A result of a [WorkStage] + */ +public interface StageDataSet : DataSet { + /** + * The [Workspace] this [DataSet] belongs to + */ + public val workspace: Workspace + + /** + * The [Name] of the stage that produced this [DataSet] + */ + public val stageName: Name + + /** + * The configuration of the stage that produced this [DataSet] + */ + public val stageMeta: Meta + + override fun flow(): Flow> + override suspend fun getData(name: Name): StageData? +} + +private class StageDataSetImpl( + override val workspace: Workspace, + val dataSet: DataSet, + override val stageName: Name, + override val stageMeta: Meta, +) : StageDataSet, DataSet by dataSet { + + override fun flow(): Flow> = dataSet.flow().map { + workspace.internalize(it, it.name, stageName, stageMeta) + } + + override suspend fun getData(name: Name): StageData? = dataSet.getData(name)?.let { + workspace.internalize(it, name, stageName, stageMeta) + } +} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt deleted file mode 100644 index 4f6c9c17..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt +++ /dev/null @@ -1,49 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.data.DataSet -import hep.dataforge.meta.Meta -import hep.dataforge.meta.descriptors.Described -import hep.dataforge.misc.Type -import hep.dataforge.workspace.Task.Companion.TYPE -import kotlin.reflect.KClass - -@Type(TYPE) -public interface Task : Described { - - /** - * The explicit type of the node returned by the task - */ - public val type: KClass - - /** - * Build a model for this task. Does not run any computations unless task [isEager] - * - * @param workspace - * @param taskMeta - * @return - */ - public fun build(workspace: Workspace, taskMeta: Meta): TaskModel - - /** - * Check if the model is valid and is acceptable by the task. Throw exception if not. - * - * @param model - */ - public fun validate(model: TaskModel) { - if(this.name != model.name) error("The task $name could not be run with model from task ${model.name}") - } - - /** - * Run given task model. Type check expected to be performed before actual - * calculation. 
- * - * @param workspace - a workspace to run task model in - * @param model - a model to be executed - * @return - */ - public suspend fun run(workspace: Workspace, model: TaskModel): DataSet - - public companion object { - public const val TYPE: String = "task" - } -} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt deleted file mode 100644 index 51072ed9..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt +++ /dev/null @@ -1,141 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package hep.dataforge.workspace - -import hep.dataforge.data.DataTree -import hep.dataforge.data.dynamic -import hep.dataforge.data.update -import hep.dataforge.meta.* -import hep.dataforge.names.Name -import hep.dataforge.names.asName -import hep.dataforge.names.toName -import hep.dataforge.workspace.TaskModel.Companion.MODEL_TARGET_KEY - -//FIXME TaskModel should store individual propagation of all data elements, not just nodes - -/** - * A model for task execution - * @param name the name of the task - * @param meta the meta for the task (not for the whole configuration) - * @param dependencies a list of direct dependencies for this task - */ -public data class TaskModel( - val name: Name, - val meta: Meta, - val dependencies: Collection, -) : MetaRepr { - //TODO provide a way to get task descriptor - //TODO add pre-run check of task result type? - - override fun toMeta(): Meta = Meta { - "name" put name.toString() - "meta" put meta - "dependsOn" put { - val dataDependencies = dependencies.filterIsInstance() - val taskDependencies = dependencies.filterIsInstance>() - setIndexed("data".toName(), dataDependencies.map { it.toMeta() }) //Should list all data here - setIndexed( - "task".toName(), - taskDependencies.map { it.toMeta() }) { _, index -> taskDependencies[index].name.toString() } - //TODO ensure all dependencies are listed - } - } - - public companion object { - public val MODEL_TARGET_KEY: Name = "@target".asName() - } -} - -/** - * Build input for the task - */ -public suspend fun TaskModel.buildInput(workspace: Workspace): DataTree = DataTree.dynamic(workspace.context) { - dependencies.forEach { dep -> - update(dep.apply(workspace)) - } -} - -public interface TaskDependencyContainer { - public val defaultMeta: Meta - public fun add(dependency: Dependency) -} - -/** - * Add dependency for a task defined in a workspace and resolved by - */ -public fun TaskDependencyContainer.dependsOn( - name: Name, - placement: DataPlacement = DataPlacement.ALL, - meta: Meta = defaultMeta, -): WorkspaceTaskDependency = WorkspaceTaskDependency(name, meta, placement).also { add(it) } - -public fun TaskDependencyContainer.dependsOn( - name: String, - placement: DataPlacement = DataPlacement.ALL, - meta: Meta = defaultMeta, -): WorkspaceTaskDependency = dependsOn(name.toName(), placement, meta) - -public fun TaskDependencyContainer.dependsOn( - task: Task, - placement: DataPlacement = DataPlacement.ALL, - meta: Meta = defaultMeta, -): ExternalTaskDependency = ExternalTaskDependency(task, meta, placement).also { add(it) } - - -public fun TaskDependencyContainer.dependsOn( - task: Task, - placement: DataPlacement = DataPlacement.ALL, - metaBuilder: MetaBuilder.() -> Unit, -): 
ExternalTaskDependency = dependsOn(task, placement, Meta(metaBuilder)) - -/** - * Add custom data dependency - */ -public fun TaskDependencyContainer.data(action: DataPlacementScheme.() -> Unit): DataDependency = - DataDependency(DataPlacementScheme(action)).also { add(it) } - -/** - * User-friendly way to add data dependency - */ -public fun TaskDependencyContainer.data( - pattern: String? = null, - from: String? = null, - to: String? = null, -): DataDependency = data { - pattern?.let { this.pattern = it } - from?.let { this.from = it } - to?.let { this.to = it } -} - -///** -// * Add all data as root node -// */ -//public fun TaskDependencyContainer.allData(to: Name = Name.EMPTY): AllDataDependency = AllDataDependency(to).also { add(it) } - -/** - * A builder for [TaskModel] - */ -public class TaskModelBuilder(public val name: Name, meta: Meta = Meta.EMPTY) : TaskDependencyContainer { - /** - * Meta for current task. By default uses the whole input meta - */ - public var meta: MetaBuilder = meta.toMutableMeta() - private val dependencies: HashSet = HashSet() - - override val defaultMeta: Meta get() = meta - - override fun add(dependency: Dependency) { - dependencies.add(dependency) - } - - public var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "") - - - public fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies) -} - - -public val TaskModel.target: String get() = meta[MODEL_TARGET_KEY]?.string ?: "" \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt new file mode 100644 index 00000000..663e7d4b --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt @@ -0,0 +1,23 @@ +package hep.dataforge.workspace + +import hep.dataforge.meta.Meta +import hep.dataforge.meta.descriptors.Described +import hep.dataforge.misc.Type +import hep.dataforge.workspace.WorkStage.Companion.TYPE + +@Type(TYPE) +public interface WorkStage : Described { + + /** + * Compute a [StageDataSet] using given meta. 
In general, the result is lazy and represents both computation model + * and a handler for actual result + * + * @param workspace a workspace to run task model in + * @param meta configuration for current stage computation + */ + public suspend fun execute(workspace: Workspace, meta: Meta): StageDataSet + + public companion object { + public const val TYPE: String = "workspace.stage" + } +} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt index a1c7d27c..738e574b 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt @@ -1,9 +1,7 @@ package hep.dataforge.workspace import hep.dataforge.context.ContextAware -import hep.dataforge.data.DataSet import hep.dataforge.meta.Meta -import hep.dataforge.meta.MetaBuilder import hep.dataforge.misc.Type import hep.dataforge.names.Name import hep.dataforge.names.toName @@ -15,7 +13,7 @@ public interface Workspace : ContextAware, Provider { /** * The whole data node for current workspace */ - public val data: DataSet + public val data: StageDataSet<*> /** * All targets associated with the workspace @@ -23,47 +21,46 @@ public interface Workspace : ContextAware, Provider { public val targets: Map /** - * All tasks associated with the workspace + * All stages associated with the workspace */ - public val tasks: Map> + public val stages: Map> override fun content(target: String): Map { return when (target) { "target", Meta.TYPE -> targets.mapKeys { it.key.toName() } - Task.TYPE -> tasks + WorkStage.TYPE -> stages //Data.TYPE -> data.flow().toMap() else -> emptyMap() } } - /** - * Invoke a task in the workspace utilizing caching if possible - */ - public suspend fun run(task: Task, config: Meta): DataSet { - val model = task.build(this, config) - task.validate(model) - return task.run(this, model) - } - public companion object { public const val TYPE: String = "workspace" } } -public suspend fun Workspace.run(task: Task<*>, target: String): DataSet { - val meta = targets[target] ?: error("A target with name $target not found in $this") - return run(task, meta) +public suspend fun Workspace.stage(taskName: Name, taskMeta: Meta): StageDataSet<*> { + val task = stages[taskName] ?: error("Task with name $taskName not found in the workspace") + return task.execute(this, taskMeta) } +public suspend fun Workspace.getData(taskName: Name, taskMeta: Meta, name: Name): StageData<*>? 
= + stage(taskName, taskMeta).getData(name) -public suspend fun Workspace.run(task: String, target: String): DataSet = - tasks[task.toName()]?.let { run(it, target) } ?: error("Task with name $task not found") - -public suspend fun Workspace.run(task: String, meta: Meta): DataSet = - tasks[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found") - -public suspend fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}): DataSet = - run(task, Meta(block)) - -public suspend fun Workspace.run(task: Task, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet = - run(task, Meta(metaBuilder)) \ No newline at end of file +//public suspend fun Workspace.execute(task: WorkStage<*>, target: String): DataSet { +// val meta = targets[target] ?: error("A target with name $target not found in $this") +// return run(task, meta) +//} +// +// +//public suspend fun Workspace.execute(task: String, target: String): DataSet = +// stages[task.toName()]?.let { execute(it, target) } ?: error("Task with name $task not found") +// +//public suspend fun Workspace.execute(task: String, meta: Meta): DataSet = +// stages[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found") +// +//public suspend fun Workspace.execute(task: String, block: MetaBuilder.() -> Unit = {}): DataSet = +// execute(task, Meta(block)) +// +//public suspend fun Workspace.execute(task: WorkStage, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet = +// run(task, Meta(metaBuilder)) \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt index 6c95bb64..4a8e0acd 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt @@ -1,11 +1,14 @@ package hep.dataforge.workspace import hep.dataforge.context.Context -import hep.dataforge.context.logger import hep.dataforge.data.* import hep.dataforge.meta.* import hep.dataforge.meta.descriptors.NodeDescriptor import hep.dataforge.names.Name +import hep.dataforge.workspace.old.GenericTask +import hep.dataforge.workspace.old.TaskModel +import hep.dataforge.workspace.old.TaskModelBuilder +import hep.dataforge.workspace.old.data import kotlin.reflect.KClass private typealias DataTransformation = suspend (context: Context, model: TaskModel, data: DataSet) -> DataSet @@ -187,7 +190,7 @@ public class TaskBuilder(private val name: Name, public val type: KClas logger.warn { "No transformation present, returning input data" } dataSet.castOrNull(type) ?: error("$type expected, but $type received") } else { - DataTree.dynamic(type, workspace.context){ + DataTree.active(type, workspace.context){ dataTransforms.forEach { transformation -> val res = transformation(workspace.context, model, dataSet) update(res) @@ -201,5 +204,5 @@ public class TaskBuilder(private val name: Name, public val type: KClas @DFExperimental public suspend inline fun TaskBuilder.TaskEnv.dataTree( - crossinline block: suspend MutableDataTree.() -> Unit, -): DataTree = DataTree.dynamic(context, block) \ No newline at end of file + crossinline block: suspend ActiveDataTree.() -> Unit, +): DataTree = DataTree.active(context, block) \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt index 
48942ad5..e92950ff 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt @@ -3,7 +3,7 @@ package hep.dataforge.workspace import hep.dataforge.context.Context import hep.dataforge.context.ContextBuilder import hep.dataforge.context.Global -import hep.dataforge.data.MutableDataTree +import hep.dataforge.data.ActiveDataTree import hep.dataforge.meta.* import hep.dataforge.names.toName import kotlin.reflect.KClass @@ -12,8 +12,8 @@ import kotlin.reflect.KClass public interface WorkspaceBuilder { public val parentContext: Context public var context: Context - public var data: MutableDataTree - public var tasks: MutableSet> + public var data: ActiveDataTree + public var tasks: MutableSet> public var targets: MutableMap public fun build(): Workspace @@ -27,7 +27,7 @@ public fun WorkspaceBuilder.context(name: String = "WORKSPACE", block: ContextBu } public inline fun WorkspaceBuilder.data( - block: MutableDataTree.() -> Unit, + block: ActiveDataTree.() -> Unit, ): Unit{ data.apply(block) } @@ -59,21 +59,21 @@ public fun WorkspaceBuilder.task( public inline fun WorkspaceBuilder.task( name: String, noinline builder: TaskBuilder.() -> Unit, -): Task = task(name, T::class, builder) +): WorkStage = task(name, T::class, builder) @JvmName("rawTask") public fun WorkspaceBuilder.task( name: String, builder: TaskBuilder.() -> Unit, -): Task = task(name, Any::class, builder) +): WorkStage = task(name, Any::class, builder) /** * A builder for a simple workspace */ public class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder { override var context: Context = parentContext - override var data: MutableDataTree = MutableDataTree(Any::class, context) - override var tasks: MutableSet> = HashSet() + override var data: ActiveDataTree = ActiveDataTree(Any::class, context) + override var tasks: MutableSet> = HashSet() override var targets: MutableMap = HashMap() override fun build(): SimpleWorkspace { diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt index 9079139d..0ccaccf3 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt @@ -1,26 +1,26 @@ package hep.dataforge.workspace import hep.dataforge.context.AbstractPlugin -import hep.dataforge.context.toMap import hep.dataforge.names.Name import hep.dataforge.names.toName +import hep.dataforge.workspace.old.GenericTask import kotlin.reflect.KClass /** * An abstract plugin with some additional boilerplate to effectively work with workspace context */ public abstract class WorkspacePlugin : AbstractPlugin() { - private val _tasks = HashSet>() - public val tasks: Collection> get() = _tasks + private val _tasks = HashSet>() + public val tasks: Collection> get() = _tasks override fun content(target: String): Map { return when (target) { - Task.TYPE -> tasks.toMap() + WorkStage.TYPE -> tasks.toMap() else -> emptyMap() } } - public fun task(task: Task<*>){ + public fun task(task: WorkStage<*>){ _tasks.add(task) } diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt index 9e98a276..ba8db797 100644 --- 
a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt @@ -5,6 +5,7 @@ import hep.dataforge.context.PluginFactory import hep.dataforge.context.PluginTag import hep.dataforge.data.* import hep.dataforge.meta.Meta +import hep.dataforge.workspace.old.data import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.reduce @@ -23,7 +24,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { data() } transform { data -> - DataTree.dynamic(context) { + DataTree.active(context) { val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } data("result", result) } @@ -36,7 +37,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { data(pattern = "myData\\[12\\]") } transform { data -> - DataTree.dynamic(context) { + DataTree.active(context) { val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } data("result", result) } @@ -48,7 +49,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() { data(pattern = "myData.*") } transform { data -> - DataTree.dynamic(context) { + DataTree.active(context) { val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } data("result", result) } @@ -81,7 +82,7 @@ class DataPropagationTest { @Test fun testAllData() { runBlocking { - val node = testWorkspace.run("Test.allData") + val node = testWorkspace.execute("Test.allData") assertEquals(4950, node.first()!!.value()) } } @@ -89,7 +90,7 @@ class DataPropagationTest { @Test fun testAllRegexData() { runBlocking { - val node = testWorkspace.run("Test.allRegexData") + val node = testWorkspace.execute("Test.allRegexData") assertEquals(4950, node.first()!!.value()) } } @@ -97,7 +98,7 @@ class DataPropagationTest { @Test fun testSingleData() { runBlocking { - val node = testWorkspace.run("Test.singleData") + val node = testWorkspace.execute("Test.singleData") assertEquals(12, node.first()!!.value()) } } diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt index 817f37b6..11562a31 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt @@ -4,6 +4,8 @@ import hep.dataforge.context.* import hep.dataforge.data.* import hep.dataforge.meta.* import hep.dataforge.names.plus +import hep.dataforge.workspace.old.data +import hep.dataforge.workspace.old.dependsOn import kotlinx.coroutines.flow.collect import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Timeout @@ -23,7 +25,7 @@ public inline fun P.toFactory(): PluginFactory

= object } public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet = runBlocking{ - run(task, block) + execute(task, block) } @@ -166,7 +168,7 @@ class SimpleWorkspaceTest { @Test fun testPluginTask() { - val tasks = workspace.tasks + val tasks = workspace.stages assertTrue { tasks["test.test"] != null } //val node = workspace.run("test.test", "empty") } @@ -174,7 +176,7 @@ class SimpleWorkspaceTest { @Test fun testFullSquare() { runBlocking { - val node = workspace.run("fullsquare") + val node = workspace.execute("fullsquare") println(node.toMeta()) } }
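
Usage sketch for the reworked API (an assumption of how the new pieces fit together, not code contained in this patch): DataTree.active replaces DataTree.dynamic, DataSet.seal() replaces toStaticTree(), updates moves from DataSet to ActiveDataSet (with a DataSet.updates extension that falls back to an empty flow), and tasks are executed as stages via Workspace.stage. Generic arguments, the stage name "mystage" and the sample values below are illustrative placeholders; exact signatures may differ since the patch is marked WIP.

import hep.dataforge.data.*
import hep.dataforge.meta.Meta
import hep.dataforge.names.toName
import hep.dataforge.workspace.Workspace
import hep.dataforge.workspace.stage
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

suspend fun stageApiSketch(workspace: Workspace): Unit = coroutineScope {
    // An active (observable) tree: initial data is placed synchronously,
    // later changes surface through the updates flow.
    val tree = DataTree.active<Int> {
        set("a".toName(), Data<Int> { 1 })
        set("b".toName(), Data<Int> { 2 })
    }

    // DataSet.updates is an extension: the real update stream for active sets,
    // an empty flow for static ones. Cancel the watcher explicitly, since the
    // update flow of an active tree never completes on its own.
    val watcher = tree.updates
        .onEach { name -> println("updated: $name") }
        .launchIn(this)

    // Freeze the current content into a static tree (replacement for toStaticTree()).
    val snapshot = tree.seal()
    println("sealed items: ${snapshot.listChildren()}")

    // Execute a stage by name with explicit meta and read one item of its result.
    // "mystage" and "result" are placeholder names, not stages defined in this patch.
    val output = workspace.stage("mystage".toName(), Meta.EMPTY)
    val result = output.getData("result".toName()) // StageData<*>? carrying workspace, stage name and stage meta
    println(result?.value())

    watcher.cancel()
}

Splitting updates into ActiveDataSet keeps static trees free of flow machinery while still letting consumers treat active and static sets uniformly through the extension property.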