Simplified task builder.
This commit is contained in:
parent
e3127503ec
commit
55db973cd6
@ -208,4 +208,9 @@ fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNod
|
|||||||
|
|
||||||
fun <T: Any> DataNode<T>.first(): Data<T> = data().first().second
|
fun <T: Any> DataNode<T>.first(): Data<T> = data().first().second
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that node is compatible with given type meaning that each element could be cast to the type
|
||||||
|
*/
|
||||||
|
expect fun DataNode<*>.checkType(type: KClass<*>)
|
||||||
|
|
||||||
//fun <T : Any, R: T> DataNode<T>.filterIsInstance(type: KClass<R>): DataNode<R> = filter{_,data -> type.}
|
//fun <T : Any, R: T> DataNode<T>.filterIsInstance(type: KClass<R>): DataNode<R> = filter{_,data -> type.}
|
@ -18,7 +18,6 @@ package hep.dataforge.data
|
|||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.get
|
import hep.dataforge.meta.get
|
||||||
import hep.dataforge.meta.string
|
import hep.dataforge.meta.string
|
||||||
import java.util.*
|
|
||||||
|
|
||||||
interface GroupRule {
|
interface GroupRule {
|
||||||
operator fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>>
|
operator fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>>
|
||||||
@ -40,7 +39,8 @@ object GroupBuilder {
|
|||||||
* @param defaultTagValue
|
* @param defaultTagValue
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
fun byValue(key: String, defaultTagValue: String): GroupRule = object : GroupRule {
|
fun byValue(key: String, defaultTagValue: String): GroupRule = object :
|
||||||
|
GroupRule {
|
||||||
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> {
|
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> {
|
||||||
val map = HashMap<String, DataTreeBuilder<T>>()
|
val map = HashMap<String, DataTreeBuilder<T>>()
|
||||||
|
|
||||||
@ -62,7 +62,12 @@ object GroupBuilder {
|
|||||||
// )
|
// )
|
||||||
fun byMeta(config: Meta): GroupRule {
|
fun byMeta(config: Meta): GroupRule {
|
||||||
//TODO expand grouping options
|
//TODO expand grouping options
|
||||||
return config["byValue"]?.string?.let { byValue(it, config["defaultValue"]?.string ?: "default") }
|
return config["byValue"]?.string?.let {
|
||||||
|
byValue(
|
||||||
|
it,
|
||||||
|
config["defaultValue"]?.string ?: "default"
|
||||||
|
)
|
||||||
|
}
|
||||||
?: object : GroupRule {
|
?: object : GroupRule {
|
||||||
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> = mapOf("" to node)
|
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> = mapOf("" to node)
|
||||||
}
|
}
|
@ -9,7 +9,6 @@ import hep.dataforge.names.toName
|
|||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
import kotlin.reflect.full.isSuperclassOf
|
|
||||||
|
|
||||||
|
|
||||||
class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<T>) {
|
class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<T>) {
|
||||||
@ -76,9 +75,7 @@ class JoinAction<T : Any, R : Any>(
|
|||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
|
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
|
||||||
if (!this.inputType.isSuperclassOf(node.type)) {
|
node.checkType(inputType)
|
||||||
error("$inputType expected, but ${node.type} received")
|
|
||||||
}
|
|
||||||
return DataNode.build(outputType) {
|
return DataNode.build(outputType) {
|
||||||
JoinGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group ->
|
JoinGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group ->
|
||||||
|
|
@ -5,7 +5,6 @@ import hep.dataforge.names.Name
|
|||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
import kotlin.reflect.full.isSuperclassOf
|
|
||||||
|
|
||||||
class ActionEnv(val name: Name, val meta: Meta)
|
class ActionEnv(val name: Name, val meta: Meta)
|
||||||
|
|
||||||
@ -33,9 +32,8 @@ class PipeAction<T : Any, R : Any>(
|
|||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
|
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
|
||||||
if (!this.inputType.isSuperclassOf(node.type)) {
|
node.checkType(inputType)
|
||||||
error("$inputType expected, but ${node.type} received")
|
|
||||||
}
|
|
||||||
return DataNode.build(outputType) {
|
return DataNode.build(outputType) {
|
||||||
node.data().forEach { (name, data) ->
|
node.data().forEach { (name, data) ->
|
||||||
//merging data meta with action meta (data meta is primary)
|
//merging data meta with action meta (data meta is primary)
|
@ -10,7 +10,6 @@ import kotlin.collections.set
|
|||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
import kotlin.reflect.full.isSuperclassOf
|
|
||||||
|
|
||||||
|
|
||||||
class FragmentRule<T : Any, R : Any>(val name: Name, var meta: MetaBuilder) {
|
class FragmentRule<T : Any, R : Any>(val name: Name, var meta: MetaBuilder) {
|
||||||
@ -43,9 +42,7 @@ class SplitAction<T : Any, R : Any>(
|
|||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
|
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
|
||||||
if (!this.inputType.isSuperclassOf(node.type)) {
|
node.checkType(inputType)
|
||||||
error("$inputType expected, but ${node.type} received")
|
|
||||||
}
|
|
||||||
|
|
||||||
return DataNode.build(outputType) {
|
return DataNode.build(outputType) {
|
||||||
node.data().forEach { (name, data) ->
|
node.data().forEach { (name, data) ->
|
||||||
@ -56,7 +53,7 @@ class SplitAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
|
|
||||||
// apply individual fragment rules to result
|
// apply individual fragment rules to result
|
||||||
split.fragments.forEach { fragmentName, rule ->
|
split.fragments.forEach { (fragmentName, rule) ->
|
||||||
val env = FragmentRule<T, R>(fragmentName, laminate.builder())
|
val env = FragmentRule<T, R>(fragmentName, laminate.builder())
|
||||||
|
|
||||||
rule(env)
|
rule(env)
|
@ -0,0 +1,10 @@
|
|||||||
|
package hep.dataforge.data
|
||||||
|
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that node is compatible with given type meaning that each element could be cast to the type
|
||||||
|
*/
|
||||||
|
actual fun DataNode<*>.checkType(type: KClass<*>) {
|
||||||
|
//Not supported in js yet
|
||||||
|
}
|
@ -0,0 +1,13 @@
|
|||||||
|
package hep.dataforge.data
|
||||||
|
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
import kotlin.reflect.full.isSuperclassOf
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that node is compatible with given type meaning that each element could be cast to the type
|
||||||
|
*/
|
||||||
|
actual fun DataNode<*>.checkType(type: KClass<*>) {
|
||||||
|
if (!type.isSuperclassOf(type)) {
|
||||||
|
error("$type expected, but $type received")
|
||||||
|
}
|
||||||
|
}
|
@ -58,9 +58,13 @@ fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> {
|
|||||||
}.build()
|
}.build()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@DslMarker
|
||||||
|
annotation class TaskBuildScope
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A builder for [TaskModel]
|
* A builder for [TaskModel]
|
||||||
*/
|
*/
|
||||||
|
@TaskBuildScope
|
||||||
class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
|
class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
|
||||||
/**
|
/**
|
||||||
* Meta for current task. By default uses the whole input meta
|
* Meta for current task. By default uses the whole input meta
|
||||||
|
@ -11,6 +11,7 @@ import hep.dataforge.names.toName
|
|||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.GlobalScope
|
import kotlinx.coroutines.GlobalScope
|
||||||
|
|
||||||
|
@TaskBuildScope
|
||||||
interface WorkspaceBuilder {
|
interface WorkspaceBuilder {
|
||||||
val parentContext: Context
|
val parentContext: Context
|
||||||
var context: Context
|
var context: Context
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package hep.dataforge.workspace
|
package hep.dataforge.workspace
|
||||||
|
|
||||||
|
import hep.dataforge.context.Context
|
||||||
import hep.dataforge.data.*
|
import hep.dataforge.data.*
|
||||||
import hep.dataforge.descriptors.NodeDescriptor
|
import hep.dataforge.descriptors.NodeDescriptor
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
@ -63,6 +64,7 @@ class GenericTask<R : Any>(
|
|||||||
//TODO add validation
|
//TODO add validation
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@TaskBuildScope
|
||||||
class KTaskBuilder(val name: String) {
|
class KTaskBuilder(val name: String) {
|
||||||
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { data("*") }
|
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { data("*") }
|
||||||
var descriptor: NodeDescriptor? = null
|
var descriptor: NodeDescriptor? = null
|
||||||
@ -73,7 +75,7 @@ class KTaskBuilder(val name: String) {
|
|||||||
private class DataTransformation(
|
private class DataTransformation(
|
||||||
val from: String = "",
|
val from: String = "",
|
||||||
val to: String = "",
|
val to: String = "",
|
||||||
val transform: Workspace.() -> TaskModel.(DataNode<Any>) -> DataNode<Any>
|
val transform: (Context, TaskModel, DataNode<Any>) -> DataNode<Any>
|
||||||
) {
|
) {
|
||||||
operator fun invoke(workspace: Workspace, model: TaskModel, node: DataNode<Any>): DataNode<Any>? {
|
operator fun invoke(workspace: Workspace, model: TaskModel, node: DataNode<Any>): DataNode<Any>? {
|
||||||
val localData = if (from.isEmpty()) {
|
val localData = if (from.isEmpty()) {
|
||||||
@ -81,7 +83,7 @@ class KTaskBuilder(val name: String) {
|
|||||||
} else {
|
} else {
|
||||||
node.getNode(from.toName()) ?: return null
|
node.getNode(from.toName()) ?: return null
|
||||||
}
|
}
|
||||||
return transform(workspace).invoke(model, localData)
|
return transform(workspace.context, model, localData)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,27 +93,23 @@ class KTaskBuilder(val name: String) {
|
|||||||
this.modelTransform = modelTransform
|
this.modelTransform = modelTransform
|
||||||
}
|
}
|
||||||
|
|
||||||
//class TaskEnv(val workspace: Workspace, val model: TaskModel)
|
|
||||||
|
|
||||||
fun <T : Any> transform(
|
fun <T : Any> transform(
|
||||||
inputType: KClass<T>,
|
inputType: KClass<T>,
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
transform: Workspace.() -> TaskModel.(DataNode<T>) -> DataNode<Any>
|
block: TaskModel.(Context, DataNode<T>) -> DataNode<Any>
|
||||||
) {
|
) {
|
||||||
dataTransforms += DataTransformation(from, to) {
|
dataTransforms += DataTransformation(from, to) { context, model, data ->
|
||||||
{ data: DataNode<Any> ->
|
block(model, context, data.cast(inputType))
|
||||||
transform().invoke(this, data.cast(inputType))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline fun <reified T : Any> transform(
|
inline fun <reified T : Any> transform(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
noinline transform: Workspace.() -> TaskModel.(DataNode<T>) -> DataNode<Any>
|
noinline block: TaskModel.(Context, DataNode<T>) -> DataNode<Any>
|
||||||
) {
|
) {
|
||||||
transform(T::class, from, to, transform)
|
transform(T::class, from, to, block)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -120,69 +118,74 @@ class KTaskBuilder(val name: String) {
|
|||||||
inline fun <reified T : Any, reified R : Any> action(
|
inline fun <reified T : Any, reified R : Any> action(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline actionBuilder: Workspace.() -> Action<T, R>
|
crossinline block: Context.() -> Action<T, R>
|
||||||
) {
|
) {
|
||||||
transform(from, to) {
|
transform(from, to) { context, data: DataNode<T> ->
|
||||||
val res: TaskModel.(DataNode<T>) -> DataNode<Any> = { data: DataNode<T> ->
|
block(context).invoke(data, meta)
|
||||||
actionBuilder().invoke(data, meta)
|
|
||||||
}
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TaskEnv(val name: Name, val meta: Meta, val context: Context)
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> pipeAction(
|
inline fun <reified T : Any, reified R : Any> pipeAction(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: Workspace.() -> PipeBuilder<T, R>.() -> Unit
|
crossinline block: PipeBuilder<T, R>.(Context) -> Unit
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
|
val context = this
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
block = block()
|
) { block(context) }
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> pipe(
|
inline fun <reified T : Any, reified R : Any> pipe(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: Workspace.() -> suspend ActionEnv.(T) -> R
|
crossinline block: suspend TaskEnv.(T) -> R
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
|
val context = this
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class
|
||||||
) { result(block()) }
|
) {
|
||||||
|
result = { data ->
|
||||||
|
TaskEnv(name, meta, context).block(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> joinAction(
|
inline fun <reified T : Any, reified R : Any> joinAction(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: Workspace.() -> JoinGroupBuilder<T, R>.() -> Unit
|
crossinline block: JoinGroupBuilder<T, R>.(Context) -> Unit
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
action = block()
|
) { block(this@action) }
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> join(
|
inline fun <reified T : Any, reified R : Any> join(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: Workspace.() -> suspend ActionEnv.(Map<Name, T>) -> R
|
crossinline block: suspend TaskEnv.(Map<Name, T>) -> R
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
|
val context = this
|
||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class,
|
||||||
action = {
|
action = {
|
||||||
result(actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonimous", block())
|
result(actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonimous") { data ->
|
||||||
|
TaskEnv(name, meta, context).block(data)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -191,21 +194,20 @@ class KTaskBuilder(val name: String) {
|
|||||||
inline fun <reified T : Any, reified R : Any> splitAction(
|
inline fun <reified T : Any, reified R : Any> splitAction(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: Workspace.() -> SplitBuilder<T, R>.() -> Unit
|
crossinline block: SplitBuilder<T, R>.(Context) -> Unit
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
SplitAction(
|
SplitAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
action = block()
|
) { block(this@action) }
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use DSL to create a descriptor for this task
|
* Use DSL to create a descriptor for this task
|
||||||
*/
|
*/
|
||||||
fun descriptor(transform: NodeDescriptor.() -> Unit) {
|
fun description(transform: NodeDescriptor.() -> Unit) {
|
||||||
this.descriptor = NodeDescriptor.build(transform)
|
this.descriptor = NodeDescriptor.build(transform)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,26 +18,22 @@ class SimpleWorkspaceTest {
|
|||||||
model {
|
model {
|
||||||
allData()
|
allData()
|
||||||
}
|
}
|
||||||
pipe<Int, Int> {
|
pipe<Int, Int> { data ->
|
||||||
{ data ->
|
|
||||||
context.logger.info { "Starting square on $data" }
|
context.logger.info { "Starting square on $data" }
|
||||||
data * data
|
data * data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
task("sum") {
|
task("sum") {
|
||||||
model {
|
model {
|
||||||
dependsOn("square")
|
dependsOn("square")
|
||||||
}
|
}
|
||||||
join<Int,Int> {
|
join<Int, Int> { data ->
|
||||||
{ data ->
|
|
||||||
context.logger.info { "Starting sum" }
|
context.logger.info { "Starting sum" }
|
||||||
data.values.sum()
|
data.values.sum()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testWorkspace() {
|
fun testWorkspace() {
|
||||||
|
Loading…
Reference in New Issue
Block a user