Task builder complete. Not tested
This commit is contained in:
parent
0ff915236c
commit
16d263dc0b
@ -167,6 +167,14 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
|
||||
*/
|
||||
infix fun String.to(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder<T>(type).apply(block))
|
||||
|
||||
|
||||
fun update(node: DataNode<T>){
|
||||
node.data().forEach {
|
||||
//TODO check if the place is occupied
|
||||
this[it.first] = it.second
|
||||
}
|
||||
}
|
||||
|
||||
fun build(): DataTree<T> {
|
||||
val resMap = map.mapValues { (_, value) ->
|
||||
when (value) {
|
||||
|
@ -17,17 +17,17 @@ fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<R>): Data<R>? {
|
||||
* Filter a node by data and node type. Resulting node and its subnodes is guaranteed to have border type [type],
|
||||
* but could contain empty nodes
|
||||
*/
|
||||
fun <T : Any, R : Any> DataNode<T>.cast(type: KClass<out R>): DataNode<R> {
|
||||
return if (this is CheckedDataNode) {
|
||||
fun <T : Any, R : Any> DataNode<T>.cast(type: KClass<R>): DataNode<R> {
|
||||
return if (this is CastDataNode) {
|
||||
origin.cast(type)
|
||||
} else {
|
||||
CheckedDataNode(this, type)
|
||||
CastDataNode(this, type)
|
||||
}
|
||||
}
|
||||
|
||||
inline fun <T : Any, reified R : Any> DataNode<T>.cast(): DataNode<R> = cast(R::class)
|
||||
|
||||
class CheckedDataNode<out T : Any>(val origin: DataNode<Any>, override val type: KClass<out T>) : DataNode<T> {
|
||||
class CastDataNode<out T : Any>(val origin: DataNode<Any>, override val type: KClass<out T>) : DataNode<T> {
|
||||
|
||||
override fun get(name: Name): Data<T>? =
|
||||
origin[name]?.safeCast(type)
|
@ -24,7 +24,7 @@ class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<
|
||||
|
||||
}
|
||||
|
||||
class JoinGroupBuilder<T : Any, R : Any> {
|
||||
class JoinGroupBuilder<T : Any, R : Any>(val actionMeta: Meta) {
|
||||
private val groupRules: MutableList<(DataNode<T>) -> List<JoinGroup<T, R>>> = ArrayList();
|
||||
|
||||
/**
|
||||
@ -80,7 +80,7 @@ class JoinAction<T : Any, R : Any>(
|
||||
error("$inputType expected, but ${node.type} received")
|
||||
}
|
||||
return DataNode.build(outputType) {
|
||||
JoinGroupBuilder<T, R>().apply(action).buildGroups(node).forEach { group ->
|
||||
JoinGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group ->
|
||||
|
||||
val laminate = Laminate(group.meta, meta)
|
||||
|
||||
|
@ -2,7 +2,6 @@ package hep.dataforge.workspace
|
||||
|
||||
import hep.dataforge.data.DataFilter
|
||||
import hep.dataforge.data.DataNode
|
||||
import hep.dataforge.data.DataTreeBuilder
|
||||
import hep.dataforge.data.filter
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaRepr
|
||||
@ -18,9 +17,15 @@ sealed class Dependency : MetaRepr {
|
||||
abstract fun apply(workspace: Workspace): DataNode<Any>
|
||||
}
|
||||
|
||||
class DataDependency(val filter: DataFilter) : Dependency() {
|
||||
override fun apply(workspace: Workspace): DataNode<Any> =
|
||||
workspace.data.filter(filter)
|
||||
class DataDependency(val filter: DataFilter, val placement: Name = EmptyName) : Dependency() {
|
||||
override fun apply(workspace: Workspace): DataNode<Any> {
|
||||
val result = workspace.data.filter(filter)
|
||||
return if (placement.isEmpty()) {
|
||||
result
|
||||
} else {
|
||||
DataNode.build(Any::class){ this[placement] = result }
|
||||
}
|
||||
}
|
||||
|
||||
override fun toMeta(): Meta = filter.config
|
||||
|
||||
@ -37,7 +42,7 @@ class TaskModelDependency(val name: String, val meta: Meta, val placement: Name
|
||||
return if (placement.isEmpty()) {
|
||||
result
|
||||
} else {
|
||||
DataTreeBuilder(Any::class).apply { this[placement] = result }.build()
|
||||
DataNode.build(Any::class){ this[placement] = result }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,7 @@ import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.EmptyName
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.toName
|
||||
import hep.dataforge.workspace.TaskModel.Companion.MODEL_TARGET_KEY
|
||||
|
||||
|
||||
/**
|
||||
@ -39,6 +40,10 @@ data class TaskModel(
|
||||
//TODO ensure all dependencies are listed
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val MODEL_TARGET_KEY = "@target"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -63,6 +68,8 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
|
||||
var meta: MetaBuilder = meta.builder()
|
||||
val dependencies = HashSet<Dependency>()
|
||||
|
||||
var target: String by this.meta.string(key = MODEL_TARGET_KEY,default = "")
|
||||
|
||||
/**
|
||||
* Add dependency for
|
||||
*/
|
||||
@ -95,3 +102,6 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
|
||||
|
||||
fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies)
|
||||
}
|
||||
|
||||
|
||||
val TaskModel.target get() = meta[MODEL_TARGET_KEY]?.string ?: ""
|
@ -5,7 +5,10 @@ import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.node
|
||||
import hep.dataforge.meta.string
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.toName
|
||||
import mu.KotlinLogging
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class GenericTask<R : Any>(
|
||||
@ -17,11 +20,11 @@ class GenericTask<R : Any>(
|
||||
) : Task<R> {
|
||||
|
||||
private fun gather(workspace: Workspace, model: TaskModel): DataNode<Any> {
|
||||
// val builder = DataTreeBuilder(Any::class)
|
||||
// model.dependencies.forEach { dep ->
|
||||
// dep.apply(workspace)
|
||||
// }
|
||||
// return builder.build()
|
||||
return DataNode.build(Any::class) {
|
||||
model.dependencies.forEach { dep ->
|
||||
update(dep.apply(workspace))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun run(workspace: Workspace, model: TaskModel): DataNode<R> {
|
||||
@ -32,11 +35,11 @@ class GenericTask<R : Any>(
|
||||
val input = gather(workspace, model)
|
||||
|
||||
//execute
|
||||
workspace.context.logger.info("Starting task '$name' on data node ${input.name} with meta: \n${model.meta}")
|
||||
workspace.context.logger.info("Starting task '$name' on ${model.target} with meta: \n${model.meta}")
|
||||
val output = dataTransform.invoke(model, input)
|
||||
|
||||
//handle result
|
||||
output.handle(model.context.dispatcher) { this.handle(it) }
|
||||
//output.handle(model.context.dispatcher) { this.handle(it) }
|
||||
|
||||
return output
|
||||
}
|
||||
@ -65,13 +68,13 @@ class KTaskBuilder(val name: String) {
|
||||
private class DataTransformation(
|
||||
val from: String = "",
|
||||
val to: String = "",
|
||||
val transform: TaskModel.(DataNode<Any>?) -> DataNode<Any>
|
||||
val transform: TaskModel.(DataNode<Any>) -> DataNode<Any>
|
||||
) {
|
||||
operator fun invoke(model: TaskModel, node: DataNode<Any>): DataNode<Any> {
|
||||
operator fun invoke(model: TaskModel, node: DataNode<Any>): DataNode<Any>? {
|
||||
val localData = if (from.isEmpty()) {
|
||||
node
|
||||
} else {
|
||||
node.getNode(from.toName())
|
||||
node.getNode(from.toName()) ?: return null
|
||||
}
|
||||
return transform.invoke(model, localData);
|
||||
}
|
||||
@ -84,13 +87,13 @@ class KTaskBuilder(val name: String) {
|
||||
}
|
||||
|
||||
fun <T : Any> transform(
|
||||
inputType: Class<T>,
|
||||
inputType: KClass<T>,
|
||||
from: String = "",
|
||||
to: String = "",
|
||||
transform: TaskModel.(DataNode<T>) -> DataNode<Any>
|
||||
) {
|
||||
dataTransforms += DataTransformation(from, to) { data: DataNode<Any> ->
|
||||
transform.invoke(this, data.checked(inputType))
|
||||
transform.invoke(this, data.cast(inputType))
|
||||
}
|
||||
}
|
||||
|
||||
@ -99,7 +102,7 @@ class KTaskBuilder(val name: String) {
|
||||
to: String = "",
|
||||
noinline transform: TaskModel.(DataNode<T>) -> DataNode<Any>
|
||||
) {
|
||||
transform(T::class.java, from, to, transform)
|
||||
transform(T::class, from, to, transform)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -112,7 +115,6 @@ class KTaskBuilder(val name: String) {
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> pipeAction(
|
||||
actionName: String = "pipe",
|
||||
from: String = "",
|
||||
to: String = "",
|
||||
noinline action: PipeBuilder<T, R>.() -> Unit
|
||||
@ -126,7 +128,6 @@ class KTaskBuilder(val name: String) {
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> pipe(
|
||||
actionName: String = "pipe",
|
||||
from: String = "",
|
||||
to: String = "",
|
||||
noinline action: suspend ActionEnv.(T) -> R
|
||||
@ -140,7 +141,6 @@ class KTaskBuilder(val name: String) {
|
||||
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> joinAction(
|
||||
actionName: String = "join",
|
||||
from: String = "",
|
||||
to: String = "",
|
||||
noinline action: JoinGroupBuilder<T, R>.() -> Unit
|
||||
@ -154,23 +154,21 @@ class KTaskBuilder(val name: String) {
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> join(
|
||||
actionName: String = name,
|
||||
from: String = "",
|
||||
to: String = "",
|
||||
noinline action: suspend ActionEnv.(Map<String, T>) -> R
|
||||
noinline action: suspend ActionEnv.(Map<Name, T>) -> R
|
||||
) {
|
||||
val join: Action<T, R> = JoinAction(
|
||||
inputType = T::class,
|
||||
outputType = R::class,
|
||||
action = {
|
||||
result(meta.getString("@target", actionName), action)
|
||||
result(actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonimous", action)
|
||||
}
|
||||
)
|
||||
action(join, from, to);
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> splitAction(
|
||||
actionName: String = "split",
|
||||
from: String = "",
|
||||
to: String = "",
|
||||
noinline action: SplitBuilder<T, R>.() -> Unit
|
||||
@ -194,16 +192,18 @@ class KTaskBuilder(val name: String) {
|
||||
val transform: TaskModel.(DataNode<Any>) -> DataNode<Any> = { data ->
|
||||
if (dataTransforms.isEmpty()) {
|
||||
//return data node as is
|
||||
logger.warn("No transformation present, returning input data")
|
||||
data.checked(Any::class.java)
|
||||
KotlinLogging.logger(this::class.toString()).warn("No transformation present, returning input data")
|
||||
data
|
||||
} else {
|
||||
val builder = DataTreeBuilder(Any::class)
|
||||
dataTransforms.forEach {
|
||||
val res = it(this, data)
|
||||
if (res != null) {
|
||||
if (it.to.isEmpty()) {
|
||||
builder.update(res)
|
||||
} else {
|
||||
builder.putNode(it.to, res)
|
||||
builder[it.to.toName()] = res
|
||||
}
|
||||
}
|
||||
}
|
||||
builder.build()
|
||||
|
Loading…
Reference in New Issue
Block a user