Migrating action builders

Alexander Nozik 2019-03-20 22:12:27 +03:00
parent 00c9e70c09
commit 4e31bef631
15 changed files with 604 additions and 74 deletions

View File

@ -11,6 +11,7 @@ kotlin {
val commonMain by getting{
dependencies {
api(project(":dataforge-meta"))
api(kotlin("reflect"))
api("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:$coroutinesVersion")
}
}

View File

@ -34,28 +34,19 @@ infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): A
}
}
/**
* An action that performs the same transformation on each of the input data nodes. Null results are ignored.
*/
class PipeAction<in T : Any, out R : Any>(val transform: (Name, Data<T>, Meta) -> Data<R>?) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode.build {
node.dataSequence().forEach { (name, data) ->
val res = transform(name, data, meta)
if (res != null) {
set(name, res)
}
}
}
companion object {
/**
* A simple pipe that performs a transformation on the data and copies the input meta into the output
*/
inline fun <T : Any, reified R : Any> simple(noinline transform: suspend (Name, T, Meta) -> R) =
PipeAction { name, data: Data<T>, meta ->
val goal = data.goal.pipe { transform(name, it, meta) }
return@PipeAction Data.of(goal, data.meta)
}
}
}
///**
// * An action that performs the same transformation on each of the input data nodes. Null results are ignored.
// * The transformation is non-suspending because it is lazy.
// */
//class PipeAction<in T : Any, out R : Any>(val transform: (Name, Data<T>, Meta) -> Data<R>?) : Action<T, R> {
// override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode.build {
// node.dataSequence().forEach { (name, data) ->
// val res = transform(name, data, meta)
// if (res != null) {
// set(name, res)
// }
// }
// }
//}
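For orientation, the composition entry point kept in this file is the `then` combinator above. A minimal sketch of chaining two actions, where `normalize: Action<Double, Double>`, `render: Action<Double, String>`, `inputNode` and `actionMeta` are all hypothetical:

// chain two actions into one; invoking the composite applies them in order
val composite: Action<Double, String> = normalize then render
val rendered: DataNode<String> = composite.invoke(inputNode, actionMeta)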

View File

@ -2,6 +2,7 @@ package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KClass
@ -36,8 +37,8 @@ interface Data<out T : Any> : MetaRepr {
inline fun <reified T : Any> of(name: String, goal: Goal<T>, meta: Meta): Data<T> =
of(name, T::class, goal, meta)
fun <T : Any> static(context: CoroutineContext, value: T, meta: Meta): Data<T> =
DataImpl(value::class, Goal.static(context, value), meta)
fun <T : Any> static(scope: CoroutineScope, value: T, meta: Meta): Data<T> =
DataImpl(value::class, Goal.static(scope, value), meta)
}
}
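The static factory now takes a CoroutineScope instead of a raw CoroutineContext. A sketch of the corresponding call-site change, assuming EmptyMeta from dataforge-meta is available:

// before: Data.static(Dispatchers.Default, 42.0, EmptyMeta)
// after: the scope that owns the already-completed goal is passed explicitly
val answer: Data<Double> = Data.static(GlobalScope, 42.0, EmptyMeta)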

View File

@ -22,7 +22,7 @@ class DataFilter(override val config: Config) : Specification {
fun <T : Any> DataNode<T>.filter(filter: DataFilter): DataNode<T> {
val sourceNode = filter.from?.let { getNode(it.toName()) } ?: this@filter
val regex = filter.pattern.toRegex()
val targetNode = DataTreeBuilder<T>().apply {
val targetNode = DataTreeBuilder(type).apply {
sourceNode.dataSequence().forEach { (name, data) ->
if (name.toString().matches(regex)) {
this[name] = data
@ -30,7 +30,7 @@ fun <T : Any> DataNode<T>.filter(filter: DataFilter): DataNode<T> {
}
}
return filter.to?.let {
DataTreeBuilder<T>().apply { this[it.toName()] = targetNode }.build()
DataTreeBuilder(type).apply { this[it.toName()] = targetNode }.build()
} ?: targetNode.build()
}

View File

@ -1,15 +1,18 @@
package hep.dataforge.data
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import hep.dataforge.names.plus
import hep.dataforge.names.toName
import hep.dataforge.names.asName
import hep.dataforge.names.*
import kotlin.reflect.KClass
/**
* A tree-like data structure grouped into a node. All data inside the node must inherit its type
*/
interface DataNode<out T : Any> {
/**
* The minimal common ancestor of all data in the node
*/
val type: KClass<out T>
/**
* Get the specific data if it exists
*/
@ -35,7 +38,10 @@ interface DataNode<out T : Any> {
companion object {
const val TYPE = "dataNode"
fun <T : Any> build(block: DataTreeBuilder<T>.() -> Unit) = DataTreeBuilder<T>().apply(block).build()
fun <T : Any> build(type: KClass<out T>, block: DataTreeBuilder<T>.() -> Unit) =
DataTreeBuilder<T>(type).apply(block).build()
fun <T : Any> builder(type: KClass<out T>) = DataTreeBuilder(type)
}
}
@ -45,7 +51,10 @@ internal sealed class DataTreeItem<out T : Any> {
class Value<out T : Any>(val value: Data<T>) : DataTreeItem<T>()
}
class DataTree<out T : Any> internal constructor(private val items: Map<NameToken, DataTreeItem<T>>) : DataNode<T> {
class DataTree<out T : Any> internal constructor(
override val type: KClass<out T>,
private val items: Map<NameToken, DataTreeItem<T>>
) : DataNode<T> {
//TODO add node-level meta?
override fun get(name: Name): Data<T>? = when (name.length) {
@ -97,7 +106,7 @@ private sealed class DataTreeBuilderItem<out T : Any> {
/**
* A builder for a DataTree.
*/
class DataTreeBuilder<T : Any> {
class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
private val map = HashMap<NameToken, DataTreeBuilderItem<T>>()
operator fun set(token: NameToken, node: DataTreeBuilder<T>) {
@ -112,7 +121,7 @@ class DataTreeBuilder<T : Any> {
private fun buildNode(token: NameToken): DataTreeBuilder<T> {
return if (!map.containsKey(token)) {
DataTreeBuilder<T>().also { map[token] = DataTreeBuilderItem.Node(it) }
DataTreeBuilder<T>(type).also { map[token] = DataTreeBuilderItem.Node(it) }
} else {
(map[token] as? DataTreeBuilderItem.Node ?: error("The node with name $token is occupied by leaf")).tree
}
@ -157,7 +166,7 @@ class DataTreeBuilder<T : Any> {
/**
* Build and append node
*/
infix fun String.to(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder<T>().apply(block))
infix fun String.to(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder<T>(type).apply(block))
fun build(): DataTree<T> {
val resMap = map.mapValues { (_, value) ->
@ -166,14 +175,14 @@ class DataTreeBuilder<T : Any> {
is DataTreeBuilderItem.Node -> DataTreeItem.Node(value.tree.build())
}
}
return DataTree(resMap)
return DataTree(type, resMap)
}
}
/**
* Generate a mutable builder from this node. The node content is not changed
*/
fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> = DataTreeBuilder<T>().apply {
fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> = DataTreeBuilder(type).apply {
dataSequence().forEach { (name, data) -> this[name] = data }
}
@ -182,7 +191,7 @@ fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> = DataTreeBuilder<T>().a
*/
fun DataNode<*>.startAll() = dataSequence().forEach { (_, data) -> data.goal.start() }
fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataNode.build {
fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataNode.build(type) {
dataSequence().forEach { (name, data) ->
if (predicate(name, data)) {
this[name] = data
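Since the node type is now carried explicitly, builder call sites change as well. A sketch under the assumption that Data.static and EmptyMeta behave as in the hunks above:

// the runtime KClass travels with the builder instead of being erased
val numbers: DataNode<Double> = DataNode.build(Double::class) {
    this["first".toName()] = Data.static(GlobalScope, 1.0, EmptyMeta)
    this["second".toName()] = Data.static(GlobalScope, 2.0, EmptyMeta)
}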

View File

@ -2,26 +2,28 @@ package hep.dataforge.data
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* A special deferred with explicit dependencies and additional information such as progress and a unique id
*/
interface Goal<out T> : Deferred<T>, CoroutineScope {
val scope: CoroutineScope
override val coroutineContext get() = scope.coroutineContext
val dependencies: Collection<Goal<*>>
val status: String
val totalWork: Double
val workDone: Double
val totalWork: Double get() = dependencies.sumByDouble { it.totalWork } + (monitor?.totalWork ?: 0.0)
val workDone: Double get() = dependencies.sumByDouble { it.workDone } + (monitor?.workDone ?: 0.0)
val status: String get() = monitor?.status ?: ""
val progress: Double get() = workDone / totalWork
companion object {
/**
* Create a goal wrapping a static value. This goal is always completed
*/
fun <T> static(context: CoroutineContext, value: T): Goal<T> =
StaticGoalImpl(context, CompletableDeferred(value))
fun <T> static(scope: CoroutineScope, value: T): Goal<T> =
StaticGoalImpl(scope, CompletableDeferred(value))
}
}
@ -52,24 +54,20 @@ class GoalMonitor : CoroutineContext.Element {
companion object : CoroutineContext.Key<GoalMonitor>
}
private class GoalImpl<T>(
override val dependencies: Collection<Goal<*>>,
val monitor: GoalMonitor,
deferred: Deferred<T>
) : Goal<T>, Deferred<T> by deferred {
override val coroutineContext: CoroutineContext get() = this
override val totalWork: Double get() = dependencies.sumByDouble { totalWork } + monitor.totalWork
override val workDone: Double get() = dependencies.sumByDouble { workDone } + monitor.workDone
override val status: String get() = monitor.status
}
val CoroutineScope.monitor: GoalMonitor? get() = coroutineContext[GoalMonitor]
private class StaticGoalImpl<T>(val context: CoroutineContext, deferred: CompletableDeferred<T>) : Goal<T>,
private class GoalImpl<T>(
override val scope: CoroutineScope,
override val dependencies: Collection<Goal<*>>,
deferred: Deferred<T>
) : Goal<T>, Deferred<T> by deferred
private class StaticGoalImpl<T>(override val scope: CoroutineScope, deferred: CompletableDeferred<T>) : Goal<T>,
Deferred<T> by deferred {
override val dependencies: Collection<Goal<*>> get() = emptyList()
override val status: String get() = ""
override val totalWork: Double get() = 0.0
override val workDone: Double get() = 0.0
override val coroutineContext: CoroutineContext get() = context
}
@ -79,23 +77,32 @@ private class StaticGoalImpl<T>(val context: CoroutineContext, deferred: Complet
*
* **Important:** Unlike a regular deferred, the [Goal] is started lazily, so the actual calculation is triggered only when the result is requested.
*/
fun <R> CoroutineScope.createGoal(dependencies: Collection<Goal<*>>, block: suspend GoalMonitor.() -> R): Goal<R> {
val monitor = GoalMonitor()
val deferred = async(start = CoroutineStart.LAZY) {
fun <R> CoroutineScope.createGoal(
dependencies: Collection<Goal<*>>,
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> R
): Goal<R> {
val deferred = async(context + GoalMonitor(), start = CoroutineStart.LAZY) {
dependencies.forEach { it.start() }
monitor.start()
return@async supervisorScope { monitor.block() }
}.also {
monitor.finish()
monitor?.start()
//Running in supervisor scope in order to allow manual error handling
return@async supervisorScope {
block().also {
monitor?.finish()
}
}
}
return GoalImpl(dependencies, monitor, deferred)
return GoalImpl(this, dependencies, deferred)
}
/**
* Create a one-to-one goal based on existing goal
*/
fun <T, R> Goal<T>.pipe(block: suspend GoalMonitor.(T) -> R): Goal<R> = createGoal(listOf(this)) { block(await()) }
fun <T, R> Goal<T>.pipe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R
): Goal<R> = createGoal(listOf(this), context) { block(await()) }
/**
* Create a joining goal.
@ -103,8 +110,9 @@ fun <T, R> Goal<T>.pipe(block: suspend GoalMonitor.(T) -> R): Goal<R> = createGo
*/
fun <T, R> Collection<Goal<T>>.join(
scope: CoroutineScope = first(),
block: suspend GoalMonitor.(Collection<T>) -> R
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R
): Goal<R> =
scope.createGoal(this) {
scope.createGoal(this, context) {
block(map { it.await() })
}
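A short sketch of how the reworked goal API composes; GlobalScope stands in for whatever CoroutineScope actually owns the computation:

// goals are lazy: nothing is computed until a result is awaited
val source = GlobalScope.createGoal(emptyList()) { 21 }
val doubled = source.pipe { it * 2 }                    // one-to-one dependency
val total = listOf(source, doubled).join { it.sum() }   // joining goal over both results
val value = runBlocking { total.await() }               // starts the chain, yields 63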

View File

@ -0,0 +1,70 @@
/*
* Copyright 2015 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import java.util.*
interface GroupRule {
operator fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>>
}
/**
* A class that builds groups of content using annotation-defined rules
*
* @author Alexander Nozik
*/
object GroupBuilder {
/**
* Create a grouping rule that creates groups for different values of the meta
* value with name [key]
*
* @param key
* @param defaultTagValue
* @return
*/
fun byValue(key: String, defaultTagValue: String): GroupRule = object : GroupRule {
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> {
val map = HashMap<String, DataTreeBuilder<T>>()
node.dataSequence().forEach { (name, data) ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data
}
return map.mapValues { it.value.build() }
}
}
// @ValueDef(key = "byValue", required = true, info = "The name of annotation value by which grouping should be made")
// @ValueDef(
// key = "defaultValue",
// def = "default",
// info = "Default value which should be used for content in which the grouping value is not presented"
// )
fun byMeta(config: Meta): GroupRule {
//TODO expand grouping options
return config["byValue"]?.string?.let { byValue(it, config["defaultValue"]?.string ?: "default") }
?: object : GroupRule {
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> = mapOf("" to node)
}
}
}
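A usage sketch for the grouping rule, assuming an input DataNode<Any> named node whose data carry a "type" value in their meta:

// one group per distinct value of meta["type"]; data without the key fall into "unknown"
val groups: Map<String, DataNode<Any>> = GroupBuilder.byValue("type", "unknown")(node)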

View File

@ -0,0 +1,113 @@
package hep.dataforge.data
import hep.dataforge.meta.Laminate
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.names.Name
import java.util.stream.Collectors
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
import kotlin.reflect.full.isSuperclassOf
class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<T>) {
var meta: MetaBuilder = MetaBuilder()
lateinit var result: suspend ActionEnv.(Map<String, T>) -> R
fun result(f: suspend ActionEnv.(Map<String, T>) -> R) {
this.result = f;
}
}
class JoinGroupBuilder<T : Any, R : Any> {
private val groupRules: MutableList<(DataNode<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/**
* Introduce grouping by the value of the [tag] meta field
*/
fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node ->
GroupBuilder.byValue(tag, defaultTag).invoke(node).map {
JoinGroup<T, R>(it.key, it.value).apply(action)
}
}
}
/**
* Add a single fixed group to grouping rules
*/
fun group(groupName: String, filter: DataFilter, action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node ->
listOf(
JoinGroup<T, R>(groupName, node.filter(filter)).apply(action)
)
}
}
/**
* Apply a transformation to the whole node
*/
fun result(resultName: String, f: suspend ActionEnv.(Map<String, T>) -> R) {
groupRules += { node ->
listOf(JoinGroup<T, R>(resultName, node).apply { result(f) })
}
}
internal fun buildGroups(input: DataNode<T>): List<JoinGroup<T, R>> {
return groupRules.flatMap { it.invoke(input) }
}
}
/**
* The same rules as for KPipe
*/
class JoinAction<T : Any, R : Any>(
val inputType: KClass<T>,
val outputType: KClass<R>,
val context: CoroutineContext = EmptyCoroutineContext,
private val action: JoinGroupBuilder<T, R>.() -> Unit
) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
if (!this.inputType.isSuperclassOf(node.type)) {
error("$inputType expected, but ${node.type} received")
}
return DataNode.build(outputType) {
JoinGroupBuilder<T, R>().apply(action).buildGroups(node).forEach { group ->
val laminate = Laminate(group.meta, meta)
val goalMap: Map<String, Goal<T>> = group.node
.dataStream()
.filter { it.isValid }
.collect(Collectors.toMap({ it.name }, { it.goal }))
val groupName: String = group.name;
if (groupName.isEmpty()) {
throw AnonymousNotAlowedException("Anonymous groups are not allowed");
}
val env = ActionEnv(
context,
groupName,
laminate.builder,
context.history.getChronicle(Name.joinString(groupName, name))
)
val dispatcher = context + getExecutorService(context, group.meta).asCoroutineDispatcher()
val goal = goalMap.join(dispatcher) { group.result.invoke(env, it) }
val res = NamedData(env.name, outputType, goal, env.meta)
builder.add(res)
}
}
}
}
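The intended call shape for the join action, shown only as a sketch: inputNode and actionMeta are assumptions, and the invoke body above still leans on the pre-migration context/executor API, so it does not yet run against this commit:

// collapse every group of Double data into a single "sum" datum
val sumAction = JoinAction(Double::class, Double::class) {
    result("sum") { values -> values.values.sum() }
}
val summed = sumAction.invoke(inputNode, actionMeta)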

View File

@ -0,0 +1,60 @@
package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
import kotlin.reflect.full.isSuperclassOf
class ActionEnv(val name: Name, val meta: Meta)
/**
* Action environment
*/
class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
lateinit var result: suspend ActionEnv.(T) -> R;
/**
* Calculate the result of the goal
*/
fun result(f: suspend ActionEnv.(T) -> R) {
result = f;
}
}
abstract class PipeAction<T : Any, R : Any>(
val inputType: KClass<T>,
val outputType: KClass<R>,
val context: CoroutineContext = EmptyCoroutineContext,
private val block: PipeBuilder<T, R>.() -> Unit
) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
if (!this.inputType.isSuperclassOf(node.type)) {
error("$inputType expected, but ${node.type} received")
}
return DataNode.build(outputType) {
node.dataSequence().forEach { (name, data) ->
//merging data meta with action meta (data meta is primary)
val oldMeta = meta.builder().apply { update(data.meta) }
// creating environment from old meta and name
val env = ActionEnv(name, oldMeta)
//applying transformation from builder
val builder = PipeBuilder<T, R>(name, oldMeta).apply(block)
//getting new name
val newName = builder.name
//getting new meta
val newMeta = builder.meta.seal()
//creating a goal with custom context if provided
val goal = data.goal.pipe(context) { builder.result(env, it) }
//setting the data node
this[newName] = Data.of(outputType, goal, newMeta)
}
}
}
}
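A sketch of the new pipe DSL; PipeAction is abstract in this commit, so an anonymous subclass stands in for a concrete factory, and inputNode and actionMeta are assumptions:

val doubler = object : PipeAction<Double, Double>(
    inputType = Double::class,
    outputType = Double::class,
    block = { result { value -> value * 2 } }   // lazy transformation of each datum
) {}
val doubled: DataNode<Double> = doubler.invoke(inputNode, actionMeta)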

View File

@ -0,0 +1,90 @@
package hep.dataforge.data
import hep.dataforge.meta.Laminate
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.names.Name
import kotlinx.coroutines.runBlocking
class FragmentEnv<T : Any, R : Any>(val context: Context, val name: String, var meta: MetaBuilder, val log: Chronicle) {
lateinit var result: suspend (T) -> R
fun result(f: suspend (T) -> R) {
result = f;
}
}
class SplitBuilder<T : Any, R : Any>(val context: Context, val name: String, val meta: Meta) {
internal val fragments: MutableMap<String, FragmentEnv<T, R>.() -> Unit> = HashMap()
/**
* Add a new fragment building rule. If a fragment is not defined, its result won't be available even if it is present in the map
* @param name the name of a fragment
* @param rule the rule used to transform the fragment name and meta
*/
fun fragment(name: String, rule: FragmentEnv<T, R>.() -> Unit) {
fragments[name] = rule
}
}
class KSplit<T : Any, R : Any>(
actionName: String,
inputType: Class<T>,
outputType: Class<R>,
private val action: SplitBuilder<T, R>.() -> Unit
) : GenericAction<T, R>(actionName, inputType, outputType) {
override fun run(context: Context, data: DataNode<out T>, actionMeta: Meta): DataNode<R> {
if (!this.inputType.isAssignableFrom(data.type)) {
throw RuntimeException("Type mismatch in action $name. $inputType expected, but ${data.type} received")
}
val builder = DataSet.edit(outputType)
runBlocking {
data.dataStream(true).forEach {
val laminate = Laminate(it.meta, actionMeta)
val split = SplitBuilder<T, R>(context, it.name, it.meta).apply(action)
val dispatcher = context + getExecutorService(context, laminate).asCoroutineDispatcher()
// Create a map of results in a single goal
//val commonGoal = it.goal.pipe(dispatcher) { split.result.invoke(env, it) }
// apply individual fragment rules to result
split.fragments.forEach { name, rule ->
val env = FragmentEnv<T, R>(
context,
it.name,
laminate.builder,
context.history.getChronicle(Name.joinString(it.name, name))
)
rule.invoke(env)
val goal = it.goal.pipe(dispatcher, env.result)
val res = NamedData(env.name, outputType, goal, env.meta)
builder.add(res)
}
}
}
return builder.build();
}
}
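A sketch of the split DSL; KSplit still extends the pre-migration GenericAction, so this shows the intended shape rather than code that compiles against this commit:

// one input Double produces two named fragments
val splitter = KSplit("split", Double::class.java, Double::class.java) {
    fragment("half") { result { it / 2 } }
    fragment("double") { result { it * 2 } }
}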
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
context: Context,
meta: Meta,
name: String = "pipe",
noinline action: PipeBuilder<T, R>.() -> Unit
): DataNode<R> {
return KPipe(name, T::class.java, R::class.java, action).run(context, this, meta);
}

View File

@ -0,0 +1,5 @@
package hep.dataforge.descriptors
interface Described {
val descriptor: NodeDescriptor
}

View File

@ -2,13 +2,14 @@ package hep.dataforge.workspace
import hep.dataforge.context.Named
import hep.dataforge.data.DataNode
import hep.dataforge.descriptors.Described
import hep.dataforge.meta.Meta
import hep.dataforge.provider.Type
import hep.dataforge.workspace.Task.Companion.TYPE
import kotlin.reflect.KClass
@Type(TYPE)
interface Task<out R : Any> : Named {
interface Task<out R : Any> : Named, Described {
/**
* A terminal task is one that cannot build its model lazily
*/
@ -41,10 +42,11 @@ interface Task<out R : Any> : Named {
* Run the given task model. The type check is expected to be performed before the actual
* calculation.
*
* @param model
* @param workspace - a workspace to run task model in
* @param model - a model to be executed
* @return
*/
fun run(model: TaskModel): DataNode<R>
fun run(workspace: Workspace, model: TaskModel): DataNode<R>
companion object {
const val TYPE = "task"
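The call-site change implied by the new signature, where myTask, workspace and model are assumptions:

// the workspace is now passed explicitly instead of being captured by the task
val output: DataNode<Any> = myTask.run(workspace, model)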

View File

@ -54,7 +54,7 @@ interface Workspace : ContextAware, Provider {
try {
val model = build(this@Workspace, config)
validate(model)
return run(model)
return run(this@Workspace, model)
} finally {
context.deactivate(this)
}

View File

@ -0,0 +1,180 @@
package hep.dataforge.workspace
import hep.dataforge.data.Action
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataTree
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import kotlin.reflect.KClass
class KTask<R : Any>(
override val name: String,
type: KClass<R>,
descriptor: NodeDescriptor? = null,
private val modelTransform: TaskModel.Builder.(Meta) -> Unit,
private val dataTransform: TaskModel.(DataNode<Any>) -> DataNode<R>
) : AbstractTask<R>(type.java, descriptor) {
override fun run(model: TaskModel, data: DataNode<Any>): DataNode<R> {
model.context.logger.info("Starting task '$name' on data node ${data.name} with meta: \n${model.meta}")
return dataTransform.invoke(model, data);
}
override fun buildModel(model: TaskModel.Builder, meta: Meta) {
modelTransform.invoke(model, meta);
}
//TODO add validation
}
class KTaskBuilder(val name: String) {
private var modelTransform: TaskModel.Builder.(Meta) -> Unit = { data("*") }
var descriptor: NodeDescriptor? = null
private class DataTransformation(
val from: String = "",
val to: String = "",
val transform: TaskModel.(DataNode<out Any>) -> DataNode<out Any>
) {
operator fun invoke(model: TaskModel, node: DataNode<Any>): DataNode<out Any> {
val localData = if (from.isEmpty()) {
node
} else {
node.getNode(from)
}
return transform.invoke(model, localData);
}
}
private val dataTransforms: MutableList<DataTransformation> = ArrayList();
fun model(modelTransform: TaskModel.Builder.(Meta) -> Unit) {
this.modelTransform = modelTransform
}
fun <T : Any> transform(inputType: Class<T>, from: String = "", to: String = "", transform: TaskModel.(DataNode<out T>) -> DataNode<out Any>) {
dataTransforms += DataTransformation(from, to) { data: DataNode<out Any> ->
transform.invoke(this, data.checked(inputType))
}
}
inline fun <reified T : Any> transform(from: String = "", to: String = "", noinline transform: TaskModel.(DataNode<out T>) -> DataNode<out Any>) {
transform(T::class.java, from, to, transform)
}
/**
* Perform the given action on data elements in the `from` node of the input and put the result into the `to` node
*/
inline fun <reified T : Any, reified R : Any> action(action: Action<T, R>, from: String = "", to: String = "") {
transform(from, to){ data: DataNode<out T> ->
action.run(context, data, meta)
}
}
inline fun <reified T : Any, reified R : Any> pipeAction(
actionName: String = "pipe",
from: String = "",
to: String = "",
noinline action: PipeBuilder<T, R>.() -> Unit) {
val pipe: Action<T, R> = KPipe(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = action
)
action(pipe, from, to);
}
inline fun <reified T : Any, reified R : Any> pipe(
actionName: String = "pipe",
from: String = "",
to: String = "",
noinline action: suspend ActionEnv.(T) -> R) {
val pipe: Action<T, R> = KPipe(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = { result(action) }
)
action(pipe, from, to);
}
inline fun <reified T : Any, reified R : Any> joinAction(
actionName: String = "join",
from: String = "",
to: String = "",
noinline action: JoinGroupBuilder<T, R>.() -> Unit) {
val join: Action<T, R> = KJoin(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = action
)
action(join, from, to);
}
inline fun <reified T : Any, reified R : Any> join(
actionName: String = name,
from: String = "",
to: String = "",
noinline action: suspend ActionEnv.(Map<String, T>) -> R) {
val join: Action<T, R> = KJoin(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = {
result(meta.getString("@target", actionName), action)
}
)
action(join, from, to);
}
inline fun <reified T : Any, reified R : Any> splitAction(
actionName: String = "split",
from: String = "",
to: String = "",
noinline action: SplitBuilder<T, R>.() -> Unit) {
val split: Action<T, R> = KSplit(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = action
)
action(split, from, to);
}
/**
* Use the DSL to create a descriptor for this task
*/
fun descriptor(transform: DescriptorBuilder.() -> Unit) {
this.descriptor = DescriptorBuilder(name).apply(transform).build()
}
fun build(): KTask<Any> {
val transform: TaskModel.(DataNode<Any>) -> DataNode<Any> = { data ->
if (dataTransforms.isEmpty()) {
//return data node as is
logger.warn("No transformation present, returning input data")
data.checked(Any::class.java)
} else {
val builder: DataNodeEditor<Any> = DataTree.edit(Any::class.java)
dataTransforms.forEach {
val res = it(this, data)
if (it.to.isEmpty()) {
builder.update(res)
} else {
builder.putNode(it.to, res)
}
}
builder.build()
}
}
return KTask(name, Any::class, descriptor, modelTransform, transform);
}
}
fun task(name: String, builder: KTaskBuilder.() -> Unit): KTask<Any> {
return KTaskBuilder(name).apply(builder).build();
}
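A sketch of the task DSL this builder enables; "square" is a hypothetical task name and the pipe body relies on the KPipe action that is still being migrated:

val square = task("square") {
    model { data("*") }              // request all workspace data into the model
    pipe<Double, Double> { it * it } // square every Double datum
}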