Added strong typing to tasks and task dependencies

This commit is contained in:
Alexander Nozik 2019-09-14 16:50:25 +03:00
parent c175dc7de4
commit 6c1c49d15e
9 changed files with 135 additions and 94 deletions

View File

@ -87,7 +87,11 @@ class DirectTaskDependency<T : Any>(
} }
} }
class WorkspaceTaskDependency(override val name: Name, meta: Meta, placement: Name) : TaskDependency<Any>(meta, placement) { class WorkspaceTaskDependency(
override val name: Name,
meta: Meta,
placement: Name
) : TaskDependency<Any>(meta, placement) {
override fun resolveTask(workspace: Workspace): Task<*> = override fun resolveTask(workspace: Workspace): Task<*> =
workspace.tasks[name] ?: error("Task with name $name is not found in the workspace") workspace.tasks[name] ?: error("Task with name $name is not found in the workspace")
} }

View File

@ -55,7 +55,7 @@ class GenericTask<R : Any>(
override fun build(workspace: Workspace, taskConfig: Meta): TaskModel { override fun build(workspace: Workspace, taskConfig: Meta): TaskModel {
val taskMeta = taskConfig[name]?.node ?: taskConfig val taskMeta = taskConfig[name]?.node ?: taskConfig
val builder = TaskModelBuilder(name, taskMeta) val builder = TaskModelBuilder(name, taskMeta)
modelTransform.invoke(builder, taskMeta) builder.modelTransform(taskMeta)
return builder.build() return builder.build()
} }
//TODO add validation //TODO add validation

View File

@ -1,7 +1,6 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.context.content import hep.dataforge.context.content
import hep.dataforge.context.toMap import hep.dataforge.context.toMap
import hep.dataforge.data.DataNode import hep.dataforge.data.DataNode
@ -24,7 +23,6 @@ class SimpleWorkspace(
} }
companion object { companion object {
fun build(parent: Context = Global, block: SimpleWorkspaceBuilder.() -> Unit): SimpleWorkspace =
SimpleWorkspaceBuilder(parent).apply(block).build()
} }
} }

View File

@ -8,13 +8,15 @@ import hep.dataforge.meta.get
import hep.dataforge.meta.string import hep.dataforge.meta.string
import hep.dataforge.names.EmptyName import hep.dataforge.names.EmptyName
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.asName import hep.dataforge.names.isEmpty
import hep.dataforge.names.toName import hep.dataforge.names.toName
import kotlin.jvm.JvmName
import kotlin.reflect.KClass import kotlin.reflect.KClass
@TaskBuildScope @TaskBuildScope
class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) { class TaskBuilder<R : Any>(val name: Name, val type: KClass<out R>) {
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { allData() } private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { allData() }
// private val additionalDependencies = HashSet<Dependency>()
var descriptor: NodeDescriptor? = null var descriptor: NodeDescriptor? = null
private val dataTransforms: MutableList<DataTransformation> = ArrayList() private val dataTransforms: MutableList<DataTransformation> = ArrayList()
@ -30,12 +32,16 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
val localData = if (from.isEmpty()) { val localData = if (from.isEmpty()) {
node node
} else { } else {
node[from.toName()].node ?: return null node[from].node ?: return null
} }
return transform(workspace.context, model, localData) return transform(workspace.context, model, localData)
} }
} }
// override fun add(dependency: Dependency) {
// additionalDependencies.add(dependency)
// }
fun model(modelTransform: TaskModelBuilder.(Meta) -> Unit) { fun model(modelTransform: TaskModelBuilder.(Meta) -> Unit) {
this.modelTransform = modelTransform this.modelTransform = modelTransform
} }
@ -43,13 +49,14 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
/** /**
* Add a transformation on untyped data * Add a transformation on untyped data
*/ */
fun rawTransform( @JvmName("rawTransform")
fun transform(
from: String = "", from: String = "",
to: String = "", to: String = "",
block: TaskEnv.(DataNode<*>) -> DataNode<R> block: TaskEnv.(DataNode<*>) -> DataNode<R>
) { ) {
dataTransforms += DataTransformation(from, to) { context, model, data -> dataTransforms += DataTransformation(from, to) { context, model, data ->
val env = TaskEnv(EmptyName, model.meta, context) val env = TaskEnv(EmptyName, model.meta, context, data)
env.block(data) env.block(data)
} }
} }
@ -62,7 +69,7 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
) { ) {
dataTransforms += DataTransformation(from, to) { context, model, data -> dataTransforms += DataTransformation(from, to) { context, model, data ->
data.ensureType(inputType) data.ensureType(inputType)
val env = TaskEnv(EmptyName, model.meta, context) val env = TaskEnv(EmptyName, model.meta, context, data)
env.block(data.cast(inputType)) env.block(data.cast(inputType))
} }
} }
@ -88,7 +95,14 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
} }
} }
class TaskEnv(val name: Name, val meta: Meta, val context: Context) class TaskEnv(val name: Name, val meta: Meta, val context: Context, val data: DataNode<Any>) {
operator fun <T : Any> DirectTaskDependency<T>.invoke(): DataNode<T> = if(placement.isEmpty()){
data.cast(task.type)
} else {
data[placement].node?.cast(task.type)
?: error("Could not find results of direct task dependency $this at \"$placement\"")
}
}
/** /**
* A customized pipe action with ability to change meta and name * A customized pipe action with ability to change meta and name
@ -121,7 +135,7 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
) { ) {
//TODO automatically append task meta //TODO automatically append task meta
result = { data -> result = { data ->
TaskEnv(name, meta, context).block(data) block(data)
} }
} }
} }
@ -133,7 +147,7 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
inline fun <reified T : Any> joinByGroup( inline fun <reified T : Any> joinByGroup(
from: String = "", from: String = "",
to: String = "", to: String = "",
crossinline block: JoinGroupBuilder<T, R>.(TaskEnv) -> Unit crossinline block: JoinGroupBuilder<T, R>.(TaskEnv) -> Unit //TODO needs KEEP-176
) { ) {
action(from, to) { action(from, to) {
JoinAction( JoinAction(
@ -159,7 +173,7 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
result( result(
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous" actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
) { data -> ) { data ->
TaskEnv(name, meta, context).block(data) block(data)
} }
} }
) )
@ -172,7 +186,7 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
inline fun <reified T : Any> split( inline fun <reified T : Any> split(
from: String = "", from: String = "",
to: String = "", to: String = "",
crossinline block: SplitBuilder<T, R>.(TaskEnv) -> Unit crossinline block: SplitBuilder<T, R>.(TaskEnv) -> Unit //TODO needs KEEP-176
) { ) {
action(from, to) { action(from, to) {
SplitAction( SplitAction(
@ -189,9 +203,14 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
this.descriptor = NodeDescriptor.build(transform) this.descriptor = NodeDescriptor.build(transform)
} }
internal fun build(): GenericTask<R> = internal fun build(): GenericTask<R> {
GenericTask( // val actualTransform: TaskModelBuilder.(Meta) -> Unit = {
name.asName(), // modelTransform
// dependencies.addAll(additionalDependencies)
// }
return GenericTask(
name,
type, type,
descriptor ?: NodeDescriptor.empty(), descriptor ?: NodeDescriptor.empty(),
modelTransform modelTransform
@ -221,14 +240,13 @@ class TaskBuilder<R : Any>(val name: String, val type: KClass<out R>) {
} }
} }
} }
}
fun <T : Any> Workspace.Companion.task( fun <T : Any> Workspace.Companion.task(
name: String, name: String,
type: KClass<out T>, type: KClass<out T>,
builder: TaskBuilder<T>.() -> Unit builder: TaskBuilder<T>.() -> Unit
): GenericTask<T> { ): GenericTask<T> = TaskBuilder(name.toName(), type).apply(builder).build()
return TaskBuilder(name, type).apply(builder).build()
}
//TODO add delegates to build gradle-like tasks //TODO add delegates to build gradle-like tasks

View File

@ -63,47 +63,60 @@ fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> {
@DslMarker @DslMarker
annotation class TaskBuildScope annotation class TaskBuildScope
/** interface TaskDependencyContainer {
* A builder for [TaskModel] val defaultMeta: Meta
*/ fun add(dependency: Dependency)
@TaskBuildScope }
class TaskModelBuilder(val name: Name, meta: Meta = EmptyMeta) {
/**
* Meta for current task. By default uses the whole input meta
*/
var meta: MetaBuilder = meta.builder()
val dependencies = HashSet<Dependency>()
var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "")
/** /**
* Add dependency for a task defined in a workspace and resolved by * Add dependency for a task defined in a workspace and resolved by
*/ */
fun dependsOn(name: Name, meta: Meta = this.meta, placement: Name = EmptyName) { fun TaskDependencyContainer.dependsOn(
dependencies.add(WorkspaceTaskDependency(name, meta, placement)) name: Name,
} placement: Name = EmptyName,
meta: Meta = defaultMeta
): WorkspaceTaskDependency =
WorkspaceTaskDependency(name, meta, placement).also { add(it) }
fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) = fun TaskDependencyContainer.dependsOn(
dependsOn(name.toName(), meta, placement) name: String,
placement: Name = EmptyName,
meta: Meta = defaultMeta
): WorkspaceTaskDependency =
dependsOn(name.toName(), placement, meta)
fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) { fun <T : Any> TaskDependencyContainer.dependsOn(
dependencies.add(DirectTaskDependency(task, meta, placement)) task: Task<T>,
} placement: Name = EmptyName,
meta: Meta = defaultMeta
): DirectTaskDependency<T> =
DirectTaskDependency(task, meta, placement).also { add(it) }
fun dependsOn(task: Task<*>, placement: Name = EmptyName, metaBuilder: MetaBuilder.() -> Unit) = fun <T : Any> TaskDependencyContainer.dependsOn(
dependsOn(task.name, buildMeta(metaBuilder), placement) task: Task<T>,
placement: String,
meta: Meta = defaultMeta
): DirectTaskDependency<T> =
DirectTaskDependency(task, meta, placement.toName()).also { add(it) }
fun <T : Any> TaskDependencyContainer.dependsOn(
task: Task<T>,
placement: Name = EmptyName,
metaBuilder: MetaBuilder.() -> Unit
): DirectTaskDependency<T> =
dependsOn(task, placement, buildMeta(metaBuilder))
/** /**
* Add custom data dependency * Add custom data dependency
*/ */
fun data(action: DataFilter.() -> Unit) { fun TaskDependencyContainer.data(action: DataFilter.() -> Unit): DataDependency =
dependencies.add(DataDependency(DataFilter.build(action))) DataDependency(DataFilter.build(action)).also { add(it) }
}
/** /**
* User-friendly way to add data dependency * User-friendly way to add data dependency
*/ */
fun data(pattern: String? = null, from: String? = null, to: String? = null) = data { fun TaskDependencyContainer.data(pattern: String? = null, from: String? = null, to: String? = null): DataDependency =
data {
pattern?.let { this.pattern = it } pattern?.let { this.pattern = it }
from?.let { this.from = it } from?.let { this.from = it }
to?.let { this.to = it } to?.let { this.to = it }
@ -112,10 +125,27 @@ class TaskModelBuilder(val name: Name, meta: Meta = EmptyMeta) {
/** /**
* Add all data as root node * Add all data as root node
*/ */
fun allData(to: Name = EmptyName) { fun TaskDependencyContainer.allData(to: Name = EmptyName) = AllDataDependency(to).also { add(it) }
dependencies.add(AllDataDependency(to))
/**
* A builder for [TaskModel]
*/
class TaskModelBuilder(val name: Name, meta: Meta = EmptyMeta) : TaskDependencyContainer {
/**
* Meta for current task. By default uses the whole input meta
*/
var meta: MetaBuilder = meta.builder()
val dependencies = HashSet<Dependency>()
override val defaultMeta: Meta get() = meta
override fun add(dependency: Dependency) {
dependencies.add(dependency)
} }
var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "")
fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies) fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies)
} }

View File

@ -1,6 +1,8 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware import hep.dataforge.context.ContextAware
import hep.dataforge.context.Global
import hep.dataforge.data.Data import hep.dataforge.data.Data
import hep.dataforge.data.DataNode import hep.dataforge.data.DataNode
import hep.dataforge.data.dataSequence import hep.dataforge.data.dataSequence
@ -56,6 +58,8 @@ interface Workspace : ContextAware, Provider {
companion object { companion object {
const val TYPE = "workspace" const val TYPE = "workspace"
operator fun invoke(parent: Context = Global, block: SimpleWorkspaceBuilder.() -> Unit): SimpleWorkspace =
SimpleWorkspaceBuilder(parent).apply(block).build()
} }
} }
@ -73,3 +77,6 @@ fun Workspace.run(task: String, meta: Meta) =
fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}) = fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}) =
run(task, buildMeta(block)) run(task, buildMeta(block))
fun <T: Any> Workspace.run(task: Task<T>, metaBuilder: MetaBuilder.() -> Unit = {}): DataNode<T> =
run(task, buildMeta(metaBuilder))

View File

@ -8,6 +8,7 @@ import hep.dataforge.meta.*
import hep.dataforge.names.EmptyName import hep.dataforge.names.EmptyName
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.isEmpty import hep.dataforge.names.isEmpty
import hep.dataforge.names.toName
import kotlin.jvm.JvmName import kotlin.jvm.JvmName
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -70,9 +71,7 @@ fun <T : Any> WorkspaceBuilder.task(
name: String, name: String,
type: KClass<out T>, type: KClass<out T>,
builder: TaskBuilder<T>.() -> Unit builder: TaskBuilder<T>.() -> Unit
): Task<T> { ): Task<T> = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) }
return TaskBuilder(name, type).apply(builder).build().also { tasks.add(it) }
}
inline fun <reified T : Any> WorkspaceBuilder.task( inline fun <reified T : Any> WorkspaceBuilder.task(
name: String, name: String,

View File

@ -1,5 +0,0 @@
package hep.dataforge.workspace
//fun <T1: Any, T2: Any, R: Any> TaskBuilder.zip(
//// val firstNo
////) = rawTransform { }

View File

@ -4,7 +4,6 @@ import hep.dataforge.context.PluginTag
import hep.dataforge.data.* import hep.dataforge.data.*
import hep.dataforge.meta.boolean import hep.dataforge.meta.boolean
import hep.dataforge.meta.get import hep.dataforge.meta.get
import hep.dataforge.names.asName
import org.junit.Test import org.junit.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -22,7 +21,7 @@ class SimpleWorkspaceTest {
override val tasks: Collection<Task<*>> = listOf(contextTask) override val tasks: Collection<Task<*>> = listOf(contextTask)
} }
val workspace = SimpleWorkspace.build { val workspace = Workspace {
context { context {
plugin(testPlugin) plugin(testPlugin)
@ -35,10 +34,7 @@ class SimpleWorkspaceTest {
} }
val square = task("square") { val square = task<Int>("square") {
model {
allData()
}
pipe<Int> { data -> pipe<Int> { data ->
if (meta["testFlag"].boolean == true) { if (meta["testFlag"].boolean == true) {
println("flag") println("flag")
@ -48,24 +44,21 @@ class SimpleWorkspaceTest {
} }
} }
val linear = task("linear") { val linear = task<Int>("linear") {
model {
allData()
}
pipe<Int> { data -> pipe<Int> { data ->
context.logger.info { "Starting linear on $data" } context.logger.info { "Starting linear on $data" }
data * 2 + 1 data * 2 + 1
} }
} }
val fullSquare = task("fullsquare") { val fullSquare = task<Int>("fullsquare") {
model { model {
dependsOn("square", placement = "square".asName()) val squareDep = dependsOn(square, placement = "square")
dependsOn("linear", placement = "linear".asName()) val linearDep = dependsOn(linear, placement = "linear")
} }
transform<Any> { data -> transform { data ->
val squareNode = data["square"].filterIsInstance<Int>().node!! val squareNode = data["square"].node!!.cast<Int>()//squareDep()
val linearNode = data["linear"].filterIsInstance<Int>().node!! val linearNode = data["linear"].node!!.cast<Int>()//linearDep()
return@transform DataNode(Int::class) { return@transform DataNode(Int::class) {
squareNode.dataSequence().forEach { (name, _) -> squareNode.dataSequence().forEach { (name, _) ->
val newData = Data { val newData = Data {
@ -79,9 +72,9 @@ class SimpleWorkspaceTest {
} }
} }
task("sum") { task<Int>("sum") {
model { model {
dependsOn("square") dependsOn(square)
} }
join<Int> { data -> join<Int> { data ->
context.logger.info { "Starting sum" } context.logger.info { "Starting sum" }
@ -89,10 +82,7 @@ class SimpleWorkspaceTest {
} }
} }
task<Double>("average") { val average = task<Double>("average") {
model {
allData()
}
joinByGroup<Int> { env -> joinByGroup<Int> { env ->
group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) { group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) {
result { data -> result { data ->
@ -111,7 +101,7 @@ class SimpleWorkspaceTest {
task("delta") { task("delta") {
model { model {
dependsOn("average") dependsOn(average)
} }
join<Double> { data -> join<Double> { data ->
data["even"]!! - data["odd"]!! data["even"]!! - data["odd"]!!