Workspace and task updates
This commit is contained in:
parent
b07d281a83
commit
00d964eef3
@ -17,7 +17,7 @@ public class SimpleWorkspace(
|
|||||||
private val externalTasks: Map<Name, Task<*>>,
|
private val externalTasks: Map<Name, Task<*>>,
|
||||||
) : Workspace {
|
) : Workspace {
|
||||||
|
|
||||||
override val data: TaskResult<*> = internalize(data, Name.EMPTY, Meta.EMPTY)
|
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
|
||||||
|
|
||||||
override val tasks: Map<Name, Task<*>>
|
override val tasks: Map<Name, Task<*>>
|
||||||
get() = context.gather<Task<*>>(Task.TYPE) + externalTasks
|
get() = context.gather<Task<*>>(Task.TYPE) + externalTasks
|
||||||
|
@ -28,7 +28,7 @@ public interface Task<out T : Any> : Described {
|
|||||||
public suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T>
|
public suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T>
|
||||||
|
|
||||||
public companion object {
|
public companion object {
|
||||||
public const val TYPE: String = "workspace.stage"
|
public const val TYPE: String = "workspace.task"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,6 +42,10 @@ public class TaskResultBuilder<T : Any>(
|
|||||||
/**
|
/**
|
||||||
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
|
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
|
||||||
* Data dependency cycles are not allowed.
|
* Data dependency cycles are not allowed.
|
||||||
|
*
|
||||||
|
* @param resultType the type boundary for data produced by this task
|
||||||
|
* @param descriptor of meta accepted by this task
|
||||||
|
* @param builder for resulting data set
|
||||||
*/
|
*/
|
||||||
@Suppress("FunctionName")
|
@Suppress("FunctionName")
|
||||||
@DFInternal
|
@DFInternal
|
||||||
@ -60,9 +64,9 @@ public fun <T : Any> Task(
|
|||||||
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
|
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
|
||||||
//TODO use safe builder and check for external data on add and detects cycles
|
//TODO use safe builder and check for external data on add and detects cycles
|
||||||
val dataset = DataTree<T>(resultType) {
|
val dataset = DataTree<T>(resultType) {
|
||||||
TaskResultBuilder(workspace,taskName, taskMeta, this).apply { builder() }
|
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder() }
|
||||||
}
|
}
|
||||||
workspace.internalize(dataset, taskName, taskMeta)
|
workspace.wrapResult(dataset, taskName, taskMeta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,6 +42,6 @@ private class TaskDataImpl<out T : Any>(
|
|||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
internal fun <T : Any> Workspace.internalize(data: Data<T>, name: Name, stage: Name, stageMeta: Meta): TaskData<T> =
|
internal fun <T : Any> Workspace.wrapResult(data: Data<T>, name: Name, stage: Name, stageMeta: Meta): TaskData<T> =
|
||||||
TaskDataImpl(this, data, name, stage, stageMeta)
|
TaskDataImpl(this, data, name, stage, stageMeta)
|
||||||
|
|
||||||
|
@ -37,13 +37,16 @@ private class TaskResultImpl<out T : Any>(
|
|||||||
) : TaskResult<T>, DataSet<T> by dataSet {
|
) : TaskResult<T>, DataSet<T> by dataSet {
|
||||||
|
|
||||||
override fun flow(): Flow<TaskData<T>> = dataSet.flow().map {
|
override fun flow(): Flow<TaskData<T>> = dataSet.flow().map {
|
||||||
workspace.internalize(it, it.name, taskName, taskMeta)
|
workspace.wrapResult(it, it.name, taskName, taskMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun getData(name: Name): TaskData<T>? = dataSet.getData(name)?.let {
|
override suspend fun getData(name: Name): TaskData<T>? = dataSet.getData(name)?.let {
|
||||||
workspace.internalize(it, name, taskName, taskMeta)
|
workspace.wrapResult(it, name, taskName, taskMeta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal fun <T : Any> Workspace.internalize(dataSet: DataSet<T>, stage: Name, stageMeta: Meta): TaskResult<T> =
|
/**
|
||||||
TaskResultImpl(this, dataSet, stage, stageMeta)
|
* Wrap data into [TaskResult]
|
||||||
|
*/
|
||||||
|
public fun <T : Any> Workspace.wrapResult(dataSet: DataSet<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
|
||||||
|
TaskResultImpl(this, dataSet, taskName, taskMeta)
|
@ -1,6 +1,7 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import space.kscience.dataforge.context.ContextAware
|
import space.kscience.dataforge.context.ContextAware
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.MutableMeta
|
import space.kscience.dataforge.meta.MutableMeta
|
||||||
import space.kscience.dataforge.misc.Type
|
import space.kscience.dataforge.misc.Type
|
||||||
@ -8,6 +9,10 @@ import space.kscience.dataforge.names.Name
|
|||||||
import space.kscience.dataforge.provider.Provider
|
import space.kscience.dataforge.provider.Provider
|
||||||
|
|
||||||
|
|
||||||
|
public interface DataSelector<T: Any>{
|
||||||
|
public suspend fun select(workspace: Workspace, meta: Meta): DataSet<T>
|
||||||
|
}
|
||||||
|
|
||||||
@Type(Workspace.TYPE)
|
@Type(Workspace.TYPE)
|
||||||
public interface Workspace : ContextAware, Provider {
|
public interface Workspace : ContextAware, Provider {
|
||||||
/**
|
/**
|
||||||
|
@ -17,7 +17,18 @@ import space.kscience.dataforge.names.Name
|
|||||||
import kotlin.properties.PropertyDelegateProvider
|
import kotlin.properties.PropertyDelegateProvider
|
||||||
import kotlin.properties.ReadOnlyProperty
|
import kotlin.properties.ReadOnlyProperty
|
||||||
|
|
||||||
public data class TaskReference<T: Any>(public val taskName: Name, public val task: Task<T>)
|
public data class TaskReference<T: Any>(public val taskName: Name, public val task: Task<T>): DataSelector<T>{
|
||||||
|
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> {
|
||||||
|
if (workspace.tasks[taskName] == task) {
|
||||||
|
return workspace.produce(taskName, meta) as TaskResult<T>
|
||||||
|
} else {
|
||||||
|
error("Task $taskName does not belong to the workspace")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public interface TaskContainer {
|
public interface TaskContainer {
|
||||||
public fun registerTask(taskName: Name, task: Task<*>)
|
public fun registerTask(taskName: Name, task: Task<*>)
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
|
||||||
|
public suspend fun <T : Any> TaskResultBuilder<*>.from(
|
||||||
|
selector: DataSelector<T>,
|
||||||
|
meta: Meta = Meta.EMPTY,
|
||||||
|
): DataSet<T> = selector.select(workspace, meta)
|
@ -1,24 +0,0 @@
|
|||||||
package space.kscience.dataforge.workspace
|
|
||||||
|
|
||||||
import space.kscience.dataforge.data.DataSet
|
|
||||||
import space.kscience.dataforge.data.select
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
|
|
||||||
public suspend inline fun <reified T : Any> TaskResultBuilder<T>.from(
|
|
||||||
task: Name,
|
|
||||||
taskMeta: Meta = Meta.EMPTY,
|
|
||||||
): DataSet<T> = workspace.produce(task, taskMeta).select()
|
|
||||||
|
|
||||||
|
|
||||||
@Suppress("UNCHECKED_CAST")
|
|
||||||
public suspend fun <R : Any> TaskResultBuilder<*>.from(
|
|
||||||
reference: TaskReference<R>,
|
|
||||||
taskMeta: Meta = Meta.EMPTY,
|
|
||||||
): DataSet<R> {
|
|
||||||
if (workspace.tasks[reference.taskName] == reference.task) {
|
|
||||||
return workspace.produce(reference.taskName, taskMeta) as TaskResult<R>
|
|
||||||
} else {
|
|
||||||
throw error("Task ${reference.taskName} does not belong to the workspace")
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,8 +1,17 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import space.kscience.dataforge.data.DataSet
|
||||||
import space.kscience.dataforge.data.DataSetBuilder
|
import space.kscience.dataforge.data.DataSetBuilder
|
||||||
|
import space.kscience.dataforge.data.select
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
|
||||||
public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder<Any>.() -> Unit): Unit = runBlocking {
|
public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder<Any>.() -> Unit): Unit = runBlocking {
|
||||||
buildData(builder)
|
buildData(builder)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.from(
|
||||||
|
task: Name,
|
||||||
|
taskMeta: Meta = Meta.EMPTY,
|
||||||
|
): DataSet<T> = workspace.produce(task, taskMeta).select()
|
Loading…
Reference in New Issue
Block a user