Actions builder finished. DataNode cast

This commit is contained in:
Alexander Nozik 2019-03-22 19:03:52 +03:00
parent 0ddf64a423
commit 0ff915236c
18 changed files with 365 additions and 270 deletions

View File

@ -41,7 +41,7 @@ infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): A
// */
//class PipeAction<in T : Any, out R : Any>(val transform: (Name, Data<T>, Meta) -> Data<R>?) : Action<T, R> {
// override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode.build {
// node.dataSequence().forEach { (name, data) ->
// node.data().forEach { (name, data) ->
// val res = transform(name, data, meta)
// if (res != null) {
// set(name, res)

View File

@ -3,7 +3,6 @@ package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import kotlinx.coroutines.CoroutineScope
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KClass
/**
@ -30,7 +29,9 @@ interface Data<out T : Any> : MetaRepr {
const val TYPE = "data"
fun <T : Any> of(type: KClass<out T>, goal: Goal<T>, meta: Meta): Data<T> = DataImpl(type, goal, meta)
inline fun <reified T : Any> of(goal: Goal<T>, meta: Meta): Data<T> = of(T::class, goal, meta)
fun <T : Any> of(name: String, type: KClass<out T>, goal: Goal<T>, meta: Meta): Data<T> =
NamedData(name, of(type, goal, meta))
@ -42,7 +43,18 @@ interface Data<out T : Any> : MetaRepr {
}
}
suspend fun <T: Any> Data<T>.await(): T = goal.await()
/**
* Upcast a [Data] to a supertype
*/
inline fun <reified R : Any, reified T : R> Data<T>.cast(): Data<R> {
return Data.of(R::class, goal, meta)
}
fun <R : Any, T : R> Data<T>.cast(type: KClass<R>): Data<R> {
return Data.of(type, goal, meta)
}
suspend fun <T : Any> Data<T>.await(): T = goal.await()
/**
* Generic Data implementation

View File

@ -23,7 +23,7 @@ fun <T : Any> DataNode<T>.filter(filter: DataFilter): DataNode<T> {
val sourceNode = filter.from?.let { getNode(it.toName()) } ?: this@filter
val regex = filter.pattern.toRegex()
val targetNode = DataTreeBuilder(type).apply {
sourceNode.dataSequence().forEach { (name, data) ->
sourceNode.data().forEach { (name, data) ->
if (name.toString().matches(regex)) {
this[name] = data
}

View File

@ -26,14 +26,14 @@ interface DataNode<out T : Any> {
/**
* Walk the tree upside down and provide all data nodes with full names
*/
fun dataSequence(): Sequence<Pair<Name, Data<T>>>
fun data(): Sequence<Pair<Name, Data<T>>>
/**
* A sequence of all nodes in the tree walking upside down, excluding self
*/
fun nodeSequence(): Sequence<Pair<Name, DataNode<T>>>
fun nodes(): Sequence<Pair<Name, DataNode<T>>>
operator fun iterator(): Iterator<Pair<Name, Data<T>>> = dataSequence().iterator()
operator fun iterator(): Iterator<Pair<Name, Data<T>>> = data().iterator()
companion object {
const val TYPE = "dataNode"
@ -43,7 +43,6 @@ interface DataNode<out T : Any> {
fun <T : Any> builder(type: KClass<out T>) = DataTreeBuilder(type)
}
}
internal sealed class DataTreeItem<out T : Any> {
@ -69,14 +68,14 @@ class DataTree<out T : Any> internal constructor(
else -> getNode(name.first()!!.asName())?.getNode(name.cutFirst())
}
override fun dataSequence(): Sequence<Pair<Name, Data<T>>> {
override fun data(): Sequence<Pair<Name, Data<T>>> {
return sequence {
items.forEach { (head, tree) ->
when (tree) {
is DataTreeItem.Value -> yield(head.asName() to tree.value)
is DataTreeItem.Node -> {
val subSequence =
tree.tree.dataSequence().map { (name, data) -> (head.asName() + name) to data }
tree.tree.data().map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
@ -84,13 +83,13 @@ class DataTree<out T : Any> internal constructor(
}
}
override fun nodeSequence(): Sequence<Pair<Name, DataNode<T>>> {
override fun nodes(): Sequence<Pair<Name, DataNode<T>>> {
return sequence {
items.forEach { (head, tree) ->
if (tree is DataTreeItem.Node) {
yield(head.asName() to tree.tree)
val subSequence =
tree.tree.nodeSequence().map { (name, node) -> (head.asName() + name) to node }
tree.tree.nodes().map { (name, node) -> (head.asName() + name) to node }
yieldAll(subSequence)
}
}
@ -183,16 +182,16 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
* Generate a mutable builder from this node. Node content is not changed
*/
fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> = DataTreeBuilder(type).apply {
dataSequence().forEach { (name, data) -> this[name] = data }
data().forEach { (name, data) -> this[name] = data }
}
/**
* Start computation for all goals in data node
*/
fun DataNode<*>.startAll() = dataSequence().forEach { (_, data) -> data.goal.start() }
fun DataNode<*>.startAll() = data().forEach { (_, data) -> data.goal.start() }
fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataNode.build(type) {
dataSequence().forEach { (name, data) ->
data().forEach { (name, data) ->
if (predicate(name, data)) {
this[name] = data
}

View File

@ -112,7 +112,20 @@ fun <T, R> Collection<Goal<T>>.join(
scope: CoroutineScope = first(),
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R
): Goal<R> =
scope.createGoal(this, context) {
): Goal<R> = scope.createGoal(this, context) {
block(map { it.await() })
}
}
/**
* A joining goal for a map
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
*/
fun <K, T, R> Map<K, Goal<T>>.join(
scope: CoroutineScope = values.first(),
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R
): Goal<R> = scope.createGoal(this.values, context) {
block(mapValues { it.value.await() })
}

View File

@ -0,0 +1,46 @@
package hep.dataforge.data
import hep.dataforge.names.Name
import kotlin.reflect.KClass
import kotlin.reflect.full.isSubclassOf
fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<R>): Data<R>? {
return if (type.isSubclassOf(type)) {
@Suppress("UNCHECKED_CAST")
Data.of(type, goal as Goal<R>, meta)
} else {
null
}
}
/**
* Filter a node by data and node type. Resulting node and its subnodes is guaranteed to have border type [type],
* but could contain empty nodes
*/
fun <T : Any, R : Any> DataNode<T>.cast(type: KClass<out R>): DataNode<R> {
return if (this is CheckedDataNode) {
origin.cast(type)
} else {
CheckedDataNode(this, type)
}
}
inline fun <T : Any, reified R : Any> DataNode<T>.cast(): DataNode<R> = cast(R::class)
class CheckedDataNode<out T : Any>(val origin: DataNode<Any>, override val type: KClass<out T>) : DataNode<T> {
override fun get(name: Name): Data<T>? =
origin[name]?.safeCast(type)
override fun getNode(name: Name): DataNode<T>? {
return origin.getNode(name)?.cast(type)
}
override fun data(): Sequence<Pair<Name, Data<T>>> =
origin.data().mapNotNull { pair ->
pair.second.safeCast(type)?.let { pair.first to it }
}
override fun nodes(): Sequence<Pair<Name, DataNode<T>>> =
origin.nodes().map { it.first to it.second.cast(type) }
}

View File

@ -44,7 +44,7 @@ object GroupBuilder {
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> {
val map = HashMap<String, DataTreeBuilder<T>>()
node.dataSequence().forEach { (name, data) ->
node.data().forEach { (name, data) ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data
}

View File

@ -6,7 +6,6 @@ import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.builder
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import java.util.stream.Collectors
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
@ -17,9 +16,9 @@ class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<
var meta: MetaBuilder = MetaBuilder()
lateinit var result: suspend ActionEnv.(Map<String, T>) -> R
lateinit var result: suspend ActionEnv.(Map<Name, T>) -> R
fun result(f: suspend ActionEnv.(Map<String, T>) -> R) {
fun result(f: suspend ActionEnv.(Map<Name, T>) -> R) {
this.result = f;
}
@ -53,7 +52,7 @@ class JoinGroupBuilder<T : Any, R : Any> {
/**
* Apply transformation to the whole node
*/
fun result(resultName: String, f: suspend ActionEnv.(Map<String, T>) -> R) {
fun result(resultName: String, f: suspend ActionEnv.(Map<Name, T>) -> R) {
groupRules += { node ->
listOf(JoinGroup<T, R>(resultName, node).apply { result(f) })
}
@ -85,21 +84,19 @@ class JoinAction<T : Any, R : Any>(
val laminate = Laminate(group.meta, meta)
val goalMap: Map<String, Goal<T>> = group.node
.dataSequence()
val goalMap: Map<Name, Goal<T>> = group.node
.data()
.associate { it.first to it.second.goal }
val groupName: String = group.name;
if (groupName.isEmpty()) {
throw AnonymousNotAlowedException("Anonymous groups are not allowed");
}
val env = ActionEnv(groupName.toName(), laminate.builder())
val goal = goalMap.join(dispatcher) { group.result.invoke(env, it) }
val res = NamedData(env.name, outputType, goal, env.meta)
builder.add(res)
val goal = goalMap.join(context = context) { group.result.invoke(env, it) }
val res = Data.of(outputType, goal, env.meta)
set(env.name, res)
}
}

View File

@ -25,7 +25,7 @@ class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
}
abstract class PipeAction<T : Any, R : Any>(
class PipeAction<T : Any, R : Any>(
val inputType: KClass<T>,
val outputType: KClass<R>,
val context: CoroutineContext = EmptyCoroutineContext,
@ -37,7 +37,7 @@ abstract class PipeAction<T : Any, R : Any>(
error("$inputType expected, but ${node.type} received")
}
return DataNode.build(outputType) {
node.dataSequence().forEach { (name, data) ->
node.data().forEach { (name, data) ->
//merging data meta with action meta (data meta is primary)
val oldMeta = meta.builder().apply { update(data.meta) }
// creating environment from old meta and name
@ -57,4 +57,11 @@ abstract class PipeAction<T : Any, R : Any>(
}
}
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
meta: Meta,
context: CoroutineContext = EmptyCoroutineContext,
noinline action: PipeBuilder<T, R>.() -> Unit
): DataNode<R> = PipeAction(T::class, R::class, context, action).invoke(this, meta)

View File

@ -3,11 +3,17 @@ package hep.dataforge.data
import hep.dataforge.meta.Laminate
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.builder
import hep.dataforge.names.Name
import kotlinx.coroutines.runBlocking
import hep.dataforge.names.toName
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
import kotlin.reflect.full.isSuperclassOf
class FragmentEnv<T : Any, R : Any>(val context: Context, val name: String, var meta: MetaBuilder, val log: Chronicle) {
class FragmentRule<T : Any, R : Any>(val name: Name, var meta: MetaBuilder) {
lateinit var result: suspend (T) -> R
fun result(f: suspend (T) -> R) {
@ -16,75 +22,51 @@ class FragmentEnv<T : Any, R : Any>(val context: Context, val name: String, var
}
class SplitBuilder<T : Any, R : Any>(val context: Context, val name: String, val meta: Meta) {
internal val fragments: MutableMap<String, FragmentEnv<T, R>.() -> Unit> = HashMap()
class SplitBuilder<T : Any, R : Any>(val name: Name, val meta: Meta) {
internal val fragments: MutableMap<Name, FragmentRule<T, R>.() -> Unit> = HashMap()
/**
* Add new fragment building rule. If the framgent not defined, result won't be available even if it is present in the map
* @param name the name of a fragment
* @param rule the rule to transform fragment name and meta using
*/
fun fragment(name: String, rule: FragmentEnv<T, R>.() -> Unit) {
fragments[name] = rule
fun fragment(name: String, rule: FragmentRule<T, R>.() -> Unit) {
fragments[name.toName()] = rule
}
}
class KSplit<T : Any, R : Any>(
actionName: String,
inputType: Class<T>,
outputType: Class<R>,
class SplitAction<T : Any, R : Any>(
val inputType: KClass<T>,
val outputType: KClass<R>,
val context: CoroutineContext = EmptyCoroutineContext,
private val action: SplitBuilder<T, R>.() -> Unit
) : GenericAction<T, R>(actionName, inputType, outputType) {
) : Action<T, R> {
override fun run(context: Context, data: DataNode<out T>, actionMeta: Meta): DataNode<R> {
if (!this.inputType.isAssignableFrom(data.type)) {
throw RuntimeException("Type mismatch in action $name. $inputType expected, but ${data.type} received")
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
if (!this.inputType.isSuperclassOf(node.type)) {
error("$inputType expected, but ${node.type} received")
}
val builder = DataSet.edit(outputType)
return DataNode.build(outputType) {
node.data().forEach { (name, data) ->
val laminate = Laminate(data.meta, meta)
runBlocking {
data.dataStream(true).forEach {
val split = SplitBuilder<T, R>(name, data.meta).apply(action)
val laminate = Laminate(it.meta, actionMeta)
val split = SplitBuilder<T, R>(context, it.name, it.meta).apply(action)
val dispatcher = context + getExecutorService(context, laminate).asCoroutineDispatcher()
// Create a map of results in a single goal
//val commonGoal = it.goal.pipe(dispatcher) { split.result.invoke(env, it) }
// apply individual fragment rules to result
split.fragments.forEach { name, rule ->
val env = FragmentEnv<T, R>(
context,
it.name,
laminate.builder,
context.history.getChronicle(Name.joinString(it.name, name))
)
split.fragments.forEach { fragmentName, rule ->
val env = FragmentRule<T, R>(fragmentName, laminate.builder())
rule.invoke(env)
rule(env)
val goal = it.goal.pipe(dispatcher, env.result)
val goal = data.goal.pipe(context = context) { env.result(it) }
val res = NamedData(env.name, outputType, goal, env.meta)
builder.add(res)
val res = Data.of(outputType, goal, env.meta)
set(env.name, res)
}
}
}
return builder.build();
}
}
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
context: Context,
meta: Meta,
name: String = "pipe",
noinline action: PipeBuilder<T, R>.() -> Unit
): DataNode<R> {
return KPipe(name, T::class.java, R::class.java, action).run(context, this, meta);
}

View File

@ -22,13 +22,14 @@ interface SpecificationCompanion<T : Specification> {
fun build(action: T.() -> Unit) = update(Config(), action)
fun empty() = build { }
/**
* Wrap generic configuration producing instance of desired type
*/
fun wrap(config: Config): T
fun wrap(meta: Meta): T = wrap(meta.toConfig())
}
fun <T : Specification> specification(wrapper: (Config) -> T): SpecificationCompanion<T> =

View File

@ -37,7 +37,7 @@ class TaskModelDependency(val name: String, val meta: Meta, val placement: Name
return if (placement.isEmpty()) {
result
} else {
DataTreeBuilder<Any>().apply { this[placement] = result }.build()
DataTreeBuilder(Any::class).apply { this[placement] = result }.build()
}
}

View File

@ -45,8 +45,8 @@ data class TaskModel(
* Build input for the task
*/
fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> {
return DataTreeBuilder<Any>().apply {
dependencies.asSequence().flatMap { it.apply(workspace).dataSequence() }.forEach { (name, data) ->
return DataTreeBuilder(Any::class).apply {
dependencies.asSequence().flatMap { it.apply(workspace).data() }.forEach { (name, data) ->
//TODO add concise error on replacement
this[name] = data
}

View File

@ -43,8 +43,8 @@ interface Workspace : ContextAware, Provider {
return when (target) {
"target", Meta.TYPE -> targets.keys.asSequence().map { it.toName() }
Task.TYPE -> tasks.keys.asSequence().map { it.toName() }
Data.TYPE -> data.dataSequence().map { it.first }
DataNode.TYPE -> data.nodeSequence().map { it.first }
Data.TYPE -> data.data().map { it.first }
DataNode.TYPE -> data.nodes().map { it.first }
else -> emptySequence()
}
}

View File

@ -11,7 +11,7 @@ import hep.dataforge.meta.buildMeta
* A builder for a workspace
*/
class WorkspaceBuilder(var context: Context) {
val data = DataTreeBuilder<Any>()
val data = DataTreeBuilder(Any::class)
val targets = HashMap<String, Meta>()
val tasks = HashSet<Task<Any>>()

View File

@ -0,0 +1,218 @@
package hep.dataforge.workspace
import hep.dataforge.data.*
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.node
import hep.dataforge.names.toName
import kotlin.reflect.KClass
class GenericTask<R : Any>(
override val name: String,
override val type: KClass<out R>,
override val descriptor: NodeDescriptor,
private val modelTransform: TaskModelBuilder.(Meta) -> Unit,
private val dataTransform: TaskModel.(DataNode<Any>) -> DataNode<R>
) : Task<R> {
private fun gather(workspace: Workspace, model: TaskModel): DataNode<Any> {
// val builder = DataTreeBuilder(Any::class)
// model.dependencies.forEach { dep ->
// dep.apply(workspace)
// }
// return builder.build()
}
override fun run(workspace: Workspace, model: TaskModel): DataNode<R> {
//validate model
validate(model)
// gather data
val input = gather(workspace, model)
//execute
workspace.context.logger.info("Starting task '$name' on data node ${input.name} with meta: \n${model.meta}")
val output = dataTransform.invoke(model, input)
//handle result
output.handle(model.context.dispatcher) { this.handle(it) }
return output
}
/**
* Build new TaskModel and apply specific model transformation for this
* task. By default model uses the meta node with the same node as the name of the task.
*
* @param workspace
* @param taskConfig
* @return
*/
override fun build(workspace: Workspace, taskConfig: Meta): TaskModel {
val taskMeta = taskConfig[name]?.node ?: taskConfig
val builder = TaskModelBuilder(name, taskMeta)
modelTransform.invoke(builder, taskMeta)
return builder.build()
}
//TODO add validation
}
class KTaskBuilder(val name: String) {
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { data("*") }
var descriptor: NodeDescriptor? = null
private class DataTransformation(
val from: String = "",
val to: String = "",
val transform: TaskModel.(DataNode<Any>?) -> DataNode<Any>
) {
operator fun invoke(model: TaskModel, node: DataNode<Any>): DataNode<Any> {
val localData = if (from.isEmpty()) {
node
} else {
node.getNode(from.toName())
}
return transform.invoke(model, localData);
}
}
private val dataTransforms: MutableList<DataTransformation> = ArrayList();
fun model(modelTransform: TaskModelBuilder.(Meta) -> Unit) {
this.modelTransform = modelTransform
}
fun <T : Any> transform(
inputType: Class<T>,
from: String = "",
to: String = "",
transform: TaskModel.(DataNode<T>) -> DataNode<Any>
) {
dataTransforms += DataTransformation(from, to) { data: DataNode<Any> ->
transform.invoke(this, data.checked(inputType))
}
}
inline fun <reified T : Any> transform(
from: String = "",
to: String = "",
noinline transform: TaskModel.(DataNode<T>) -> DataNode<Any>
) {
transform(T::class.java, from, to, transform)
}
/**
* Perform given action on data elements in `from` node in input and put the result to `to` node
*/
inline fun <reified T : Any, reified R : Any> action(action: Action<T, R>, from: String = "", to: String = "") {
transform(from, to) { data: DataNode<T> ->
action(data, meta)
}
}
inline fun <reified T : Any, reified R : Any> pipeAction(
actionName: String = "pipe",
from: String = "",
to: String = "",
noinline action: PipeBuilder<T, R>.() -> Unit
) {
val pipe: Action<T, R> = PipeAction(
inputType = T::class,
outputType = R::class,
block = action
)
action(pipe, from, to);
}
inline fun <reified T : Any, reified R : Any> pipe(
actionName: String = "pipe",
from: String = "",
to: String = "",
noinline action: suspend ActionEnv.(T) -> R
) {
val pipe: Action<T, R> = PipeAction(
inputType = T::class,
outputType = R::class
) { result(action) }
action(pipe, from, to);
}
inline fun <reified T : Any, reified R : Any> joinAction(
actionName: String = "join",
from: String = "",
to: String = "",
noinline action: JoinGroupBuilder<T, R>.() -> Unit
) {
val join: Action<T, R> = JoinAction(
inputType = T::class,
outputType = R::class,
action = action
)
action(join, from, to);
}
inline fun <reified T : Any, reified R : Any> join(
actionName: String = name,
from: String = "",
to: String = "",
noinline action: suspend ActionEnv.(Map<String, T>) -> R
) {
val join: Action<T, R> = JoinAction(
inputType = T::class,
outputType = R::class,
action = {
result(meta.getString("@target", actionName), action)
}
)
action(join, from, to);
}
inline fun <reified T : Any, reified R : Any> splitAction(
actionName: String = "split",
from: String = "",
to: String = "",
noinline action: SplitBuilder<T, R>.() -> Unit
) {
val split: Action<T, R> = SplitAction(
inputType = T::class,
outputType = R::class,
action = action
)
action(split, from, to);
}
/**
* Use DSL to create a descriptor for this task
*/
fun descriptor(transform: NodeDescriptor.() -> Unit) {
this.descriptor = NodeDescriptor.build(transform)
}
fun build(): GenericTask<Any> {
val transform: TaskModel.(DataNode<Any>) -> DataNode<Any> = { data ->
if (dataTransforms.isEmpty()) {
//return data node as is
logger.warn("No transformation present, returning input data")
data.checked(Any::class.java)
} else {
val builder = DataTreeBuilder(Any::class)
dataTransforms.forEach {
val res = it(this, data)
if (it.to.isEmpty()) {
builder.update(res)
} else {
builder.putNode(it.to, res)
}
}
builder.build()
}
}
return GenericTask<Any>(name, Any::class, descriptor ?: NodeDescriptor.empty(), modelTransform, transform);
}
}
fun task(name: String, builder: KTaskBuilder.() -> Unit): GenericTask<Any> {
return KTaskBuilder(name).apply(builder).build();
}

View File

@ -1,180 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.data.Action
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataTree
import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import kotlin.reflect.KClass
class KTask<R : Any>(
override val name: String,
type: KClass<R>,
descriptor: NodeDescriptor? = null,
private val modelTransform: TaskModel.Builder.(Meta) -> Unit,
private val dataTransform: TaskModel.(DataNode<Any>) -> DataNode<R>
) : AbstractTask<R>(type.java, descriptor) {
override fun run(model: TaskModel, data: DataNode<Any>): DataNode<R> {
model.context.logger.info("Starting task '$name' on data node ${data.name} with meta: \n${model.meta}")
return dataTransform.invoke(model, data);
}
override fun buildModel(model: TaskModel.Builder, meta: Meta) {
modelTransform.invoke(model, meta);
}
//TODO add validation
}
class KTaskBuilder(val name: String) {
private var modelTransform: TaskModel.Builder.(Meta) -> Unit = { data("*") }
var descriptor: NodeDescriptor? = null
private class DataTransformation(
val from: String = "",
val to: String = "",
val transform: TaskModel.(DataNode<out Any>) -> DataNode<out Any>
) {
operator fun invoke(model: TaskModel, node: DataNode<Any>): DataNode<out Any> {
val localData = if (from.isEmpty()) {
node
} else {
node.getNode(from)
}
return transform.invoke(model, localData);
}
}
private val dataTransforms: MutableList<DataTransformation> = ArrayList();
fun model(modelTransform: TaskModel.Builder.(Meta) -> Unit) {
this.modelTransform = modelTransform
}
fun <T : Any> transform(inputType: Class<T>, from: String = "", to: String = "", transform: TaskModel.(DataNode<out T>) -> DataNode<out Any>) {
dataTransforms += DataTransformation(from, to) { data: DataNode<out Any> ->
transform.invoke(this, data.checked(inputType))
}
}
inline fun <reified T : Any> transform(from: String = "", to: String = "", noinline transform: TaskModel.(DataNode<out T>) -> DataNode<out Any>) {
transform(T::class.java, from, to, transform)
}
/**
* Perform given action on data elements in `from` node in input and put the result to `to` node
*/
inline fun <reified T : Any, reified R : Any> action(action: Action<T, R>, from: String = "", to: String = "") {
transform(from, to){ data: DataNode<out T> ->
action.run(context, data, meta)
}
}
inline fun <reified T : Any, reified R : Any> pipeAction(
actionName: String = "pipe",
from: String = "",
to: String = "",
noinline action: PipeBuilder<T, R>.() -> Unit) {
val pipe: Action<T, R> = KPipe(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = action
)
action(pipe, from, to);
}
inline fun <reified T : Any, reified R : Any> pipe(
actionName: String = "pipe",
from: String = "",
to: String = "",
noinline action: suspend ActionEnv.(T) -> R) {
val pipe: Action<T, R> = KPipe(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = { result(action) }
)
action(pipe, from, to);
}
inline fun <reified T : Any, reified R : Any> joinAction(
actionName: String = "join",
from: String = "",
to: String = "",
noinline action: JoinGroupBuilder<T, R>.() -> Unit) {
val join: Action<T, R> = KJoin(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = action
)
action(join, from, to);
}
inline fun <reified T : Any, reified R : Any> join(
actionName: String = name,
from: String = "",
to: String = "",
noinline action: suspend ActionEnv.(Map<String, T>) -> R) {
val join: Action<T, R> = KJoin(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = {
result(meta.getString("@target", actionName), action)
}
)
action(join, from, to);
}
inline fun <reified T : Any, reified R : Any> splitAction(
actionName: String = "split",
from: String = "",
to: String = "",
noinline action: SplitBuilder<T, R>.() -> Unit) {
val split: Action<T, R> = KSplit(
actionName = Name.joinString(name, actionName),
inputType = T::class.java,
outputType = R::class.java,
action = action
)
action(split, from, to);
}
/**
* Use DSL to create a descriptor for this task
*/
fun descriptor(transform: DescriptorBuilder.() -> Unit) {
this.descriptor = DescriptorBuilder(name).apply(transform).build()
}
fun build(): KTask<Any> {
val transform: TaskModel.(DataNode<Any>) -> DataNode<Any> = { data ->
if (dataTransforms.isEmpty()) {
//return data node as is
logger.warn("No transformation present, returning input data")
data.checked(Any::class.java)
} else {
val builder: DataNodeEditor<Any> = DataTree.edit(Any::class.java)
dataTransforms.forEach {
val res = it(this, data)
if (it.to.isEmpty()) {
builder.update(res)
} else {
builder.putNode(it.to, res)
}
}
builder.build()
}
}
return KTask(name, Any::class, descriptor, modelTransform, transform);
}
}
fun task(name: String, builder: KTaskBuilder.() -> Unit): KTask<Any> {
return KTaskBuilder(name).apply(builder).build();
}