Workspace is working

This commit is contained in:
Alexander Nozik 2019-03-29 12:16:24 +03:00
parent 929f1b9d69
commit e3127503ec
11 changed files with 295 additions and 121 deletions

View File

@ -206,4 +206,6 @@ fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNod
} }
} }
fun <T: Any> DataNode<T>.first(): Data<T> = data().first().second
//fun <T : Any, R: T> DataNode<T>.filterIsInstance(type: KClass<R>): DataNode<R> = filter{_,data -> type.} //fun <T : Any, R: T> DataNode<T>.filterIsInstance(type: KClass<R>): DataNode<R> = filter{_,data -> type.}

View File

@ -16,7 +16,7 @@ class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<
var meta: MetaBuilder = MetaBuilder() var meta: MetaBuilder = MetaBuilder()
var result: suspend ActionEnv.(Map<Name, T>) -> R = TODO("Action not implemented") lateinit var result: suspend ActionEnv.(Map<Name, T>) -> R
fun result(f: suspend ActionEnv.(Map<Name, T>) -> R) { fun result(f: suspend ActionEnv.(Map<Name, T>) -> R) {
this.result = f; this.result = f;

View File

@ -14,7 +14,7 @@ class ActionEnv(val name: Name, val meta: Meta)
* Action environment * Action environment
*/ */
class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) { class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
var result: suspend ActionEnv.(T) -> R = TODO("Action not implemented") lateinit var result: suspend ActionEnv.(T) -> R
/** /**
* Calculate the result of goal * Calculate the result of goal

View File

@ -14,7 +14,7 @@ import kotlin.reflect.full.isSuperclassOf
class FragmentRule<T : Any, R : Any>(val name: Name, var meta: MetaBuilder) { class FragmentRule<T : Any, R : Any>(val name: Name, var meta: MetaBuilder) {
var result: suspend (T) -> R = TODO("Action not implemented") lateinit var result: suspend (T) -> R
fun result(f: suspend (T) -> R) { fun result(f: suspend (T) -> R) {
result = f; result = f;

View File

@ -23,31 +23,45 @@ class DataDependency(val filter: DataFilter, val placement: Name = EmptyName) :
return if (placement.isEmpty()) { return if (placement.isEmpty()) {
result result
} else { } else {
DataNode.build(Any::class){ this[placement] = result } DataNode.build(Any::class) { this[placement] = result }
} }
} }
override fun toMeta(): Meta = filter.config override fun toMeta(): Meta = buildMeta {
"data" to filter.config
companion object { "to" to placement
val all: DataDependency = DataDependency(DataFilter.build { })
} }
} }
class AllDataDependency(val placement: Name = EmptyName) : Dependency() {
override fun apply(workspace: Workspace): DataNode<Any> = if (placement.isEmpty()) {
workspace.data
} else {
DataNode.build(Any::class) { this[placement] = workspace.data }
}
override fun toMeta() = buildMeta {
"data" to "*"
"to" to placement
}
}
class TaskModelDependency(val name: String, val meta: Meta, val placement: Name = EmptyName) : Dependency() { class TaskModelDependency(val name: String, val meta: Meta, val placement: Name = EmptyName) : Dependency() {
override fun apply(workspace: Workspace): DataNode<Any> { override fun apply(workspace: Workspace): DataNode<Any> {
val task = workspace.tasks[name] ?: error("Task with name ${name} is not found in the workspace") val task = workspace.tasks[name] ?: error("Task with name ${name} is not found in the workspace")
if (task.isTerminal) TODO("Support terminal task") if (task.isTerminal) TODO("Support terminal task")
val result = with(workspace) { task(meta) } val result = workspace.run(task, meta)
return if (placement.isEmpty()) { return if (placement.isEmpty()) {
result result
} else { } else {
DataNode.build(Any::class){ this[placement] = result } DataNode.build(Any::class) { this[placement] = result }
} }
} }
override fun toMeta(): Meta = buildMeta { override fun toMeta(): Meta = buildMeta {
"name" to name "task" to name
"meta" to meta "meta" to meta
"to" to placement
} }
} }

View File

@ -0,0 +1,27 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.context.members
import hep.dataforge.data.DataNode
import hep.dataforge.meta.Meta
/**
* A simple workspace without caching
*/
class SimpleWorkspace(
override val context: Context,
override val data: DataNode<Any>,
override val targets: Map<String, Meta>,
tasks: Collection<Task<Any>>
) : Workspace {
override val tasks: Map<String, Task<*>> by lazy {
(context.members<Task<*>>(Task.TYPE) + tasks).associate { it.name to it }
}
companion object {
fun build(parent: Context = Global, block: SimpleWorkspaceBuilder.() -> Unit): SimpleWorkspace =
SimpleWorkspaceBuilder(parent).apply(block).build()
}
}

View File

@ -68,15 +68,21 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
var meta: MetaBuilder = meta.builder() var meta: MetaBuilder = meta.builder()
val dependencies = HashSet<Dependency>() val dependencies = HashSet<Dependency>()
var target: String by this.meta.string(key = MODEL_TARGET_KEY,default = "") var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "")
/** /**
* Add dependency for * Add dependency for
*/ */
fun dependsOn(name: String, meta: Meta, placement: Name = EmptyName) { fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) {
dependencies.add(TaskModelDependency(name, meta, placement)) dependencies.add(TaskModelDependency(name, meta, placement))
} }
fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) =
dependsOn(task.name, meta, placement)
fun dependsOn(task: Task<*>, placement: Name = EmptyName, metaBuilder: MetaBuilder.() -> Unit) =
dependsOn(task.name, buildMeta(metaBuilder), placement)
/** /**
* Add custom data dependency * Add custom data dependency
*/ */
@ -96,8 +102,8 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
/** /**
* Add all data as root node * Add all data as root node
*/ */
fun allData() { fun allData(to: Name = EmptyName) {
dependencies.add(DataDependency.all) dependencies.add(AllDataDependency(to))
} }
fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies) fun build(): TaskModel = TaskModel(name, meta.seal(), dependencies)

View File

@ -1,11 +1,11 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware import hep.dataforge.context.ContextAware
import hep.dataforge.context.members
import hep.dataforge.data.Data import hep.dataforge.data.Data
import hep.dataforge.data.DataNode import hep.dataforge.data.DataNode
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.buildMeta
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.toName import hep.dataforge.names.toName
import hep.dataforge.provider.Provider import hep.dataforge.provider.Provider
@ -49,40 +49,46 @@ interface Workspace : ContextAware, Provider {
} }
} }
operator fun <R : Any> Task<R>.invoke(config: Meta): DataNode<R> {
/**
* Invoke a task in the workspace utilizing caching if possible
*/
fun <R : Any> run(task: Task<R>, config: Meta): DataNode<R> {
context.activate(this) context.activate(this)
try { try {
val model = build(this@Workspace, config) val model = task.build(this, config)
validate(model) task.validate(model)
return run(this@Workspace, model) return task.run(this, model)
} finally { } finally {
context.deactivate(this) context.deactivate(this)
} }
} }
/** // /**
* Invoke a task in the workspace utilizing caching if possible // * Invoke a task in the workspace utilizing caching if possible
*/ // */
operator fun <R : Any> Task<R>.invoke(targetName: String): DataNode<R> { // operator fun <R : Any> Task<R>.invoke(targetName: String): DataNode<R> {
val target = targets[targetName] ?: error("A target with name $targetName not found in ${this@Workspace}") // val target = targets[targetName] ?: error("A target with name $targetName not found in ${this@Workspace}")
return invoke(target) // context.logger.info { "Running ${this.name} on $target" }
} // return invoke(target)
// }
companion object { companion object {
const val TYPE = "workspace" const val TYPE = "workspace"
} }
} }
class SimpleWorkspace( fun Workspace.run(task: Task<*>, target: String): DataNode<Any> {
override val context: Context, val meta = targets[target] ?: error("A target with name $target not found in ${this}")
override val data: DataNode<Any>, return run(task, meta)
override val targets: Map<String, Meta>,
tasks: Collection<Task<Any>>
) : Workspace {
override val tasks: Map<String, Task<*>> by lazy {
(context.members<Task<*>>(Task.TYPE) + tasks).associate { it.name to it }
}
} }
fun Workspace.run(task: String, target: String) =
tasks[task]?.let { run(it, target) } ?: error("Task with name $task not found")
fun Workspace.run(task: String, meta: Meta) =
tasks[task]?.let { run(it, meta) } ?: error("Task with name $task not found")
fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}) =
run(task, buildMeta(block))

View File

@ -2,39 +2,90 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.context.ContextBuilder import hep.dataforge.context.ContextBuilder
import hep.dataforge.data.Data
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataTreeBuilder import hep.dataforge.data.DataTreeBuilder
import hep.dataforge.meta.Meta import hep.dataforge.meta.*
import hep.dataforge.meta.MetaBuilder import hep.dataforge.names.Name
import hep.dataforge.meta.buildMeta import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
interface WorkspaceBuilder {
val parentContext: Context
var context: Context
var data: DataTreeBuilder<Any>
var tasks: MutableSet<Task<Any>>
var targets: MutableMap<String, Meta>
fun build(): Workspace
}
/** /**
* A builder for a workspace * Set the context for future workspcace
*/ */
class WorkspaceBuilder(var context: Context) { fun WorkspaceBuilder.context(name: String, block: ContextBuilder.() -> Unit) {
val data = DataTreeBuilder(Any::class) context = ContextBuilder(name, parentContext).apply(block).build()
val targets = HashMap<String, Meta>() }
val tasks = HashSet<Task<Any>>()
fun WorkspaceBuilder.data(name: Name, data: Data<Any>) {
fun context(action: ContextBuilder.() -> Unit) { this.data[name] = data
this.context = ContextBuilder().apply(action).build() }
}
fun WorkspaceBuilder.data(name: String, data: Data<Any>) = data(name.toName(), data)
fun data(action: DataTreeBuilder<Any>.() -> Unit) = data.apply(action)
fun WorkspaceBuilder.static(name: Name, data: Any, scope: CoroutineScope = GlobalScope, meta: Meta = EmptyMeta) =
fun target(name: String, meta: Meta) { data(name, Data.static(scope, data, meta))
targets[name] = meta
} fun WorkspaceBuilder.static(name: Name, data: Any, scope: CoroutineScope = GlobalScope, block: MetaBuilder.() -> Unit = {}) =
data(name, Data.static(scope, data, buildMeta(block)))
fun target(name: String, action: MetaBuilder.() -> Unit) = target(name, buildMeta(action))
fun WorkspaceBuilder.static(name: String, data: Any, scope: CoroutineScope = GlobalScope, block: MetaBuilder.() -> Unit = {}) =
fun task(task: Task<*>) { data(name, Data.static(scope, data, buildMeta(block)))
tasks.add(task)
} fun WorkspaceBuilder.data(name: Name, node: DataNode<Any>) {
this.data[name] = node
fun build(): Workspace = SimpleWorkspace( }
context,
data.build(), fun WorkspaceBuilder.data(name: String, node: DataNode<Any>) = data(name.toName(), node)
targets,
tasks fun WorkspaceBuilder.data(name: Name, block: DataTreeBuilder<Any>.() -> Unit) {
) this.data[name] = DataNode.build(Any::class, block)
}
fun WorkspaceBuilder.data(name: String, block: DataTreeBuilder<Any>.() -> Unit) = data(name.toName(), block)
fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit) {
targets[name] = buildMeta(block).seal()
}
/**
* Use existing target as a base updating it with the block
*/
fun WorkspaceBuilder.target(name: String, base: String, block: MetaBuilder.() -> Unit) {
val parentTarget = targets[base] ?: error("Base target with name $base not found")
targets[name] = parentTarget.builder()
.apply { "@baseTarget" to base }
.apply(block)
.seal()
}
fun WorkspaceBuilder.task(task: Task<Any>) {
this.tasks.add(task)
}
/**
* A builder for a simple workspace
*/
class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder {
override var context: Context = parentContext
override var data = DataTreeBuilder(Any::class)
override var tasks: MutableSet<Task<Any>> = HashSet()
override var targets: MutableMap<String, Meta> = HashMap()
override fun build(): SimpleWorkspace {
return SimpleWorkspace(context, data.build(), targets, tasks)
}
} }

View File

@ -75,13 +75,13 @@ class KTaskBuilder(val name: String) {
val to: String = "", val to: String = "",
val transform: Workspace.() -> TaskModel.(DataNode<Any>) -> DataNode<Any> val transform: Workspace.() -> TaskModel.(DataNode<Any>) -> DataNode<Any>
) { ) {
operator fun Workspace.invoke(model: TaskModel, node: DataNode<Any>): DataNode<Any>? { operator fun invoke(workspace: Workspace, model: TaskModel, node: DataNode<Any>): DataNode<Any>? {
val localData = if (from.isEmpty()) { val localData = if (from.isEmpty()) {
node node
} else { } else {
node.getNode(from.toName()) ?: return null node.getNode(from.toName()) ?: return null
} }
return transform(this).invoke(model, localData) return transform(workspace).invoke(model, localData)
} }
} }
@ -91,14 +91,18 @@ class KTaskBuilder(val name: String) {
this.modelTransform = modelTransform this.modelTransform = modelTransform
} }
//class TaskEnv(val workspace: Workspace, val model: TaskModel)
fun <T : Any> transform( fun <T : Any> transform(
inputType: KClass<T>, inputType: KClass<T>,
from: String = "", from: String = "",
to: String = "", to: String = "",
transform: Workspace.() -> TaskModel.(DataNode<T>) -> DataNode<Any> transform: Workspace.() -> TaskModel.(DataNode<T>) -> DataNode<Any>
) { ) {
dataTransforms += DataTransformation(from, to) { data: DataNode<Any> -> dataTransforms += DataTransformation(from, to) {
transform.invoke(this, data.cast(inputType)) { data: DataNode<Any> ->
transform().invoke(this, data.cast(inputType))
}
} }
} }
@ -113,77 +117,89 @@ class KTaskBuilder(val name: String) {
/** /**
* Perform given action on data elements in `from` node in input and put the result to `to` node * Perform given action on data elements in `from` node in input and put the result to `to` node
*/ */
inline fun <reified T : Any, reified R : Any> action(action: Action<T, R>, from: String = "", to: String = "") { inline fun <reified T : Any, reified R : Any> action(
transform(from, to) { data: DataNode<T> -> from: String = "",
action(data, model.meta) to: String = "",
crossinline actionBuilder: Workspace.() -> Action<T, R>
) {
transform(from, to) {
val res: TaskModel.(DataNode<T>) -> DataNode<Any> = { data: DataNode<T> ->
actionBuilder().invoke(data, meta)
}
res
} }
} }
inline fun <reified T : Any, reified R : Any> pipeAction( inline fun <reified T : Any, reified R : Any> pipeAction(
from: String = "", from: String = "",
to: String = "", to: String = "",
noinline action: PipeBuilder<T, R>.() -> Unit crossinline block: Workspace.() -> PipeBuilder<T, R>.() -> Unit
) { ) {
val pipe: Action<T, R> = PipeAction( action(from, to) {
PipeAction(
inputType = T::class, inputType = T::class,
outputType = R::class, outputType = R::class,
block = action block = block()
) )
action(pipe, from, to); }
} }
inline fun <reified T : Any, reified R : Any> pipe( inline fun <reified T : Any, reified R : Any> pipe(
from: String = "", from: String = "",
to: String = "", to: String = "",
noinline action: suspend ActionEnv.(T) -> R crossinline block: Workspace.() -> suspend ActionEnv.(T) -> R
) { ) {
val pipe: Action<T, R> = PipeAction( action(from, to) {
PipeAction(
inputType = T::class, inputType = T::class,
outputType = R::class outputType = R::class
) { result(action) } ) { result(block()) }
action(pipe, from, to); }
} }
inline fun <reified T : Any, reified R : Any> joinAction( inline fun <reified T : Any, reified R : Any> joinAction(
from: String = "", from: String = "",
to: String = "", to: String = "",
noinline action: JoinGroupBuilder<T, R>.() -> Unit crossinline block: Workspace.() -> JoinGroupBuilder<T, R>.() -> Unit
) { ) {
val join: Action<T, R> = JoinAction( action(from, to) {
JoinAction(
inputType = T::class, inputType = T::class,
outputType = R::class, outputType = R::class,
action = action action = block()
) )
action(join, from, to); }
} }
inline fun <reified T : Any, reified R : Any> join( inline fun <reified T : Any, reified R : Any> join(
from: String = "", from: String = "",
to: String = "", to: String = "",
noinline action: suspend ActionEnv.(Map<Name, T>) -> R crossinline block: Workspace.() -> suspend ActionEnv.(Map<Name, T>) -> R
) { ) {
val join: Action<T, R> = JoinAction( action(from, to) {
JoinAction(
inputType = T::class, inputType = T::class,
outputType = R::class, outputType = R::class,
action = { action = {
result(actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonimous", action) result(actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonimous", block())
} }
) )
action(join, from, to); }
} }
inline fun <reified T : Any, reified R : Any> splitAction( inline fun <reified T : Any, reified R : Any> splitAction(
from: String = "", from: String = "",
to: String = "", to: String = "",
noinline action: SplitBuilder<T, R>.() -> Unit crossinline block: Workspace.() -> SplitBuilder<T, R>.() -> Unit
) { ) {
val split: Action<T, R> = SplitAction( action(from, to) {
SplitAction(
inputType = T::class, inputType = T::class,
outputType = R::class, outputType = R::class,
action = action action = block()
) )
action(split, from, to); }
} }
/** /**
@ -193,22 +209,24 @@ class KTaskBuilder(val name: String) {
this.descriptor = NodeDescriptor.build(transform) this.descriptor = NodeDescriptor.build(transform)
} }
fun build(): GenericTask<Any> { fun build(): GenericTask<Any> =
val transform: Workspace.() -> TaskModel.(DataNode<Any>) -> DataNode<Any> = { GenericTask(name, Any::class, descriptor ?: NodeDescriptor.empty(), modelTransform) {
val workspace = this
{ data -> { data ->
val model = this
if (dataTransforms.isEmpty()) { if (dataTransforms.isEmpty()) {
//return data node as is //return data node as is
logger.warn("No transformation present, returning input data") logger.warn("No transformation present, returning input data")
data data
} else { } else {
val builder = DataTreeBuilder(Any::class) val builder = DataTreeBuilder(Any::class)
dataTransforms.forEach { transform -> dataTransforms.forEach { transformation ->
val res = transform(this, data) val res = transformation(workspace, model, data)
if (res != null) { if (res != null) {
if (transform.to.isEmpty()) { if (transformation.to.isEmpty()) {
builder.update(res) builder.update(res)
} else { } else {
builder[transform.to.toName()] = res builder[transformation.to.toName()] = res
} }
} }
} }
@ -216,10 +234,12 @@ class KTaskBuilder(val name: String) {
} }
} }
} }
return GenericTask(name, Any::class, descriptor ?: NodeDescriptor.empty(), modelTransform, transform);
}
} }
fun task(name: String, builder: KTaskBuilder.() -> Unit): GenericTask<Any> { fun task(name: String, builder: KTaskBuilder.() -> Unit): GenericTask<Any> {
return KTaskBuilder(name).apply(builder).build(); return KTaskBuilder(name).apply(builder).build()
}
fun WorkspaceBuilder.task(name: String, builder: KTaskBuilder.() -> Unit) {
task(KTaskBuilder(name).apply(builder).build())
} }

View File

@ -0,0 +1,48 @@
package hep.dataforge.workspace
import hep.dataforge.data.first
import hep.dataforge.data.get
import org.junit.Test
import kotlin.test.assertEquals
class SimpleWorkspaceTest {
val workspace = SimpleWorkspace.build {
repeat(100) {
static("myData[$it]", it)
}
task("square") {
model {
allData()
}
pipe<Int, Int> {
{ data ->
context.logger.info { "Starting square on $data" }
data * data
}
}
}
task("sum"){
model {
dependsOn("square")
}
join<Int,Int> {
{ data ->
context.logger.info { "Starting sum" }
data.values.sum()
}
}
}
}
@Test
fun testWorkspace(){
val node = workspace.run("sum")
val res = node.first()
assertEquals(328350, res.get())
}
}