[WIP] redo Workspace

This commit is contained in:
Alexander Nozik 2021-01-17 19:11:49 +03:00
parent 7e4d1af55f
commit ac8631e3a0
26 changed files with 256 additions and 400 deletions

View File

@ -11,11 +11,11 @@ import kotlinx.coroutines.sync.withLock
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
* A mutable [DataTree.Companion.dynamic]. It * A mutable [DataTree.Companion.active]. It
*/ */
public class MutableDataTree<T : Any>( public class ActiveDataTree<T : Any>(
override val dataType: KClass<out T>, override val dataType: KClass<out T>,
) : DataTree<T>, DataSetBuilder<T> { ) : DataTree<T>, DataSetBuilder<T>, ActiveDataSet<T> {
private val mutex = Mutex() private val mutex = Mutex()
private val treeItems = HashMap<NameToken, DataTreeItem<T>>() private val treeItems = HashMap<NameToken, DataTreeItem<T>>()
@ -38,37 +38,24 @@ public class MutableDataTree<T : Any>(
override suspend fun remove(name: Name) { override suspend fun remove(name: Name) {
if (name.isEmpty()) error("Can't remove the root node") if (name.isEmpty()) error("Can't remove the root node")
(getItem(name.cutLast()).tree as? MutableDataTree)?.remove(name.lastOrNull()!!) (getItem(name.cutLast()).tree as? ActiveDataTree)?.remove(name.lastOrNull()!!)
} }
// private suspend fun set(token: NameToken, node: DataSet<T>) {
// //if (_map.containsKey(token)) error("Tree entry with name $token is not empty")
// mutex.withLock {
// treeItems[token] = DataTreeItem.Node(node.toMutableTree())
// coroutineScope {
// node.updates.onEach {
// _updates.emit(token + it)
// }.launchIn(this)
// }
// _updates.emit(token.asName())
// }
// }
private suspend fun set(token: NameToken, data: Data<T>) { private suspend fun set(token: NameToken, data: Data<T>) {
mutex.withLock { mutex.withLock {
treeItems[token] = DataTreeItem.Leaf(data) treeItems[token] = DataTreeItem.Leaf(data)
} }
} }
private suspend fun getOrCreateNode(token: NameToken): MutableDataTree<T> = private suspend fun getOrCreateNode(token: NameToken): ActiveDataTree<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? MutableDataTree<T> (treeItems[token] as? DataTreeItem.Node<T>)?.tree as? ActiveDataTree<T>
?: MutableDataTree(dataType).also { ?: ActiveDataTree(dataType).also {
mutex.withLock { mutex.withLock {
treeItems[token] = DataTreeItem.Node(it) treeItems[token] = DataTreeItem.Node(it)
} }
} }
private suspend fun getOrCreateNode(name: Name): MutableDataTree<T> { private suspend fun getOrCreateNode(name: Name): ActiveDataTree<T> {
return when (name.length) { return when (name.length) {
0 -> this 0 -> this
1 -> getOrCreateNode(name.firstOrNull()!!) 1 -> getOrCreateNode(name.firstOrNull()!!)
@ -90,7 +77,7 @@ public class MutableDataTree<T : Any>(
} }
/** /**
* Copy given data set and mirror its changes to this [MutableDataTree] in [this@setAndObserve]. Returns an update [Job] * Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job]
*/ */
public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet<T>): Job = launch { public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet<T>): Job = launch {
set(name, dataSet) set(name, dataSet)
@ -103,26 +90,26 @@ public class MutableDataTree<T : Any>(
/** /**
* Create a dynamic tree. Initial data is placed synchronously. Updates are propagated via [updatesScope] * Create a dynamic tree. Initial data is placed synchronously. Updates are propagated via [updatesScope]
*/ */
public suspend fun <T : Any> DataTree.Companion.dynamic( public suspend fun <T : Any> DataTree.Companion.active(
type: KClass<out T>, type: KClass<out T>,
block: suspend MutableDataTree<T>.() -> Unit, block: suspend ActiveDataTree<T>.() -> Unit,
): DataTree<T> { ): DataTree<T> {
val tree = MutableDataTree(type) val tree = ActiveDataTree(type)
tree.block() tree.block()
return tree return tree
} }
public suspend inline fun <reified T : Any> DataTree.Companion.dynamic( public suspend inline fun <reified T : Any> DataTree.Companion.active(
crossinline block: suspend MutableDataTree<T>.() -> Unit, crossinline block: suspend ActiveDataTree<T>.() -> Unit,
): DataTree<T> = MutableDataTree(T::class).apply { block() } ): DataTree<T> = ActiveDataTree(T::class).apply { block() }
public suspend inline fun <reified T : Any> MutableDataTree<T>.set( public suspend inline fun <reified T : Any> ActiveDataTree<T>.set(
name: Name, name: Name,
noinline block: suspend MutableDataTree<T>.() -> Unit, noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = set(name, DataTree.dynamic(T::class, block)) ): Unit = set(name, DataTree.active(T::class, block))
public suspend inline fun <reified T : Any> MutableDataTree<T>.set( public suspend inline fun <reified T : Any> ActiveDataTree<T>.set(
name: String, name: String,
noinline block: suspend MutableDataTree<T>.() -> Unit, noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = set(name.toName(), DataTree.dynamic(T::class, block)) ): Unit = set(name.toName(), DataTree.active(T::class, block))

View File

@ -34,7 +34,7 @@ public abstract class CachingAction<in T : Any, out R : Any>(
dataSet: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
scope: CoroutineScope?, scope: CoroutineScope?,
): DataSet<R> = DataTree.dynamic(outputType) { ): DataSet<R> = DataTree.active(outputType) {
coroutineScope { coroutineScope {
collectFrom(transform(dataSet, meta)) collectFrom(transform(dataSet, meta))
} }

View File

@ -3,9 +3,7 @@ package hep.dataforge.data
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.isEmpty import hep.dataforge.meta.isEmpty
import hep.dataforge.misc.Named
import hep.dataforge.misc.Type import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@ -91,27 +89,7 @@ public inline fun <reified T : Any> Data(
noinline block: suspend CoroutineScope.() -> T, noinline block: suspend CoroutineScope.() -> T,
): Data<T> = Data(T::class, meta, context, dependencies, block) ): Data<T> = Data(T::class, meta, context, dependencies, block)
public class NamedData<out T : Any> internal constructor(
override val name: Name,
public val data: Data<T>,
) : Data<T> by data, Named {
override fun toString(): String = buildString {
append("NamedData(name=\"$name\"")
if(data is StaticData){
append(", value=${data.value}")
}
if(!data.meta.isEmpty()){
append(", meta=${data.meta}")
}
append(")")
}
}
public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedData(name, this.data)
} else {
NamedData(name, this)
}
public fun <T : Any, R : Any> Data<T>.map( public fun <T : Any, R : Any> Data<T>.map(
outputType: KClass<out R>, outputType: KClass<out R>,

View File

@ -34,6 +34,12 @@ public interface DataSet<out T : Any> {
public suspend fun listChildren(prefix: Name = Name.EMPTY): List<Name> = public suspend fun listChildren(prefix: Name = Name.EMPTY): List<Name> =
flow().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList() flow().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()
public companion object {
public val META_KEY: Name = "@meta".asName()
}
}
public interface ActiveDataSet<T: Any>: DataSet<T>{
/** /**
* A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
* Those can include new data items and replacement of existing ones. The replaced items could update existing data content * Those can include new data items and replacement of existing ones. The replaced items could update existing data content
@ -41,11 +47,9 @@ public interface DataSet<out T : Any> {
* *
*/ */
public val updates: Flow<Name> public val updates: Flow<Name>
}
public companion object { public val <T: Any> DataSet<T>.updates: Flow<Name> get() = if(this is ActiveDataSet) updates else emptyFlow()
public val META_KEY: Name = "@meta".asName()
}
}
/** /**
* Flow all data nodes with names starting with [branchName] * Flow all data nodes with names starting with [branchName]

View File

@ -31,19 +31,6 @@ public interface DataTree<out T : Any> : DataSet<T> {
*/ */
public suspend fun items(): Map<NameToken, DataTreeItem<T>> public suspend fun items(): Map<NameToken, DataTreeItem<T>>
// override fun flow(): Flow<NamedData<T>> = flow flowBuilder@{
// val item = getItem(root) ?: return@flowBuilder
// when (item) {
// is DataTreeItem.Leaf -> emit(item.data.named(root))
// is DataTreeItem.Node -> item.tree.items().forEach { (token, childItem: DataTreeItem<T>) ->
// when (childItem) {
// is DataTreeItem.Leaf -> emit(childItem.data.named(root + token))
// is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(root + token + it.name) })
// }
// }
// }
// }
override fun flow(): Flow<NamedData<T>> = flow { override fun flow(): Flow<NamedData<T>> = flow {
items().forEach { (token, childItem: DataTreeItem<T>) -> items().forEach { (token, childItem: DataTreeItem<T>) ->
if(!token.body.startsWith("@")) { if(!token.body.startsWith("@")) {
@ -104,7 +91,5 @@ public fun <T : Any> DataTree<T>.itemFlow(): Flow<Pair<Name, DataTreeItem<T>>> =
public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> = object : DataTree<T> { public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> = object : DataTree<T> {
override val dataType: KClass<out T> get() = this@branch.dataType override val dataType: KClass<out T> get() = this@branch.dataType
override val updates: Flow<Name> = this@branch.updates.mapNotNull { it.removeHeadOrNull(branchName) }
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = getItem(branchName).tree?.items() ?: emptyMap() override suspend fun items(): Map<NameToken, DataTreeItem<T>> = getItem(branchName).tree?.items() ?: emptyMap()
} }

View File

@ -37,11 +37,11 @@ public interface GroupRule {
object : GroupRule { object : GroupRule {
override suspend fun <T : Any> gather(dataType: KClass<out T>, set: DataSet<T>): Map<String, DataSet<T>> { override suspend fun <T : Any> gather(dataType: KClass<out T>, set: DataSet<T>): Map<String, DataSet<T>> {
val map = HashMap<String, MutableDataTree<T>>() val map = HashMap<String, ActiveDataTree<T>>()
set.flow().collect { data -> set.flow().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { MutableDataTree(dataType) }.set(data.name, data.data) map.getOrPut(tagValue) { ActiveDataTree(dataType) }.set(data.name, data.data)
} }
return map return map

View File

@ -67,7 +67,7 @@ public class MapAction<in T : Any, out R : Any>(
val flow = dataSet.flow().map(::mapOne) val flow = dataSet.flow().map(::mapOne)
return DataTree.dynamic(outputType) { return DataTree.active(outputType) {
collectFrom(flow) collectFrom(flow)
scope?.launch { scope?.launch {
dataSet.updates.collect { name -> dataSet.updates.collect { name ->

View File

@ -0,0 +1,32 @@
package hep.dataforge.data
import hep.dataforge.meta.isEmpty
import hep.dataforge.misc.Named
import hep.dataforge.names.Name
public interface NamedData<out T : Any> : Named, Data<T> {
override val name: Name
public val data: Data<T>
}
private class NamedDataImpl<out T : Any>(
override val name: Name,
override val data: Data<T>,
) : Data<T> by data, NamedData<T> {
override fun toString(): String = buildString {
append("NamedData(name=\"$name\"")
if (data is StaticData) {
append(", value=${data.value}")
}
if (!data.meta.isEmpty()) {
append(", meta=${data.meta}")
}
append(")")
}
}
public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedDataImpl(name, this.data)
} else {
NamedDataImpl(name, this)
}

View File

@ -62,7 +62,7 @@ public class SplitAction<T : Any, R : Any>(
} }
} }
return DataTree.dynamic(outputType) { return DataTree.active(outputType) {
collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne)) collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne))
scope?.launch { scope?.launch {
dataSet.updates.collect { name -> dataSet.updates.collect { name ->

View File

@ -12,8 +12,6 @@ internal class StaticDataTree<T : Any>(
private val items: MutableMap<NameToken, DataTreeItem<T>> = HashMap() private val items: MutableMap<NameToken, DataTreeItem<T>> = HashMap()
override val updates: Flow<Name> = emptyFlow()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = items.filter { !it.key.body.startsWith("@") } override suspend fun items(): Map<NameToken, DataTreeItem<T>> = items.filter { !it.key.body.startsWith("@") }
override suspend fun remove(name: Name) { override suspend fun remove(name: Name) {
@ -61,15 +59,15 @@ internal class StaticDataTree<T : Any>(
} }
} }
public suspend fun <T : Any> DataTree.Companion.static( public suspend fun <T : Any> DataTree(
dataType: KClass<out T>, dataType: KClass<out T>,
block: suspend DataSetBuilder<T>.() -> Unit, block: suspend DataSetBuilder<T>.() -> Unit,
): DataTree<T> = StaticDataTree(dataType).apply { block() } ): DataTree<T> = StaticDataTree(dataType).apply { block() }
public suspend inline fun <reified T : Any> DataTree.Companion.static( public suspend inline fun <reified T : Any> DataTree(
noinline block: suspend DataSetBuilder<T>.() -> Unit, noinline block: suspend DataSetBuilder<T>.() -> Unit,
): DataTree<T> = static(T::class, block) ): DataTree<T> = DataTree(T::class, block)
public suspend fun <T : Any> DataSet<T>.toStaticTree(): DataTree<T> = StaticDataTree(dataType).apply { public suspend fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType){
update(this@toStaticTree) update(this@seal)
} }

View File

@ -35,7 +35,7 @@ internal class DataTreeBuilderTest {
fun testDynamicUpdates() = runBlocking { fun testDynamicUpdates() = runBlocking {
try { try {
supervisorScope { supervisorScope {
val subNode = DataTree.dynamic<Int> { val subNode = DataTree.active<Int> {
launch { launch {
repeat(10) { repeat(10) {
delay(10) delay(10)
@ -48,7 +48,7 @@ internal class DataTreeBuilderTest {
println(it) println(it)
} }
} }
val rootNode = DataTree.dynamic<Int> { val rootNode = DataTree.active<Int> {
setAndObserve("sub".toName(), subNode) setAndObserve("sub".toName(), subNode)
} }

View File

@ -5,7 +5,6 @@ import hep.dataforge.meta.get
import hep.dataforge.meta.int import hep.dataforge.meta.int
import hep.dataforge.workspace.SimpleWorkspaceBuilder import hep.dataforge.workspace.SimpleWorkspaceBuilder
import hep.dataforge.workspace.context import hep.dataforge.workspace.context
import hep.dataforge.workspace.target
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals

View File

@ -25,7 +25,7 @@ public abstract class TaskDependency<out T : Any>(
public val meta: Meta, public val meta: Meta,
protected val placement: DataPlacement, protected val placement: DataPlacement,
) : Dependency() { ) : Dependency() {
public abstract fun resolveTask(workspace: Workspace): Task<T> public abstract fun resolveTask(workspace: Workspace): WorkStage<T>
/** /**
* A name of the dependency for logging and serialization * A name of the dependency for logging and serialization
@ -40,11 +40,11 @@ public abstract class TaskDependency<out T : Any>(
} }
public class ExternalTaskDependency<T : Any>( public class ExternalTaskDependency<T : Any>(
public val task: Task<T>, public val task: WorkStage<T>,
meta: Meta, meta: Meta,
placement: DataPlacement, placement: DataPlacement,
) : TaskDependency<T>(meta, placement) { ) : TaskDependency<T>(meta, placement) {
override fun resolveTask(workspace: Workspace): Task<T> = task override fun resolveTask(workspace: Workspace): WorkStage<T> = task
override val name: Name get() = EXTERNAL_TASK_NAME + task.name override val name: Name get() = EXTERNAL_TASK_NAME + task.name
@ -64,7 +64,7 @@ public class WorkspaceTaskDependency(
meta: Meta, meta: Meta,
placement: DataPlacement, placement: DataPlacement,
) : TaskDependency<Any>(meta, placement) { ) : TaskDependency<Any>(meta, placement) {
override fun resolveTask(workspace: Workspace): Task<*> = workspace.tasks[name] override fun resolveTask(workspace: Workspace): WorkStage<*> = workspace.stages[name]
?: error("Task with name $name is not found in the workspace") ?: error("Task with name $name is not found in the workspace")
override fun toMeta(): Meta { override fun toMeta(): Meta {

View File

@ -1,55 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.context.logger
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.meta.get
import hep.dataforge.meta.node
import hep.dataforge.names.Name
import kotlin.reflect.KClass
//data class TaskEnv(val workspace: Workspace, val model: TaskModel)
public class GenericTask<R : Any>(
override val name: Name,
override val type: KClass<out R>,
override val descriptor: NodeDescriptor,
private val modelTransform: TaskModelBuilder.(Meta) -> Unit,
private val dataTransform: Workspace.() -> suspend TaskModel.(DataSet<Any>) -> DataSet<R>
) : Task<R> {
override suspend fun run(workspace: Workspace, model: TaskModel): DataSet<R> {
//validate model
validate(model)
// gather data
val input = model.buildInput(workspace)// gather(workspace, model)
//execute
workspace.logger.info{"Starting task '$name' on ${model.target} with meta: \n${model.meta}"}
val output = dataTransform(workspace).invoke(model, input)
//handle result
//output.handle(model.context.dispatcher) { this.handle(it) }
return output
}
/**
* Build new TaskModel and apply specific model transformation for this
* task. By default model uses the meta node with the same node as the name of the task.
*
* @param workspace
* @param taskMeta
* @return
*/
override fun build(workspace: Workspace, taskMeta: Meta): TaskModel {
val taskMeta = taskMeta[name]?.node ?: taskMeta
val builder = TaskModelBuilder(name, taskMeta)
builder.modelTransform(taskMeta)
return builder.build()
}
//TODO add validation
}

View File

@ -2,8 +2,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.context.gather import hep.dataforge.context.gather
import hep.dataforge.context.toMap import hep.dataforge.data.DataSet
import hep.dataforge.data.DataTree
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.names.Name import hep.dataforge.names.Name
@ -13,13 +12,13 @@ import hep.dataforge.names.Name
*/ */
public class SimpleWorkspace( public class SimpleWorkspace(
override val context: Context, override val context: Context,
override val data: DataTree<Any>, override val data: DataSet<Any>,
override val targets: Map<String, Meta>, override val targets: Map<String, Meta>,
tasks: Collection<Task<Any>> stages: Map<Name, WorkStage<Any>>
) : Workspace { ) : Workspace {
override val tasks: Map<Name, Task<*>> by lazy { override val stages: Map<Name, WorkStage<*>> by lazy {
context.gather<Task<*>>(Task.TYPE) + tasks.toMap() context.gather<WorkStage<*>>(WorkStage.TYPE) + stages
} }
public companion object { public companion object {

View File

@ -0,0 +1,47 @@
package hep.dataforge.workspace
import hep.dataforge.data.Data
import hep.dataforge.data.NamedData
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
/**
* A [Workspace]-locked [NamedData], that serves as a computation model.
*/
public interface StageData<out T : Any> : NamedData<T> {
/**
* The [Workspace] this data belongs to
*/
public val workspace: Workspace
/**
* The name of the stage that produced this data. [Name.EMPTY] if the workspace intrinsic data is used.
*/
public val stage: Name
/**
* Stage configuration used to produce this data.
*/
public val stageMeta: Meta
/**
* Dependencies that allow to compute transitive dependencies as well.
*/
override val dependencies: Collection<StageData<*>>
}
private class StageDataImpl<out T : Any>(
override val workspace: Workspace,
override val data: Data<T>,
override val name: Name,
override val stage: Name,
override val stageMeta: Meta,
) : StageData<T>, Data<T> by data {
override val dependencies: Collection<StageData<*>> = data.dependencies.map {
it as? StageData<*> ?: error("StageData can't depend on external data")
}
}
internal fun <T : Any> Workspace.internalize(data: Data<T>, name: Name, stage: Name, stageMeta: Meta): StageData<T> =
StageDataImpl(this, data, name, stage, stageMeta)

View File

@ -0,0 +1,46 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
/**
* A result of a [WorkStage]
*/
public interface StageDataSet<out T : Any> : DataSet<T> {
/**
* The [Workspace] this [DataSet] belongs to
*/
public val workspace: Workspace
/**
* The [Name] of the stage that produced this [DataSet]
*/
public val stageName: Name
/**
* The configuration of the stage that produced this [DataSet]
*/
public val stageMeta: Meta
override fun flow(): Flow<StageData<T>>
override suspend fun getData(name: Name): StageData<T>?
}
private class StageDataSetImpl<out T : Any>(
override val workspace: Workspace,
val dataSet: DataSet<T>,
override val stageName: Name,
override val stageMeta: Meta,
) : StageDataSet<T>, DataSet<T> by dataSet {
override fun flow(): Flow<StageData<T>> = dataSet.flow().map {
workspace.internalize(it, it.name, stageName, stageMeta)
}
override suspend fun getData(name: Name): StageData<T>? = dataSet.getData(name)?.let {
workspace.internalize(it, name, stageName, stageMeta)
}
}

View File

@ -1,49 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.Described
import hep.dataforge.misc.Type
import hep.dataforge.workspace.Task.Companion.TYPE
import kotlin.reflect.KClass
@Type(TYPE)
public interface Task<out R : Any> : Described {
/**
* The explicit type of the node returned by the task
*/
public val type: KClass<out R>
/**
* Build a model for this task. Does not run any computations unless task [isEager]
*
* @param workspace
* @param taskMeta
* @return
*/
public fun build(workspace: Workspace, taskMeta: Meta): TaskModel
/**
* Check if the model is valid and is acceptable by the task. Throw exception if not.
*
* @param model
*/
public fun validate(model: TaskModel) {
if(this.name != model.name) error("The task $name could not be run with model from task ${model.name}")
}
/**
* Run given task model. Type check expected to be performed before actual
* calculation.
*
* @param workspace - a workspace to run task model in
* @param model - a model to be executed
* @return
*/
public suspend fun run(workspace: Workspace, model: TaskModel): DataSet<R>
public companion object {
public const val TYPE: String = "task"
}
}

View File

@ -1,141 +0,0 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package hep.dataforge.workspace
import hep.dataforge.data.DataTree
import hep.dataforge.data.dynamic
import hep.dataforge.data.update
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.toName
import hep.dataforge.workspace.TaskModel.Companion.MODEL_TARGET_KEY
//FIXME TaskModel should store individual propagation of all data elements, not just nodes
/**
* A model for task execution
* @param name the name of the task
* @param meta the meta for the task (not for the whole configuration)
* @param dependencies a list of direct dependencies for this task
*/
public data class TaskModel(
val name: Name,
val meta: Meta,
val dependencies: Collection<Dependency>,
) : MetaRepr {
//TODO provide a way to get task descriptor
//TODO add pre-run check of task result type?
override fun toMeta(): Meta = Meta {
"name" put name.toString()
"meta" put meta
"dependsOn" put {
val dataDependencies = dependencies.filterIsInstance<DataDependency>()
val taskDependencies = dependencies.filterIsInstance<TaskDependency<*>>()
setIndexed("data".toName(), dataDependencies.map { it.toMeta() }) //Should list all data here
setIndexed(
"task".toName(),
taskDependencies.map { it.toMeta() }) { _, index -> taskDependencies[index].name.toString() }
//TODO ensure all dependencies are listed
}
}
public companion object {
public val MODEL_TARGET_KEY: Name = "@target".asName()
}
}
/**
* Build input for the task
*/
public suspend fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> = DataTree.dynamic(workspace.context) {
dependencies.forEach { dep ->
update(dep.apply(workspace))
}
}
public interface TaskDependencyContainer {
public val defaultMeta: Meta
public fun add(dependency: Dependency)
}
/**
* Add dependency for a task defined in a workspace and resolved by
*/
public fun TaskDependencyContainer.dependsOn(
name: Name,
placement: DataPlacement = DataPlacement.ALL,
meta: Meta = defaultMeta,
): WorkspaceTaskDependency = WorkspaceTaskDependency(name, meta, placement).also { add(it) }
public fun TaskDependencyContainer.dependsOn(
name: String,
placement: DataPlacement = DataPlacement.ALL,
meta: Meta = defaultMeta,
): WorkspaceTaskDependency = dependsOn(name.toName(), placement, meta)
public fun <T : Any> TaskDependencyContainer.dependsOn(
task: Task<T>,
placement: DataPlacement = DataPlacement.ALL,
meta: Meta = defaultMeta,
): ExternalTaskDependency<T> = ExternalTaskDependency(task, meta, placement).also { add(it) }
public fun <T : Any> TaskDependencyContainer.dependsOn(
task: Task<T>,
placement: DataPlacement = DataPlacement.ALL,
metaBuilder: MetaBuilder.() -> Unit,
): ExternalTaskDependency<T> = dependsOn(task, placement, Meta(metaBuilder))
/**
* Add custom data dependency
*/
public fun TaskDependencyContainer.data(action: DataPlacementScheme.() -> Unit): DataDependency =
DataDependency(DataPlacementScheme(action)).also { add(it) }
/**
* User-friendly way to add data dependency
*/
public fun TaskDependencyContainer.data(
pattern: String? = null,
from: String? = null,
to: String? = null,
): DataDependency = data {
pattern?.let { this.pattern = it }
from?.let { this.from = it }
to?.let { this.to = it }
}
///**
// * Add all data as root node
// */
//public fun TaskDependencyContainer.allData(to: Name = Name.EMPTY): AllDataDependency = AllDataDependency(to).also { add(it) }
/**
* A builder for [TaskModel]
*/
public class TaskModelBuilder(public val name: Name, meta: Meta = Meta.EMPTY) : TaskDependencyContainer {
/**
* Meta for current task. By default uses the whole input meta
*/
public var meta: MetaBuilder = meta.toMutableMeta()
private val dependencies: HashSet<Dependency> = HashSet()
override val defaultMeta: Meta get() = meta
override fun add(dependency: Dependency) {
dependencies.add(dependency)
}
public var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "")
public fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies)
}
public val TaskModel.target: String get() = meta[MODEL_TARGET_KEY]?.string ?: ""

View File

@ -0,0 +1,23 @@
package hep.dataforge.workspace
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.Described
import hep.dataforge.misc.Type
import hep.dataforge.workspace.WorkStage.Companion.TYPE
@Type(TYPE)
public interface WorkStage<out R : Any> : Described {
/**
* Compute a [StageDataSet] using given meta. In general, the result is lazy and represents both computation model
* and a handler for actual result
*
* @param workspace a workspace to run task model in
* @param meta configuration for current stage computation
*/
public suspend fun execute(workspace: Workspace, meta: Meta): StageDataSet<R>
public companion object {
public const val TYPE: String = "workspace.stage"
}
}

View File

@ -1,9 +1,7 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.context.ContextAware import hep.dataforge.context.ContextAware
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.misc.Type import hep.dataforge.misc.Type
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.toName import hep.dataforge.names.toName
@ -15,7 +13,7 @@ public interface Workspace : ContextAware, Provider {
/** /**
* The whole data node for current workspace * The whole data node for current workspace
*/ */
public val data: DataSet<Any> public val data: StageDataSet<*>
/** /**
* All targets associated with the workspace * All targets associated with the workspace
@ -23,47 +21,46 @@ public interface Workspace : ContextAware, Provider {
public val targets: Map<String, Meta> public val targets: Map<String, Meta>
/** /**
* All tasks associated with the workspace * All stages associated with the workspace
*/ */
public val tasks: Map<Name, Task<*>> public val stages: Map<Name, WorkStage<*>>
override fun content(target: String): Map<Name, Any> { override fun content(target: String): Map<Name, Any> {
return when (target) { return when (target) {
"target", Meta.TYPE -> targets.mapKeys { it.key.toName() } "target", Meta.TYPE -> targets.mapKeys { it.key.toName() }
Task.TYPE -> tasks WorkStage.TYPE -> stages
//Data.TYPE -> data.flow().toMap() //Data.TYPE -> data.flow().toMap()
else -> emptyMap() else -> emptyMap()
} }
} }
/**
* Invoke a task in the workspace utilizing caching if possible
*/
public suspend fun <R : Any> run(task: Task<R>, config: Meta): DataSet<R> {
val model = task.build(this, config)
task.validate(model)
return task.run(this, model)
}
public companion object { public companion object {
public const val TYPE: String = "workspace" public const val TYPE: String = "workspace"
} }
} }
public suspend fun Workspace.run(task: Task<*>, target: String): DataSet<Any> { public suspend fun Workspace.stage(taskName: Name, taskMeta: Meta): StageDataSet<*> {
val meta = targets[target] ?: error("A target with name $target not found in $this") val task = stages[taskName] ?: error("Task with name $taskName not found in the workspace")
return run(task, meta) return task.execute(this, taskMeta)
} }
public suspend fun Workspace.getData(taskName: Name, taskMeta: Meta, name: Name): StageData<*>? =
stage(taskName, taskMeta).getData(name)
public suspend fun Workspace.run(task: String, target: String): DataSet<Any> = //public suspend fun Workspace.execute(task: WorkStage<*>, target: String): DataSet<Any> {
tasks[task.toName()]?.let { run(it, target) } ?: error("Task with name $task not found") // val meta = targets[target] ?: error("A target with name $target not found in $this")
// return run(task, meta)
public suspend fun Workspace.run(task: String, meta: Meta): DataSet<Any> = //}
tasks[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found") //
//
public suspend fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> = //public suspend fun Workspace.execute(task: String, target: String): DataSet<Any> =
run(task, Meta(block)) // stages[task.toName()]?.let { execute(it, target) } ?: error("Task with name $task not found")
//
public suspend fun <T : Any> Workspace.run(task: Task<T>, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet<T> = //public suspend fun Workspace.execute(task: String, meta: Meta): DataSet<Any> =
run(task, Meta(metaBuilder)) // stages[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found")
//
//public suspend fun Workspace.execute(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> =
// execute(task, Meta(block))
//
//public suspend fun <T : Any> Workspace.execute(task: WorkStage<T>, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet<T> =
// run(task, Meta(metaBuilder))

View File

@ -1,11 +1,14 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.context.logger
import hep.dataforge.data.* import hep.dataforge.data.*
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.meta.descriptors.NodeDescriptor import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.workspace.old.GenericTask
import hep.dataforge.workspace.old.TaskModel
import hep.dataforge.workspace.old.TaskModelBuilder
import hep.dataforge.workspace.old.data
import kotlin.reflect.KClass import kotlin.reflect.KClass
private typealias DataTransformation<R> = suspend (context: Context, model: TaskModel, data: DataSet<Any>) -> DataSet<R> private typealias DataTransformation<R> = suspend (context: Context, model: TaskModel, data: DataSet<Any>) -> DataSet<R>
@ -187,7 +190,7 @@ public class TaskBuilder<R : Any>(private val name: Name, public val type: KClas
logger.warn { "No transformation present, returning input data" } logger.warn { "No transformation present, returning input data" }
dataSet.castOrNull(type) ?: error("$type expected, but $type received") dataSet.castOrNull(type) ?: error("$type expected, but $type received")
} else { } else {
DataTree.dynamic(type, workspace.context){ DataTree.active(type, workspace.context){
dataTransforms.forEach { transformation -> dataTransforms.forEach { transformation ->
val res = transformation(workspace.context, model, dataSet) val res = transformation(workspace.context, model, dataSet)
update(res) update(res)
@ -201,5 +204,5 @@ public class TaskBuilder<R : Any>(private val name: Name, public val type: KClas
@DFExperimental @DFExperimental
public suspend inline fun <reified T : Any> TaskBuilder.TaskEnv.dataTree( public suspend inline fun <reified T : Any> TaskBuilder.TaskEnv.dataTree(
crossinline block: suspend MutableDataTree<T>.() -> Unit, crossinline block: suspend ActiveDataTree<T>.() -> Unit,
): DataTree<T> = DataTree.dynamic(context, block) ): DataTree<T> = DataTree.active(context, block)

View File

@ -3,7 +3,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.context.ContextBuilder import hep.dataforge.context.ContextBuilder
import hep.dataforge.context.Global import hep.dataforge.context.Global
import hep.dataforge.data.MutableDataTree import hep.dataforge.data.ActiveDataTree
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.toName import hep.dataforge.names.toName
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -12,8 +12,8 @@ import kotlin.reflect.KClass
public interface WorkspaceBuilder { public interface WorkspaceBuilder {
public val parentContext: Context public val parentContext: Context
public var context: Context public var context: Context
public var data: MutableDataTree<Any> public var data: ActiveDataTree<Any>
public var tasks: MutableSet<Task<Any>> public var tasks: MutableSet<WorkStage<Any>>
public var targets: MutableMap<String, Meta> public var targets: MutableMap<String, Meta>
public fun build(): Workspace public fun build(): Workspace
@ -27,7 +27,7 @@ public fun WorkspaceBuilder.context(name: String = "WORKSPACE", block: ContextBu
} }
public inline fun WorkspaceBuilder.data( public inline fun WorkspaceBuilder.data(
block: MutableDataTree<Any>.() -> Unit, block: ActiveDataTree<Any>.() -> Unit,
): Unit{ ): Unit{
data.apply(block) data.apply(block)
} }
@ -59,21 +59,21 @@ public fun <T : Any> WorkspaceBuilder.task(
public inline fun <reified T : Any> WorkspaceBuilder.task( public inline fun <reified T : Any> WorkspaceBuilder.task(
name: String, name: String,
noinline builder: TaskBuilder<T>.() -> Unit, noinline builder: TaskBuilder<T>.() -> Unit,
): Task<T> = task(name, T::class, builder) ): WorkStage<T> = task(name, T::class, builder)
@JvmName("rawTask") @JvmName("rawTask")
public fun WorkspaceBuilder.task( public fun WorkspaceBuilder.task(
name: String, name: String,
builder: TaskBuilder<Any>.() -> Unit, builder: TaskBuilder<Any>.() -> Unit,
): Task<Any> = task(name, Any::class, builder) ): WorkStage<Any> = task(name, Any::class, builder)
/** /**
* A builder for a simple workspace * A builder for a simple workspace
*/ */
public class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder { public class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder {
override var context: Context = parentContext override var context: Context = parentContext
override var data: MutableDataTree<Any> = MutableDataTree(Any::class, context) override var data: ActiveDataTree<Any> = ActiveDataTree(Any::class, context)
override var tasks: MutableSet<Task<Any>> = HashSet() override var tasks: MutableSet<WorkStage<Any>> = HashSet()
override var targets: MutableMap<String, Meta> = HashMap() override var targets: MutableMap<String, Meta> = HashMap()
override fun build(): SimpleWorkspace { override fun build(): SimpleWorkspace {

View File

@ -1,26 +1,26 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.context.AbstractPlugin import hep.dataforge.context.AbstractPlugin
import hep.dataforge.context.toMap
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.toName import hep.dataforge.names.toName
import hep.dataforge.workspace.old.GenericTask
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
* An abstract plugin with some additional boilerplate to effectively work with workspace context * An abstract plugin with some additional boilerplate to effectively work with workspace context
*/ */
public abstract class WorkspacePlugin : AbstractPlugin() { public abstract class WorkspacePlugin : AbstractPlugin() {
private val _tasks = HashSet<Task<*>>() private val _tasks = HashSet<WorkStage<*>>()
public val tasks: Collection<Task<*>> get() = _tasks public val tasks: Collection<WorkStage<*>> get() = _tasks
override fun content(target: String): Map<Name, Any> { override fun content(target: String): Map<Name, Any> {
return when (target) { return when (target) {
Task.TYPE -> tasks.toMap() WorkStage.TYPE -> tasks.toMap()
else -> emptyMap() else -> emptyMap()
} }
} }
public fun task(task: Task<*>){ public fun task(task: WorkStage<*>){
_tasks.add(task) _tasks.add(task)
} }

View File

@ -5,6 +5,7 @@ import hep.dataforge.context.PluginFactory
import hep.dataforge.context.PluginTag import hep.dataforge.context.PluginTag
import hep.dataforge.data.* import hep.dataforge.data.*
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.workspace.old.data
import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce import kotlinx.coroutines.flow.reduce
@ -23,7 +24,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data() data()
} }
transform<Int> { data -> transform<Int> { data ->
DataTree.dynamic(context) { DataTree.active(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result) data("result", result)
} }
@ -36,7 +37,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data(pattern = "myData\\[12\\]") data(pattern = "myData\\[12\\]")
} }
transform<Int> { data -> transform<Int> { data ->
DataTree.dynamic(context) { DataTree.active(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result) data("result", result)
} }
@ -48,7 +49,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data(pattern = "myData.*") data(pattern = "myData.*")
} }
transform<Int> { data -> transform<Int> { data ->
DataTree.dynamic(context) { DataTree.active(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result) data("result", result)
} }
@ -81,7 +82,7 @@ class DataPropagationTest {
@Test @Test
fun testAllData() { fun testAllData() {
runBlocking { runBlocking {
val node = testWorkspace.run("Test.allData") val node = testWorkspace.execute("Test.allData")
assertEquals(4950, node.first()!!.value()) assertEquals(4950, node.first()!!.value())
} }
} }
@ -89,7 +90,7 @@ class DataPropagationTest {
@Test @Test
fun testAllRegexData() { fun testAllRegexData() {
runBlocking { runBlocking {
val node = testWorkspace.run("Test.allRegexData") val node = testWorkspace.execute("Test.allRegexData")
assertEquals(4950, node.first()!!.value()) assertEquals(4950, node.first()!!.value())
} }
} }
@ -97,7 +98,7 @@ class DataPropagationTest {
@Test @Test
fun testSingleData() { fun testSingleData() {
runBlocking { runBlocking {
val node = testWorkspace.run("Test.singleData") val node = testWorkspace.execute("Test.singleData")
assertEquals(12, node.first()!!.value()) assertEquals(12, node.first()!!.value())
} }
} }

View File

@ -4,6 +4,8 @@ import hep.dataforge.context.*
import hep.dataforge.data.* import hep.dataforge.data.*
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.plus import hep.dataforge.names.plus
import hep.dataforge.workspace.old.data
import hep.dataforge.workspace.old.dependsOn
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.Timeout
@ -23,7 +25,7 @@ public inline fun <reified P : Plugin> P.toFactory(): PluginFactory<P> = object
} }
public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> = runBlocking{ public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> = runBlocking{
run(task, block) execute(task, block)
} }
@ -166,7 +168,7 @@ class SimpleWorkspaceTest {
@Test @Test
fun testPluginTask() { fun testPluginTask() {
val tasks = workspace.tasks val tasks = workspace.stages
assertTrue { tasks["test.test"] != null } assertTrue { tasks["test.test"] != null }
//val node = workspace.run("test.test", "empty") //val node = workspace.run("test.test", "empty")
} }
@ -174,7 +176,7 @@ class SimpleWorkspaceTest {
@Test @Test
fun testFullSquare() { fun testFullSquare() {
runBlocking { runBlocking {
val node = workspace.run("fullsquare") val node = workspace.execute("fullsquare")
println(node.toMeta()) println(node.toMeta())
} }
} }