diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt index d55fac7b..228522dc 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt @@ -41,7 +41,7 @@ infix fun Action.then(action: Action): A // */ //class PipeAction(val transform: (Name, Data, Meta) -> Data?) : Action { // override fun invoke(node: DataNode, meta: Meta): DataNode = DataNode.build { -// node.dataSequence().forEach { (name, data) -> +// node.data().forEach { (name, data) -> // val res = transform(name, data, meta) // if (res != null) { // set(name, res) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 058916b8..20957824 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -3,7 +3,6 @@ package hep.dataforge.data import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaRepr import kotlinx.coroutines.CoroutineScope -import kotlin.coroutines.CoroutineContext import kotlin.reflect.KClass /** @@ -30,7 +29,9 @@ interface Data : MetaRepr { const val TYPE = "data" fun of(type: KClass, goal: Goal, meta: Meta): Data = DataImpl(type, goal, meta) + inline fun of(goal: Goal, meta: Meta): Data = of(T::class, goal, meta) + fun of(name: String, type: KClass, goal: Goal, meta: Meta): Data = NamedData(name, of(type, goal, meta)) @@ -42,7 +43,18 @@ interface Data : MetaRepr { } } -suspend fun Data.await(): T = goal.await() +/** + * Upcast a [Data] to a supertype + */ +inline fun Data.cast(): Data { + return Data.of(R::class, goal, meta) +} + +fun Data.cast(type: KClass): Data { + return Data.of(type, goal, meta) +} + +suspend fun Data.await(): T = goal.await() /** * Generic Data implementation diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt index 79f1aacc..b5a40513 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt @@ -23,7 +23,7 @@ fun DataNode.filter(filter: DataFilter): DataNode { val sourceNode = filter.from?.let { getNode(it.toName()) } ?: this@filter val regex = filter.pattern.toRegex() val targetNode = DataTreeBuilder(type).apply { - sourceNode.dataSequence().forEach { (name, data) -> + sourceNode.data().forEach { (name, data) -> if (name.toString().matches(regex)) { this[name] = data } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt index dc71c28c..3d4b2638 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt @@ -26,14 +26,14 @@ interface DataNode { /** * Walk the tree upside down and provide all data nodes with full names */ - fun dataSequence(): Sequence>> + fun data(): Sequence>> /** * A sequence of all nodes in the tree walking upside down, excluding self */ - fun nodeSequence(): Sequence>> + fun nodes(): Sequence>> - operator fun iterator(): Iterator>> = dataSequence().iterator() + operator fun iterator(): Iterator>> = data().iterator() companion object { const val TYPE = "dataNode" @@ -43,7 +43,6 @@ interface DataNode { fun builder(type: KClass) = DataTreeBuilder(type) } - } internal sealed class DataTreeItem { @@ -69,14 +68,14 @@ class DataTree internal constructor( else -> getNode(name.first()!!.asName())?.getNode(name.cutFirst()) } - override fun dataSequence(): Sequence>> { + override fun data(): Sequence>> { return sequence { items.forEach { (head, tree) -> when (tree) { is DataTreeItem.Value -> yield(head.asName() to tree.value) is DataTreeItem.Node -> { val subSequence = - tree.tree.dataSequence().map { (name, data) -> (head.asName() + name) to data } + tree.tree.data().map { (name, data) -> (head.asName() + name) to data } yieldAll(subSequence) } } @@ -84,13 +83,13 @@ class DataTree internal constructor( } } - override fun nodeSequence(): Sequence>> { + override fun nodes(): Sequence>> { return sequence { items.forEach { (head, tree) -> if (tree is DataTreeItem.Node) { yield(head.asName() to tree.tree) val subSequence = - tree.tree.nodeSequence().map { (name, node) -> (head.asName() + name) to node } + tree.tree.nodes().map { (name, node) -> (head.asName() + name) to node } yieldAll(subSequence) } } @@ -183,16 +182,16 @@ class DataTreeBuilder(private val type: KClass) { * Generate a mutable builder from this node. Node content is not changed */ fun DataNode.builder(): DataTreeBuilder = DataTreeBuilder(type).apply { - dataSequence().forEach { (name, data) -> this[name] = data } + data().forEach { (name, data) -> this[name] = data } } /** * Start computation for all goals in data node */ -fun DataNode<*>.startAll() = dataSequence().forEach { (_, data) -> data.goal.start() } +fun DataNode<*>.startAll() = data().forEach { (_, data) -> data.goal.start() } fun DataNode.filter(predicate: (Name, Data) -> Boolean): DataNode = DataNode.build(type) { - dataSequence().forEach { (name, data) -> + data().forEach { (name, data) -> if (predicate(name, data)) { this[name] = data } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt index 738f7dbc..991ddbdb 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt @@ -112,7 +112,20 @@ fun Collection>.join( scope: CoroutineScope = first(), context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(Collection) -> R -): Goal = - scope.createGoal(this, context) { - block(map { it.await() }) - } \ No newline at end of file +): Goal = scope.createGoal(this, context) { + block(map { it.await() }) +} + +/** + * A joining goal for a map + * @param K type of the map key + * @param T type of the input goal + * @param R type of the result goal + */ +fun Map>.join( + scope: CoroutineScope = values.first(), + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.(Map) -> R +): Goal = scope.createGoal(this.values, context) { + block(mapValues { it.value.await() }) +} \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CheckedDataNode.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CheckedDataNode.kt new file mode 100644 index 00000000..980bb474 --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CheckedDataNode.kt @@ -0,0 +1,46 @@ +package hep.dataforge.data + +import hep.dataforge.names.Name +import kotlin.reflect.KClass +import kotlin.reflect.full.isSubclassOf + +fun Data.safeCast(type: KClass): Data? { + return if (type.isSubclassOf(type)) { + @Suppress("UNCHECKED_CAST") + Data.of(type, goal as Goal, meta) + } else { + null + } +} + +/** + * Filter a node by data and node type. Resulting node and its subnodes is guaranteed to have border type [type], + * but could contain empty nodes + */ +fun DataNode.cast(type: KClass): DataNode { + return if (this is CheckedDataNode) { + origin.cast(type) + } else { + CheckedDataNode(this, type) + } +} + +inline fun DataNode.cast(): DataNode = cast(R::class) + +class CheckedDataNode(val origin: DataNode, override val type: KClass) : DataNode { + + override fun get(name: Name): Data? = + origin[name]?.safeCast(type) + + override fun getNode(name: Name): DataNode? { + return origin.getNode(name)?.cast(type) + } + + override fun data(): Sequence>> = + origin.data().mapNotNull { pair -> + pair.second.safeCast(type)?.let { pair.first to it } + } + + override fun nodes(): Sequence>> = + origin.nodes().map { it.first to it.second.cast(type) } +} \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt index a47dd4c6..68e9e092 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt @@ -44,7 +44,7 @@ object GroupBuilder { override fun invoke(node: DataNode): Map> { val map = HashMap>() - node.dataSequence().forEach { (name, data) -> + node.data().forEach { (name, data) -> val tagValue = data.meta[key]?.string ?: defaultTagValue map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data } diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt index 3d200a14..d8702735 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt @@ -6,7 +6,6 @@ import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.builder import hep.dataforge.names.Name import hep.dataforge.names.toName -import java.util.stream.Collectors import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass @@ -17,9 +16,9 @@ class JoinGroup(var name: String, internal val node: DataNode< var meta: MetaBuilder = MetaBuilder() - lateinit var result: suspend ActionEnv.(Map) -> R + lateinit var result: suspend ActionEnv.(Map) -> R - fun result(f: suspend ActionEnv.(Map) -> R) { + fun result(f: suspend ActionEnv.(Map) -> R) { this.result = f; } @@ -53,7 +52,7 @@ class JoinGroupBuilder { /** * Apply transformation to the whole node */ - fun result(resultName: String, f: suspend ActionEnv.(Map) -> R) { + fun result(resultName: String, f: suspend ActionEnv.(Map) -> R) { groupRules += { node -> listOf(JoinGroup(resultName, node).apply { result(f) }) } @@ -85,21 +84,19 @@ class JoinAction( val laminate = Laminate(group.meta, meta) - val goalMap: Map> = group.node - .dataSequence() + val goalMap: Map> = group.node + .data() .associate { it.first to it.second.goal } val groupName: String = group.name; - if (groupName.isEmpty()) { - throw AnonymousNotAlowedException("Anonymous groups are not allowed"); - } - val env = ActionEnv(groupName.toName(), laminate.builder()) - val goal = goalMap.join(dispatcher) { group.result.invoke(env, it) } - val res = NamedData(env.name, outputType, goal, env.meta) - builder.add(res) + val goal = goalMap.join(context = context) { group.result.invoke(env, it) } + + val res = Data.of(outputType, goal, env.meta) + + set(env.name, res) } } diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt index b52830d3..933c546d 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt @@ -25,7 +25,7 @@ class PipeBuilder(var name: Name, var meta: MetaBuilder) { } -abstract class PipeAction( +class PipeAction( val inputType: KClass, val outputType: KClass, val context: CoroutineContext = EmptyCoroutineContext, @@ -37,7 +37,7 @@ abstract class PipeAction( error("$inputType expected, but ${node.type} received") } return DataNode.build(outputType) { - node.dataSequence().forEach { (name, data) -> + node.data().forEach { (name, data) -> //merging data meta with action meta (data meta is primary) val oldMeta = meta.builder().apply { update(data.meta) } // creating environment from old meta and name @@ -57,4 +57,11 @@ abstract class PipeAction( } } +inline fun DataNode.pipe( + meta: Meta, + context: CoroutineContext = EmptyCoroutineContext, + noinline action: PipeBuilder.() -> Unit +): DataNode = PipeAction(T::class, R::class, context, action).invoke(this, meta) + + diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt index 880c065f..a9029590 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -3,11 +3,17 @@ package hep.dataforge.data import hep.dataforge.meta.Laminate import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaBuilder +import hep.dataforge.meta.builder import hep.dataforge.names.Name -import kotlinx.coroutines.runBlocking +import hep.dataforge.names.toName +import kotlin.collections.set +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.reflect.KClass +import kotlin.reflect.full.isSuperclassOf -class FragmentEnv(val context: Context, val name: String, var meta: MetaBuilder, val log: Chronicle) { +class FragmentRule(val name: Name, var meta: MetaBuilder) { lateinit var result: suspend (T) -> R fun result(f: suspend (T) -> R) { @@ -16,75 +22,51 @@ class FragmentEnv(val context: Context, val name: String, var } -class SplitBuilder(val context: Context, val name: String, val meta: Meta) { - internal val fragments: MutableMap.() -> Unit> = HashMap() +class SplitBuilder(val name: Name, val meta: Meta) { + internal val fragments: MutableMap.() -> Unit> = HashMap() /** * Add new fragment building rule. If the framgent not defined, result won't be available even if it is present in the map * @param name the name of a fragment * @param rule the rule to transform fragment name and meta using */ - fun fragment(name: String, rule: FragmentEnv.() -> Unit) { - fragments[name] = rule + fun fragment(name: String, rule: FragmentRule.() -> Unit) { + fragments[name.toName()] = rule } } -class KSplit( - actionName: String, - inputType: Class, - outputType: Class, +class SplitAction( + val inputType: KClass, + val outputType: KClass, + val context: CoroutineContext = EmptyCoroutineContext, private val action: SplitBuilder.() -> Unit -) : GenericAction(actionName, inputType, outputType) { +) : Action { - override fun run(context: Context, data: DataNode, actionMeta: Meta): DataNode { - if (!this.inputType.isAssignableFrom(data.type)) { - throw RuntimeException("Type mismatch in action $name. $inputType expected, but ${data.type} received") + override fun invoke(node: DataNode, meta: Meta): DataNode { + if (!this.inputType.isSuperclassOf(node.type)) { + error("$inputType expected, but ${node.type} received") } - val builder = DataSet.edit(outputType) + return DataNode.build(outputType) { + node.data().forEach { (name, data) -> + val laminate = Laminate(data.meta, meta) - runBlocking { - data.dataStream(true).forEach { + val split = SplitBuilder(name, data.meta).apply(action) - val laminate = Laminate(it.meta, actionMeta) - - val split = SplitBuilder(context, it.name, it.meta).apply(action) - - - val dispatcher = context + getExecutorService(context, laminate).asCoroutineDispatcher() - - // Create a map of results in a single goal - //val commonGoal = it.goal.pipe(dispatcher) { split.result.invoke(env, it) } // apply individual fragment rules to result - split.fragments.forEach { name, rule -> - val env = FragmentEnv( - context, - it.name, - laminate.builder, - context.history.getChronicle(Name.joinString(it.name, name)) - ) + split.fragments.forEach { fragmentName, rule -> + val env = FragmentRule(fragmentName, laminate.builder()) - rule.invoke(env) + rule(env) - val goal = it.goal.pipe(dispatcher, env.result) + val goal = data.goal.pipe(context = context) { env.result(it) } - val res = NamedData(env.name, outputType, goal, env.meta) - builder.add(res) + val res = Data.of(outputType, goal, env.meta) + set(env.name, res) } } } - - return builder.build(); } -} - -inline fun DataNode.pipe( - context: Context, - meta: Meta, - name: String = "pipe", - noinline action: PipeBuilder.() -> Unit -): DataNode { - return KPipe(name, T::class.java, R::class.java, action).run(context, this, meta); } \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/data.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_Data.kt similarity index 100% rename from dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/data.kt rename to dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_Data.kt diff --git a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/Specification.kt b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/Specification.kt index a578b119..e1ed91fe 100644 --- a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/Specification.kt +++ b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/Specification.kt @@ -22,13 +22,14 @@ interface SpecificationCompanion { fun build(action: T.() -> Unit) = update(Config(), action) + fun empty() = build { } + /** * Wrap generic configuration producing instance of desired type */ fun wrap(config: Config): T fun wrap(meta: Meta): T = wrap(meta.toConfig()) - } fun specification(wrapper: (Config) -> T): SpecificationCompanion = diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt index 1f0f64f4..7d87cc58 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt @@ -37,7 +37,7 @@ class TaskModelDependency(val name: String, val meta: Meta, val placement: Name return if (placement.isEmpty()) { result } else { - DataTreeBuilder().apply { this[placement] = result }.build() + DataTreeBuilder(Any::class).apply { this[placement] = result }.build() } } diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt index 8d0de0c0..1f7771c6 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskModel.kt @@ -45,8 +45,8 @@ data class TaskModel( * Build input for the task */ fun TaskModel.buildInput(workspace: Workspace): DataTree { - return DataTreeBuilder().apply { - dependencies.asSequence().flatMap { it.apply(workspace).dataSequence() }.forEach { (name, data) -> + return DataTreeBuilder(Any::class).apply { + dependencies.asSequence().flatMap { it.apply(workspace).data() }.forEach { (name, data) -> //TODO add concise error on replacement this[name] = data } diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt index 40c45aaf..e9a2914d 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt @@ -43,8 +43,8 @@ interface Workspace : ContextAware, Provider { return when (target) { "target", Meta.TYPE -> targets.keys.asSequence().map { it.toName() } Task.TYPE -> tasks.keys.asSequence().map { it.toName() } - Data.TYPE -> data.dataSequence().map { it.first } - DataNode.TYPE -> data.nodeSequence().map { it.first } + Data.TYPE -> data.data().map { it.first } + DataNode.TYPE -> data.nodes().map { it.first } else -> emptySequence() } } diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt index 9aebdcd0..9f8eb08f 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt @@ -11,7 +11,7 @@ import hep.dataforge.meta.buildMeta * A builder for a workspace */ class WorkspaceBuilder(var context: Context) { - val data = DataTreeBuilder() + val data = DataTreeBuilder(Any::class) val targets = HashMap() val tasks = HashSet>() diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/GenericTask.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/GenericTask.kt new file mode 100644 index 00000000..b2d5967a --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/GenericTask.kt @@ -0,0 +1,218 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.* +import hep.dataforge.descriptors.NodeDescriptor +import hep.dataforge.meta.Meta +import hep.dataforge.meta.get +import hep.dataforge.meta.node +import hep.dataforge.names.toName +import kotlin.reflect.KClass + +class GenericTask( + override val name: String, + override val type: KClass, + override val descriptor: NodeDescriptor, + private val modelTransform: TaskModelBuilder.(Meta) -> Unit, + private val dataTransform: TaskModel.(DataNode) -> DataNode +) : Task { + + private fun gather(workspace: Workspace, model: TaskModel): DataNode { +// val builder = DataTreeBuilder(Any::class) +// model.dependencies.forEach { dep -> +// dep.apply(workspace) +// } +// return builder.build() + } + + override fun run(workspace: Workspace, model: TaskModel): DataNode { + //validate model + validate(model) + + // gather data + val input = gather(workspace, model) + + //execute + workspace.context.logger.info("Starting task '$name' on data node ${input.name} with meta: \n${model.meta}") + val output = dataTransform.invoke(model, input) + + //handle result + output.handle(model.context.dispatcher) { this.handle(it) } + + return output + } + + /** + * Build new TaskModel and apply specific model transformation for this + * task. By default model uses the meta node with the same node as the name of the task. + * + * @param workspace + * @param taskConfig + * @return + */ + override fun build(workspace: Workspace, taskConfig: Meta): TaskModel { + val taskMeta = taskConfig[name]?.node ?: taskConfig + val builder = TaskModelBuilder(name, taskMeta) + modelTransform.invoke(builder, taskMeta) + return builder.build() + } + //TODO add validation +} + +class KTaskBuilder(val name: String) { + private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { data("*") } + var descriptor: NodeDescriptor? = null + + private class DataTransformation( + val from: String = "", + val to: String = "", + val transform: TaskModel.(DataNode?) -> DataNode + ) { + operator fun invoke(model: TaskModel, node: DataNode): DataNode { + val localData = if (from.isEmpty()) { + node + } else { + node.getNode(from.toName()) + } + return transform.invoke(model, localData); + } + } + + private val dataTransforms: MutableList = ArrayList(); + + fun model(modelTransform: TaskModelBuilder.(Meta) -> Unit) { + this.modelTransform = modelTransform + } + + fun transform( + inputType: Class, + from: String = "", + to: String = "", + transform: TaskModel.(DataNode) -> DataNode + ) { + dataTransforms += DataTransformation(from, to) { data: DataNode -> + transform.invoke(this, data.checked(inputType)) + } + } + + inline fun transform( + from: String = "", + to: String = "", + noinline transform: TaskModel.(DataNode) -> DataNode + ) { + transform(T::class.java, from, to, transform) + } + + /** + * Perform given action on data elements in `from` node in input and put the result to `to` node + */ + inline fun action(action: Action, from: String = "", to: String = "") { + transform(from, to) { data: DataNode -> + action(data, meta) + } + } + + inline fun pipeAction( + actionName: String = "pipe", + from: String = "", + to: String = "", + noinline action: PipeBuilder.() -> Unit + ) { + val pipe: Action = PipeAction( + inputType = T::class, + outputType = R::class, + block = action + ) + action(pipe, from, to); + } + + inline fun pipe( + actionName: String = "pipe", + from: String = "", + to: String = "", + noinline action: suspend ActionEnv.(T) -> R + ) { + val pipe: Action = PipeAction( + inputType = T::class, + outputType = R::class + ) { result(action) } + action(pipe, from, to); + } + + + inline fun joinAction( + actionName: String = "join", + from: String = "", + to: String = "", + noinline action: JoinGroupBuilder.() -> Unit + ) { + val join: Action = JoinAction( + inputType = T::class, + outputType = R::class, + action = action + ) + action(join, from, to); + } + + inline fun join( + actionName: String = name, + from: String = "", + to: String = "", + noinline action: suspend ActionEnv.(Map) -> R + ) { + val join: Action = JoinAction( + inputType = T::class, + outputType = R::class, + action = { + result(meta.getString("@target", actionName), action) + } + ) + action(join, from, to); + } + + inline fun splitAction( + actionName: String = "split", + from: String = "", + to: String = "", + noinline action: SplitBuilder.() -> Unit + ) { + val split: Action = SplitAction( + inputType = T::class, + outputType = R::class, + action = action + ) + action(split, from, to); + } + + /** + * Use DSL to create a descriptor for this task + */ + fun descriptor(transform: NodeDescriptor.() -> Unit) { + this.descriptor = NodeDescriptor.build(transform) + } + + fun build(): GenericTask { + val transform: TaskModel.(DataNode) -> DataNode = { data -> + if (dataTransforms.isEmpty()) { + //return data node as is + logger.warn("No transformation present, returning input data") + data.checked(Any::class.java) + } else { + val builder = DataTreeBuilder(Any::class) + dataTransforms.forEach { + val res = it(this, data) + if (it.to.isEmpty()) { + builder.update(res) + } else { + builder.putNode(it.to, res) + } + } + builder.build() + } + } + return GenericTask(name, Any::class, descriptor ?: NodeDescriptor.empty(), modelTransform, transform); + } +} + +fun task(name: String, builder: KTaskBuilder.() -> Unit): GenericTask { + return KTaskBuilder(name).apply(builder).build(); +} \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt deleted file mode 100644 index 7717e9ec..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt +++ /dev/null @@ -1,180 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.data.Action -import hep.dataforge.data.DataNode -import hep.dataforge.data.DataTree -import hep.dataforge.descriptors.NodeDescriptor -import hep.dataforge.meta.Meta -import hep.dataforge.names.Name -import kotlin.reflect.KClass - -class KTask( - override val name: String, - type: KClass, - descriptor: NodeDescriptor? = null, - private val modelTransform: TaskModel.Builder.(Meta) -> Unit, - private val dataTransform: TaskModel.(DataNode) -> DataNode -) : AbstractTask(type.java, descriptor) { - - override fun run(model: TaskModel, data: DataNode): DataNode { - model.context.logger.info("Starting task '$name' on data node ${data.name} with meta: \n${model.meta}") - return dataTransform.invoke(model, data); - } - - override fun buildModel(model: TaskModel.Builder, meta: Meta) { - modelTransform.invoke(model, meta); - } - - //TODO add validation -} - -class KTaskBuilder(val name: String) { - private var modelTransform: TaskModel.Builder.(Meta) -> Unit = { data("*") } - var descriptor: NodeDescriptor? = null - - private class DataTransformation( - val from: String = "", - val to: String = "", - val transform: TaskModel.(DataNode) -> DataNode - ) { - operator fun invoke(model: TaskModel, node: DataNode): DataNode { - val localData = if (from.isEmpty()) { - node - } else { - node.getNode(from) - } - return transform.invoke(model, localData); - } - } - - private val dataTransforms: MutableList = ArrayList(); - - fun model(modelTransform: TaskModel.Builder.(Meta) -> Unit) { - this.modelTransform = modelTransform - } - - fun transform(inputType: Class, from: String = "", to: String = "", transform: TaskModel.(DataNode) -> DataNode) { - dataTransforms += DataTransformation(from, to) { data: DataNode -> - transform.invoke(this, data.checked(inputType)) - } - } - - inline fun transform(from: String = "", to: String = "", noinline transform: TaskModel.(DataNode) -> DataNode) { - transform(T::class.java, from, to, transform) - } - - /** - * Perform given action on data elements in `from` node in input and put the result to `to` node - */ - inline fun action(action: Action, from: String = "", to: String = "") { - transform(from, to){ data: DataNode -> - action.run(context, data, meta) - } - } - - inline fun pipeAction( - actionName: String = "pipe", - from: String = "", - to: String = "", - noinline action: PipeBuilder.() -> Unit) { - val pipe: Action = KPipe( - actionName = Name.joinString(name, actionName), - inputType = T::class.java, - outputType = R::class.java, - action = action - ) - action(pipe, from, to); - } - - inline fun pipe( - actionName: String = "pipe", - from: String = "", - to: String = "", - noinline action: suspend ActionEnv.(T) -> R) { - val pipe: Action = KPipe( - actionName = Name.joinString(name, actionName), - inputType = T::class.java, - outputType = R::class.java, - action = { result(action) } - ) - action(pipe, from, to); - } - - - inline fun joinAction( - actionName: String = "join", - from: String = "", - to: String = "", - noinline action: JoinGroupBuilder.() -> Unit) { - val join: Action = KJoin( - actionName = Name.joinString(name, actionName), - inputType = T::class.java, - outputType = R::class.java, - action = action - ) - action(join, from, to); - } - - inline fun join( - actionName: String = name, - from: String = "", - to: String = "", - noinline action: suspend ActionEnv.(Map) -> R) { - val join: Action = KJoin( - actionName = Name.joinString(name, actionName), - inputType = T::class.java, - outputType = R::class.java, - action = { - result(meta.getString("@target", actionName), action) - } - ) - action(join, from, to); - } - - inline fun splitAction( - actionName: String = "split", - from: String = "", - to: String = "", - noinline action: SplitBuilder.() -> Unit) { - val split: Action = KSplit( - actionName = Name.joinString(name, actionName), - inputType = T::class.java, - outputType = R::class.java, - action = action - ) - action(split, from, to); - } - - /** - * Use DSL to create a descriptor for this task - */ - fun descriptor(transform: DescriptorBuilder.() -> Unit) { - this.descriptor = DescriptorBuilder(name).apply(transform).build() - } - - fun build(): KTask { - val transform: TaskModel.(DataNode) -> DataNode = { data -> - if (dataTransforms.isEmpty()) { - //return data node as is - logger.warn("No transformation present, returning input data") - data.checked(Any::class.java) - } else { - val builder: DataNodeEditor = DataTree.edit(Any::class.java) - dataTransforms.forEach { - val res = it(this, data) - if (it.to.isEmpty()) { - builder.update(res) - } else { - builder.putNode(it.to, res) - } - } - builder.build() - } - } - return KTask(name, Any::class, descriptor, modelTransform, transform); - } -} - -fun task(name: String, builder: KTaskBuilder.() -> Unit): KTask { - return KTaskBuilder(name).apply(builder).build(); -} \ No newline at end of file