From 4e31bef6312e417885c8f5d9b407fe9377150608 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 20 Mar 2019 22:12:27 +0300 Subject: [PATCH] Migrating action builders --- dataforge-data/build.gradle.kts | 1 + .../kotlin/hep/dataforge/data/Action.kt | 37 ++-- .../kotlin/hep/dataforge/data/Data.kt | 5 +- .../kotlin/hep/dataforge/data/DataFilter.kt | 4 +- .../kotlin/hep/dataforge/data/DataNode.kt | 35 ++-- .../kotlin/hep/dataforge/data/Goal.kt | 68 ++++--- .../kotlin/hep/dataforge/data/GroupBuilder.kt | 70 +++++++ .../kotlin/hep/dataforge/data/JoinAction.kt | 113 +++++++++++ .../kotlin/hep/dataforge/data/PipeAction.kt | 60 ++++++ .../kotlin/hep/dataforge/data/SplitAction.kt | 90 +++++++++ .../hep/dataforge/data/{_Data.kt => data.kt} | 0 .../hep/dataforge/descriptors/Described.kt | 5 + .../kotlin/hep/dataforge/workspace/Task.kt | 8 +- .../hep/dataforge/workspace/Workspace.kt | 2 +- .../kotlin/hep/dataforge/workspace/KTask.kt | 180 ++++++++++++++++++ 15 files changed, 604 insertions(+), 74 deletions(-) create mode 100644 dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt create mode 100644 dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt create mode 100644 dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt create mode 100644 dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt rename dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/{_Data.kt => data.kt} (100%) create mode 100644 dataforge-meta/src/commonMain/kotlin/hep/dataforge/descriptors/Described.kt create mode 100644 dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt diff --git a/dataforge-data/build.gradle.kts b/dataforge-data/build.gradle.kts index eb6b3669..f0598595 100644 --- a/dataforge-data/build.gradle.kts +++ b/dataforge-data/build.gradle.kts @@ -11,6 +11,7 @@ kotlin { val commonMain by getting{ dependencies { api(project(":dataforge-meta")) + api(kotlin("reflect")) api("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:$coroutinesVersion") } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt index c5fcfcfc..d55fac7b 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt @@ -34,28 +34,19 @@ infix fun Action.then(action: Action): A } } -/** - * An action that performs the same transformation on each of input data nodes. Null results are ignored. - */ -class PipeAction(val transform: (Name, Data, Meta) -> Data?) : Action { - override fun invoke(node: DataNode, meta: Meta): DataNode = DataNode.build { - node.dataSequence().forEach { (name, data) -> - val res = transform(name, data, meta) - if (res != null) { - set(name, res) - } - } - } - companion object { - /** - * A simple pipe that performs transformation on the data and copies input meta into the output - */ - inline fun simple(noinline transform: suspend (Name, T, Meta) -> R) = - PipeAction { name, data: Data, meta -> - val goal = data.goal.pipe { transform(name, it, meta) } - return@PipeAction Data.of(goal, data.meta) - } - } -} +///** +// * An action that performs the same transformation on each of input data nodes. Null results are ignored. +// * The transformation is non-suspending because it is lazy. +// */ +//class PipeAction(val transform: (Name, Data, Meta) -> Data?) 
: Action { +// override fun invoke(node: DataNode, meta: Meta): DataNode = DataNode.build { +// node.dataSequence().forEach { (name, data) -> +// val res = transform(name, data, meta) +// if (res != null) { +// set(name, res) +// } +// } +// } +//} diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 6096a799..058916b8 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -2,6 +2,7 @@ package hep.dataforge.data import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaRepr +import kotlinx.coroutines.CoroutineScope import kotlin.coroutines.CoroutineContext import kotlin.reflect.KClass @@ -36,8 +37,8 @@ interface Data : MetaRepr { inline fun of(name: String, goal: Goal, meta: Meta): Data = of(name, T::class, goal, meta) - fun static(context: CoroutineContext, value: T, meta: Meta): Data = - DataImpl(value::class, Goal.static(context, value), meta) + fun static(scope: CoroutineScope, value: T, meta: Meta): Data = + DataImpl(value::class, Goal.static(scope, value), meta) } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt index 6f230c0f..79f1aacc 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataFilter.kt @@ -22,7 +22,7 @@ class DataFilter(override val config: Config) : Specification { fun DataNode.filter(filter: DataFilter): DataNode { val sourceNode = filter.from?.let { getNode(it.toName()) } ?: this@filter val regex = filter.pattern.toRegex() - val targetNode = DataTreeBuilder().apply { + val targetNode = DataTreeBuilder(type).apply { sourceNode.dataSequence().forEach { (name, data) -> if (name.toString().matches(regex)) { this[name] = data @@ -30,7 +30,7 @@ fun DataNode.filter(filter: DataFilter): DataNode { } } return filter.to?.let { - DataTreeBuilder().apply { this[it.toName()] = targetNode }.build() + DataTreeBuilder(type).apply { this[it.toName()] = targetNode }.build() } ?: targetNode.build() } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt index 53063520..dc71c28c 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt @@ -1,15 +1,18 @@ package hep.dataforge.data -import hep.dataforge.names.Name -import hep.dataforge.names.NameToken -import hep.dataforge.names.plus -import hep.dataforge.names.toName -import hep.dataforge.names.asName +import hep.dataforge.names.* +import kotlin.reflect.KClass /** * A tree-like data structure grouped into the node. 
All data inside the node must inherit its type */ interface DataNode { + + /** + * The minimal common ancestor to all data in the node + */ + val type: KClass + /** * Get the specific data if it exists */ @@ -35,7 +38,10 @@ interface DataNode { companion object { const val TYPE = "dataNode" - fun build(block: DataTreeBuilder.() -> Unit) = DataTreeBuilder().apply(block).build() + fun build(type: KClass, block: DataTreeBuilder.() -> Unit) = + DataTreeBuilder(type).apply(block).build() + + fun builder(type: KClass) = DataTreeBuilder(type) } } @@ -45,7 +51,10 @@ internal sealed class DataTreeItem { class Value(val value: Data) : DataTreeItem() } -class DataTree internal constructor(private val items: Map>) : DataNode { +class DataTree internal constructor( + override val type: KClass, + private val items: Map> +) : DataNode { //TODO add node-level meta? override fun get(name: Name): Data? = when (name.length) { @@ -97,7 +106,7 @@ private sealed class DataTreeBuilderItem { /** * A builder for a DataTree. */ -class DataTreeBuilder { +class DataTreeBuilder(private val type: KClass) { private val map = HashMap>() operator fun set(token: NameToken, node: DataTreeBuilder) { @@ -112,7 +121,7 @@ class DataTreeBuilder { private fun buildNode(token: NameToken): DataTreeBuilder { return if (!map.containsKey(token)) { - DataTreeBuilder().also { map[token] = DataTreeBuilderItem.Node(it) } + DataTreeBuilder(type).also { map[token] = DataTreeBuilderItem.Node(it) } } else { (map[token] as? DataTreeBuilderItem.Node ?: error("The node with name $token is occupied by leaf")).tree } @@ -157,7 +166,7 @@ class DataTreeBuilder { /** * Build and append node */ - infix fun String.to(block: DataTreeBuilder.() -> Unit) = set(toName(), DataTreeBuilder().apply(block)) + infix fun String.to(block: DataTreeBuilder.() -> Unit) = set(toName(), DataTreeBuilder(type).apply(block)) fun build(): DataTree { val resMap = map.mapValues { (_, value) -> @@ -166,14 +175,14 @@ class DataTreeBuilder { is DataTreeBuilderItem.Node -> DataTreeItem.Node(value.tree.build()) } } - return DataTree(resMap) + return DataTree(type, resMap) } } /** * Generate a mutable builder from this node. 
Node content is not changed */ -fun DataNode.builder(): DataTreeBuilder = DataTreeBuilder().apply { +fun DataNode.builder(): DataTreeBuilder = DataTreeBuilder(type).apply { dataSequence().forEach { (name, data) -> this[name] = data } } @@ -182,7 +191,7 @@ fun DataNode.builder(): DataTreeBuilder = DataTreeBuilder().a */ fun DataNode<*>.startAll() = dataSequence().forEach { (_, data) -> data.goal.start() } -fun DataNode.filter(predicate: (Name, Data) -> Boolean): DataNode = DataNode.build { +fun DataNode.filter(predicate: (Name, Data) -> Boolean): DataNode = DataNode.build(type) { dataSequence().forEach { (name, data) -> if (predicate(name, data)) { this[name] = data diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt index 1441a40e..738f7dbc 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt @@ -2,26 +2,28 @@ package hep.dataforge.data import kotlinx.coroutines.* import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext /** * A special deferred with explicit dependencies and some additional information like progress and unique id */ interface Goal : Deferred, CoroutineScope { + val scope: CoroutineScope + override val coroutineContext get() = scope.coroutineContext + val dependencies: Collection> - val status: String - - val totalWork: Double - val workDone: Double - + val totalWork: Double get() = dependencies.sumByDouble { it.totalWork } + (monitor?.totalWork ?: 0.0) + val workDone: Double get() = dependencies.sumByDouble { it.workDone } + (monitor?.workDone ?: 0.0) + val status: String get() = monitor?.status ?: "" val progress: Double get() = workDone / totalWork companion object { /** * Create goal wrapping static value. This goal is always completed */ - fun static(context: CoroutineContext, value: T): Goal = - StaticGoalImpl(context, CompletableDeferred(value)) + fun static(scope: CoroutineScope, value: T): Goal = + StaticGoalImpl(scope, CompletableDeferred(value)) } } @@ -52,24 +54,20 @@ class GoalMonitor : CoroutineContext.Element { companion object : CoroutineContext.Key } -private class GoalImpl( - override val dependencies: Collection>, - val monitor: GoalMonitor, - deferred: Deferred -) : Goal, Deferred by deferred { - override val coroutineContext: CoroutineContext get() = this - override val totalWork: Double get() = dependencies.sumByDouble { totalWork } + monitor.totalWork - override val workDone: Double get() = dependencies.sumByDouble { workDone } + monitor.workDone - override val status: String get() = monitor.status -} +val CoroutineScope.monitor: GoalMonitor? 
get() = coroutineContext[GoalMonitor] -private class StaticGoalImpl(val context: CoroutineContext, deferred: CompletableDeferred) : Goal, +private class GoalImpl( + override val scope: CoroutineScope, + override val dependencies: Collection>, + deferred: Deferred +) : Goal, Deferred by deferred + +private class StaticGoalImpl(override val scope: CoroutineScope, deferred: CompletableDeferred) : Goal, Deferred by deferred { override val dependencies: Collection> get() = emptyList() override val status: String get() = "" override val totalWork: Double get() = 0.0 override val workDone: Double get() = 0.0 - override val coroutineContext: CoroutineContext get() = context } @@ -79,23 +77,32 @@ private class StaticGoalImpl(val context: CoroutineContext, deferred: Complet * * **Important:** Unlike regular deferred, the [Goal] is started lazily, so the actual calculation is called only when result is requested. */ -fun CoroutineScope.createGoal(dependencies: Collection>, block: suspend GoalMonitor.() -> R): Goal { - val monitor = GoalMonitor() - val deferred = async(start = CoroutineStart.LAZY) { +fun CoroutineScope.createGoal( + dependencies: Collection>, + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> R +): Goal { + val deferred = async(context + GoalMonitor(), start = CoroutineStart.LAZY) { dependencies.forEach { it.start() } - monitor.start() - return@async supervisorScope { monitor.block() } - }.also { - monitor.finish() + monitor?.start() + //Running in supervisor scope in order to allow manual error handling + return@async supervisorScope { + block().also { + monitor?.finish() + } + } } - return GoalImpl(dependencies, monitor, deferred) + return GoalImpl(this, dependencies, deferred) } /** * Create a one-to-one goal based on existing goal */ -fun Goal.pipe(block: suspend GoalMonitor.(T) -> R): Goal = createGoal(listOf(this)) { block(await()) } +fun Goal.pipe( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.(T) -> R +): Goal = createGoal(listOf(this), context) { block(await()) } /** * Create a joining goal. @@ -103,8 +110,9 @@ fun Goal.pipe(block: suspend GoalMonitor.(T) -> R): Goal = createGo */ fun Collection>.join( scope: CoroutineScope = first(), - block: suspend GoalMonitor.(Collection) -> R + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.(Collection) -> R ): Goal = - scope.createGoal(this) { + scope.createGoal(this, context) { block(map { it.await() }) } \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt new file mode 100644 index 00000000..a47dd4c6 --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/GroupBuilder.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2015 Alexander Nozik. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package hep.dataforge.data + +import hep.dataforge.meta.Meta +import hep.dataforge.meta.get +import hep.dataforge.meta.string +import java.util.* + +interface GroupRule { + operator fun invoke(node: DataNode): Map> +} + +/** + * A class that builds groups of content using annotation-defined rules + * + * @author Alexander Nozik + */ + +object GroupBuilder { + + /** + * Create a grouping rule that creates groups for different values of the + * meta value with name [key] + * + * @param key + * @param defaultTagValue + * @return + */ + fun byValue(key: String, defaultTagValue: String): GroupRule = object : GroupRule { + override fun invoke(node: DataNode): Map> { + val map = HashMap>() + + node.dataSequence().forEach { (name, data) -> + val tagValue = data.meta[key]?.string ?: defaultTagValue + map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data + } + + return map.mapValues { it.value.build() } + } + } + + + // @ValueDef(key = "byValue", required = true, info = "The name of annotation value by which grouping should be made") +// @ValueDef( +// key = "defaultValue", +// def = "default", +// info = "Default value which should be used for content in which the grouping value is not presented" +// ) + fun byMeta(config: Meta): GroupRule { + //TODO expand grouping options + return config["byValue"]?.string?.let { byValue(it, config["defaultValue"]?.string ?: "default") } + ?: object : GroupRule { + override fun invoke(node: DataNode): Map> = mapOf("" to node) + } + } +} diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt new file mode 100644 index 00000000..5d9f2517 --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt @@ -0,0 +1,113 @@ +package hep.dataforge.data + +import hep.dataforge.meta.Laminate +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.names.Name +import java.util.stream.Collectors +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.reflect.KClass +import kotlin.reflect.full.isSuperclassOf + + +class JoinGroup(var name: String, internal val node: DataNode) { + + var meta: MetaBuilder = MetaBuilder() + + lateinit var result: suspend ActionEnv.(Map) -> R + + fun result(f: suspend ActionEnv.(Map) -> R) { + this.result = f; + } + +} + +class JoinGroupBuilder { + private val groupRules: MutableList<(DataNode) -> List>> = ArrayList(); + + /** + * Introduce grouping by the value of the meta field with name [tag] + */ + fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup.() -> Unit) { + groupRules += { node -> + GroupBuilder.byValue(tag, defaultTag).invoke(node).map { + JoinGroup(it.key, it.value).apply(action) + } + } + } + + /** + * Add a single fixed group to grouping rules + */ + fun group(groupName: String, filter: DataFilter, action: JoinGroup.() -> Unit) { + groupRules += { node -> + listOf( + JoinGroup(groupName, node.filter(filter)).apply(action) + ) + } + } + + /** + * Apply transformation to the whole node + */ + fun result(resultName: String, f: suspend ActionEnv.(Map) -> R) { + groupRules += { node -> + listOf(JoinGroup(resultName, node).apply { result(f) }) + } + } + + internal fun buildGroups(input: DataNode): List> { + return groupRules.flatMap { it.invoke(input) } + } + +} + + +/** + * The same rules as for KPipe + */ +class JoinAction( + val inputType: KClass, + val outputType: KClass, + val context: CoroutineContext = EmptyCoroutineContext, + 
private val action: JoinGroupBuilder.() -> Unit +) : Action { + + override fun invoke(node: DataNode, meta: Meta): DataNode { + if (!this.inputType.isSuperclassOf(node.type)) { + error("$inputType expected, but ${node.type} received") + } + return DataNode.build(outputType) { + JoinGroupBuilder().apply(action).buildGroups(node).forEach { group -> + + val laminate = Laminate(group.meta, meta) + + val goalMap: Map> = group.node + .dataStream() + .filter { it.isValid } + .collect(Collectors.toMap({ it.name }, { it.goal })) + + val groupName: String = group.name; + + if (groupName.isEmpty()) { + throw AnonymousNotAlowedException("Anonymous groups are not allowed"); + } + + val env = ActionEnv( + context, + groupName, + laminate.builder, + context.history.getChronicle(Name.joinString(groupName, name)) + ) + + val dispatcher = context + getExecutorService(context, group.meta).asCoroutineDispatcher() + + val goal = goalMap.join(dispatcher) { group.result.invoke(env, it) } + val res = NamedData(env.name, outputType, goal, env.meta) + builder.add(res) + } + + } + } +} diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt new file mode 100644 index 00000000..b52830d3 --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/PipeAction.kt @@ -0,0 +1,60 @@ +package hep.dataforge.data + +import hep.dataforge.meta.* +import hep.dataforge.names.Name +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.reflect.KClass +import kotlin.reflect.full.isSuperclassOf + +class ActionEnv(val name: Name, val meta: Meta) + + +/** + * Action environment + */ +class PipeBuilder(var name: Name, var meta: MetaBuilder) { + lateinit var result: suspend ActionEnv.(T) -> R; + + /** + * Calculate the result of goal + */ + fun result(f: suspend ActionEnv.(T) -> R) { + result = f; + } +} + + +abstract class PipeAction( + val inputType: KClass, + val outputType: KClass, + val context: CoroutineContext = EmptyCoroutineContext, + private val block: PipeBuilder.() -> Unit +) : Action { + + override fun invoke(node: DataNode, meta: Meta): DataNode { + if (!this.inputType.isSuperclassOf(node.type)) { + error("$inputType expected, but ${node.type} received") + } + return DataNode.build(outputType) { + node.dataSequence().forEach { (name, data) -> + //merging data meta with action meta (data meta is primary) + val oldMeta = meta.builder().apply { update(data.meta) } + // creating environment from old meta and name + val env = ActionEnv(name, oldMeta) + //applying transformation from builder + val builder = PipeBuilder(name, oldMeta).apply(block) + //getting new name + val newName = builder.name + //getting new meta + val newMeta = builder.meta.seal() + //creating a goal with custom context if provided + val goal = data.goal.pipe(context) { builder.result(env, it) } + //setting the data node + this[newName] = Data.of(outputType, goal, newMeta) + } + } + } +} + + diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt new file mode 100644 index 00000000..880c065f --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -0,0 +1,90 @@ +package hep.dataforge.data + +import hep.dataforge.meta.Laminate +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.names.Name +import kotlinx.coroutines.runBlocking + + +class 
FragmentEnv(val context: Context, val name: String, var meta: MetaBuilder, val log: Chronicle) { + lateinit var result: suspend (T) -> R + + fun result(f: suspend (T) -> R) { + result = f; + } +} + + +class SplitBuilder(val context: Context, val name: String, val meta: Meta) { + internal val fragments: MutableMap.() -> Unit> = HashMap() + + /** + * Add a new fragment building rule. If the fragment is not defined, the result won't be available even if it is present in the map + * @param name the name of a fragment + * @param rule the rule used to transform the fragment name and meta + */ + fun fragment(name: String, rule: FragmentEnv.() -> Unit) { + fragments[name] = rule + } +} + +class KSplit( + actionName: String, + inputType: Class, + outputType: Class, + private val action: SplitBuilder.() -> Unit +) : GenericAction(actionName, inputType, outputType) { + + override fun run(context: Context, data: DataNode, actionMeta: Meta): DataNode { + if (!this.inputType.isAssignableFrom(data.type)) { + throw RuntimeException("Type mismatch in action $name. $inputType expected, but ${data.type} received") + } + + val builder = DataSet.edit(outputType) + + + runBlocking { + data.dataStream(true).forEach { + + val laminate = Laminate(it.meta, actionMeta) + + val split = SplitBuilder(context, it.name, it.meta).apply(action) + + + val dispatcher = context + getExecutorService(context, laminate).asCoroutineDispatcher() + + // Create a map of results in a single goal + //val commonGoal = it.goal.pipe(dispatcher) { split.result.invoke(env, it) } + + // apply individual fragment rules to result + split.fragments.forEach { name, rule -> + val env = FragmentEnv( + context, + it.name, + laminate.builder, + context.history.getChronicle(Name.joinString(it.name, name)) + ) + + rule.invoke(env) + + val goal = it.goal.pipe(dispatcher, env.result) + + val res = NamedData(env.name, outputType, goal, env.meta) + builder.add(res) + } + } + } + + return builder.build(); + } +} + +inline fun DataNode.pipe( + context: Context, + meta: Meta, + name: String = "pipe", + noinline action: PipeBuilder.() -> Unit +): DataNode { + return KPipe(name, T::class.java, R::class.java, action).run(context, this, meta); +} \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_Data.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/data.kt similarity index 100% rename from dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_Data.kt rename to dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/data.kt diff --git a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/descriptors/Described.kt b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/descriptors/Described.kt new file mode 100644 index 00000000..a0780a3e --- /dev/null +++ b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/descriptors/Described.kt @@ -0,0 +1,5 @@ +package hep.dataforge.descriptors + +interface Described { + val descriptor: NodeDescriptor +} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt index bb6d33f3..0b11a7d5 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt @@ -2,13 +2,14 @@ package hep.dataforge.workspace import hep.dataforge.context.Named import hep.dataforge.data.DataNode +import hep.dataforge.descriptors.Described import hep.dataforge.meta.Meta import 
hep.dataforge.provider.Type import hep.dataforge.workspace.Task.Companion.TYPE import kotlin.reflect.KClass @Type(TYPE) -interface Task : Named { +interface Task : Named, Described { /** * Terminal task is the one that could not build model lazily */ @@ -41,10 +42,11 @@ interface Task : Named { * Run given task model. Type check expected to be performed before actual * calculation. * - * @param model + * @param workspace - a workspace to run task model in + * @param model - a model to be executed * @return */ - fun run(model: TaskModel): DataNode + fun run(workspace: Workspace, model: TaskModel): DataNode companion object { const val TYPE = "task" diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt index b0b1cf4d..40c45aaf 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt @@ -54,7 +54,7 @@ interface Workspace : ContextAware, Provider { try { val model = build(this@Workspace, config) validate(model) - return run(model) + return run(this@Workspace, model) } finally { context.deactivate(this) } diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt new file mode 100644 index 00000000..7717e9ec --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/KTask.kt @@ -0,0 +1,180 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.Action +import hep.dataforge.data.DataNode +import hep.dataforge.data.DataTree +import hep.dataforge.descriptors.NodeDescriptor +import hep.dataforge.meta.Meta +import hep.dataforge.names.Name +import kotlin.reflect.KClass + +class KTask( + override val name: String, + type: KClass, + descriptor: NodeDescriptor? = null, + private val modelTransform: TaskModel.Builder.(Meta) -> Unit, + private val dataTransform: TaskModel.(DataNode) -> DataNode +) : AbstractTask(type.java, descriptor) { + + override fun run(model: TaskModel, data: DataNode): DataNode { + model.context.logger.info("Starting task '$name' on data node ${data.name} with meta: \n${model.meta}") + return dataTransform.invoke(model, data); + } + + override fun buildModel(model: TaskModel.Builder, meta: Meta) { + modelTransform.invoke(model, meta); + } + + //TODO add validation +} + +class KTaskBuilder(val name: String) { + private var modelTransform: TaskModel.Builder.(Meta) -> Unit = { data("*") } + var descriptor: NodeDescriptor? 
= null + + private class DataTransformation( + val from: String = "", + val to: String = "", + val transform: TaskModel.(DataNode) -> DataNode + ) { + operator fun invoke(model: TaskModel, node: DataNode): DataNode { + val localData = if (from.isEmpty()) { + node + } else { + node.getNode(from) + } + return transform.invoke(model, localData); + } + } + + private val dataTransforms: MutableList = ArrayList(); + + fun model(modelTransform: TaskModel.Builder.(Meta) -> Unit) { + this.modelTransform = modelTransform + } + + fun transform(inputType: Class, from: String = "", to: String = "", transform: TaskModel.(DataNode) -> DataNode) { + dataTransforms += DataTransformation(from, to) { data: DataNode -> + transform.invoke(this, data.checked(inputType)) + } + } + + inline fun transform(from: String = "", to: String = "", noinline transform: TaskModel.(DataNode) -> DataNode) { + transform(T::class.java, from, to, transform) + } + + /** + * Perform given action on data elements in `from` node in input and put the result to `to` node + */ + inline fun action(action: Action, from: String = "", to: String = "") { + transform(from, to){ data: DataNode -> + action.run(context, data, meta) + } + } + + inline fun pipeAction( + actionName: String = "pipe", + from: String = "", + to: String = "", + noinline action: PipeBuilder.() -> Unit) { + val pipe: Action = KPipe( + actionName = Name.joinString(name, actionName), + inputType = T::class.java, + outputType = R::class.java, + action = action + ) + action(pipe, from, to); + } + + inline fun pipe( + actionName: String = "pipe", + from: String = "", + to: String = "", + noinline action: suspend ActionEnv.(T) -> R) { + val pipe: Action = KPipe( + actionName = Name.joinString(name, actionName), + inputType = T::class.java, + outputType = R::class.java, + action = { result(action) } + ) + action(pipe, from, to); + } + + + inline fun joinAction( + actionName: String = "join", + from: String = "", + to: String = "", + noinline action: JoinGroupBuilder.() -> Unit) { + val join: Action = KJoin( + actionName = Name.joinString(name, actionName), + inputType = T::class.java, + outputType = R::class.java, + action = action + ) + action(join, from, to); + } + + inline fun join( + actionName: String = name, + from: String = "", + to: String = "", + noinline action: suspend ActionEnv.(Map) -> R) { + val join: Action = KJoin( + actionName = Name.joinString(name, actionName), + inputType = T::class.java, + outputType = R::class.java, + action = { + result(meta.getString("@target", actionName), action) + } + ) + action(join, from, to); + } + + inline fun splitAction( + actionName: String = "split", + from: String = "", + to: String = "", + noinline action: SplitBuilder.() -> Unit) { + val split: Action = KSplit( + actionName = Name.joinString(name, actionName), + inputType = T::class.java, + outputType = R::class.java, + action = action + ) + action(split, from, to); + } + + /** + * Use DSL to create a descriptor for this task + */ + fun descriptor(transform: DescriptorBuilder.() -> Unit) { + this.descriptor = DescriptorBuilder(name).apply(transform).build() + } + + fun build(): KTask { + val transform: TaskModel.(DataNode) -> DataNode = { data -> + if (dataTransforms.isEmpty()) { + //return data node as is + logger.warn("No transformation present, returning input data") + data.checked(Any::class.java) + } else { + val builder: DataNodeEditor = DataTree.edit(Any::class.java) + dataTransforms.forEach { + val res = it(this, data) + if (it.to.isEmpty()) { + builder.update(res) 
+ } else { + builder.putNode(it.to, res) + } + } + builder.build() + } + } + return KTask(name, Any::class, descriptor, modelTransform, transform); + } +} + +fun task(name: String, builder: KTaskBuilder.() -> Unit): KTask { + return KTaskBuilder(name).apply(builder).build(); +} \ No newline at end of file
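
Editor's note (not part of the patch): the sketch below illustrates how the reworked Goal combinators and the typed DataNode builder introduced in this commit are intended to compose. It is a minimal, hypothetical Kotlin example, not code from the repository, and is not guaranteed to compile against this in-progress commit: the generic parameters (stripped from the patch text above) are written out as assumed, EmptyMeta is assumed to be the empty Meta instance from dataforge-meta, and GlobalScope together with all value names are illustrative only.

import hep.dataforge.data.*
import hep.dataforge.meta.EmptyMeta // assumed empty-Meta object from dataforge-meta
import hep.dataforge.names.toName
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.runBlocking

fun main() {
    // Wrap constants into already-completed goals (Goal.static from Goal.kt above)
    val a = Goal.static(GlobalScope, 3)
    val b = Goal.static(GlobalScope, 4)

    // One-to-one transformations; goals are lazy, nothing runs until a result is requested
    val aSquared = a.pipe { it * it }
    val bSquared = b.pipe { it * it }

    // A joining goal that depends on both inputs and sums their results
    val sum = listOf(aSquared, bSquared).join { values -> values.sum() }

    // Awaiting starts the lazy computation chain
    runBlocking { println(sum.await()) } // prints 25

    // DataNode builders now carry an explicit element type (the new `type` parameter)
    val node = DataNode.build(Int::class) {
        this["a".toName()] = Data.static(GlobalScope, 1, EmptyMeta)
        this["b".toName()] = Data.static(GlobalScope, 2, EmptyMeta)
    }
    println(node.dataSequence().count()) // prints 2
}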