Workspace and task updates

This commit is contained in:
Alexander Nozik 2021-10-09 10:49:38 +03:00
parent 00d964eef3
commit aded38254e
6 changed files with 71 additions and 18 deletions

View File

@ -16,8 +16,12 @@ import kotlin.reflect.typeOf
*/ */
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? = private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
if (!this.type.isSubtypeOf(type)) null else object : Data<R> by (this as Data<R>) { if (!this.type.isSubtypeOf(type)) {
override val type: KType = type null
} else {
object : Data<R> by (this as Data<R>) {
override val type: KType = type
}
} }
/** /**
@ -31,7 +35,6 @@ internal fun <R : Any> DataSet<*>.select(
): ActiveDataSet<R> = object : ActiveDataSet<R> { ): ActiveDataSet<R> = object : ActiveDataSet<R> {
override val dataType = type override val dataType = type
override fun flow(): Flow<NamedData<R>> = this@select.flow().filter { datum -> override fun flow(): Flow<NamedData<R>> = this@select.flow().filter { datum ->
datum.type.isSubtypeOf(type) && (namePattern == null || datum.name.matches(namePattern)) datum.type.isSubtypeOf(type) && (namePattern == null || datum.name.matches(namePattern))
}.map { }.map {
@ -45,7 +48,6 @@ internal fun <R : Any> DataSet<*>.select(
val datum = this@select.getData(it) val datum = this@select.getData(it)
datum?.type?.isSubtypeOf(type) ?: false datum?.type?.isSubtypeOf(type) ?: false
} }
} }
/** /**

View File

@ -42,6 +42,6 @@ private class TaskDataImpl<out T : Any>(
// } // }
} }
internal fun <T : Any> Workspace.wrapResult(data: Data<T>, name: Name, stage: Name, stageMeta: Meta): TaskData<T> = public fun <T : Any> Workspace.wrapData(data: Data<T>, name: Name, taskName: Name, stageMeta: Meta): TaskData<T> =
TaskDataImpl(this, data, name, stage, stageMeta) TaskDataImpl(this, data, name, taskName, stageMeta)

View File

@ -37,11 +37,11 @@ private class TaskResultImpl<out T : Any>(
) : TaskResult<T>, DataSet<T> by dataSet { ) : TaskResult<T>, DataSet<T> by dataSet {
override fun flow(): Flow<TaskData<T>> = dataSet.flow().map { override fun flow(): Flow<TaskData<T>> = dataSet.flow().map {
workspace.wrapResult(it, it.name, taskName, taskMeta) workspace.wrapData(it, it.name, taskName, taskMeta)
} }
override suspend fun getData(name: Name): TaskData<T>? = dataSet.getData(name)?.let { override suspend fun getData(name: Name): TaskData<T>? = dataSet.getData(name)?.let {
workspace.wrapResult(it, name, taskName, taskMeta) workspace.wrapData(it, name, taskName, taskMeta)
} }
} }

View File

@ -1,9 +1,45 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.map
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
/**
* Select data using given [selector]
*/
public suspend fun <T : Any> TaskResultBuilder<*>.from( public suspend fun <T : Any> TaskResultBuilder<*>.from(
selector: DataSelector<T>, selector: DataSelector<T>,
meta: Meta = Meta.EMPTY, ): DataSet<T> = selector.select(workspace, taskMeta)
): DataSet<T> = selector.select(workspace, meta)
public val TaskResultBuilder<*>.allData: DataSelector<*>
get() = object : DataSelector<Any> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<Any> = workspace.data
}
/**
* Perform a lazy mapping task using given [selector] and [action]. The meta of resulting
* TODO move selector to receiver with multi-receivers
*/
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFrom(
selector: DataSelector<T>,
crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R
) {
from(selector).forEach { data ->
val meta = data.meta.toMutableMeta().apply {
taskName put taskMeta
}
val res = data.map(workspace.context.coroutineContext, meta) {
action(it, data.name, meta)
}
emit(data.name, res)
}
}

View File

@ -11,6 +11,10 @@ public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder<Any>.() -> Unit
buildData(builder) buildData(builder)
} }
public inline fun <reified T: Any> TaskResultBuilder<*>.selectData(namePattern: Name? = null): DataSelector<T> = object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> = workspace.data.select(namePattern)
}
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.from( public suspend inline fun <reified T : Any> TaskResultBuilder<*>.from(
task: Name, task: Name,
taskMeta: Meta = Meta.EMPTY, taskMeta: Meta = Meta.EMPTY,

View File

@ -71,21 +71,32 @@ class SimpleWorkspaceTest {
} }
val square by task<Int> { val square by task<Int> {
workspace.data.select<Int>().forEach { data -> pipeFrom(selectData<Int>()) { arg, name, meta ->
if (data.meta["testFlag"].boolean == true) { if (meta["testFlag"].boolean == true) {
println("flag") println("flag")
} }
val value = data.await() workspace.logger.info { "Starting square on $name" }
workspace.logger.info { "Starting square on $value" } arg * arg
emit(data.name, data.map { it * it })
} }
// workspace.data.select<Int>().forEach { data ->
// if (data.meta["testFlag"].boolean == true) {
// println("flag")
// }
// val value = data.await()
// workspace.logger.info { "Starting square on $value" }
// emit(data.name, data.map { it * it })
// }
} }
val linear by task<Int> { val linear by task<Int> {
workspace.data.select<Int>().forEach { data -> pipeFrom(selectData<Int>()) { arg, name, _ ->
workspace.logger.info { "Starting linear on $data" } workspace.logger.info { "Starting linear on $name" }
emit(data.name, data.data.map { it * 2 + 1 }) arg * 2 + 1
} }
// workspace.data.select<Int>().forEach { data ->
// workspace.logger.info { "Starting linear on $data" }
// emit(data.name, data.data.map { it * 2 + 1 })
// }
} }
val fullSquare by task<Int> { val fullSquare by task<Int> {