From aded38254e647d7cb9ffad6acea653060007ce46 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 9 Oct 2021 10:49:38 +0300 Subject: [PATCH] Workspace and task updates --- .../space/kscience/dataforge/data/select.kt | 10 +++-- .../kscience/dataforge/workspace/TaskData.kt | 4 +- .../dataforge/workspace/TaskResult.kt | 4 +- .../dataforge/workspace/taskBuilders.kt | 40 ++++++++++++++++++- ...workspaceExtensions.kt => workspaceJvm.kt} | 4 ++ .../workspace/SimpleWorkspaceTest.kt | 27 +++++++++---- 6 files changed, 71 insertions(+), 18 deletions(-) rename dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/{workspaceExtensions.kt => workspaceJvm.kt} (63%) diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt index 7d311159..2cbbc1c1 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/select.kt @@ -16,8 +16,12 @@ import kotlin.reflect.typeOf */ @Suppress("UNCHECKED_CAST") private fun Data<*>.castOrNull(type: KType): Data? = - if (!this.type.isSubtypeOf(type)) null else object : Data by (this as Data) { - override val type: KType = type + if (!this.type.isSubtypeOf(type)) { + null + } else { + object : Data by (this as Data) { + override val type: KType = type + } } /** @@ -31,7 +35,6 @@ internal fun DataSet<*>.select( ): ActiveDataSet = object : ActiveDataSet { override val dataType = type - override fun flow(): Flow> = this@select.flow().filter { datum -> datum.type.isSubtypeOf(type) && (namePattern == null || datum.name.matches(namePattern)) }.map { @@ -45,7 +48,6 @@ internal fun DataSet<*>.select( val datum = this@select.getData(it) datum?.type?.isSubtypeOf(type) ?: false } - } /** diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt index 6d14616d..bd2bdb48 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt @@ -42,6 +42,6 @@ private class TaskDataImpl( // } } -internal fun Workspace.wrapResult(data: Data, name: Name, stage: Name, stageMeta: Meta): TaskData = - TaskDataImpl(this, data, name, stage, stageMeta) +public fun Workspace.wrapData(data: Data, name: Name, taskName: Name, stageMeta: Meta): TaskData = + TaskDataImpl(this, data, name, taskName, stageMeta) diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt index 61e4c650..5a65f219 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt @@ -37,11 +37,11 @@ private class TaskResultImpl( ) : TaskResult, DataSet by dataSet { override fun flow(): Flow> = dataSet.flow().map { - workspace.wrapResult(it, it.name, taskName, taskMeta) + workspace.wrapData(it, it.name, taskName, taskMeta) } override suspend fun getData(name: Name): TaskData? = dataSet.getData(name)?.let { - workspace.wrapResult(it, name, taskName, taskMeta) + workspace.wrapData(it, name, taskName, taskMeta) } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt index bcfeae32..4744d415 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt @@ -1,9 +1,45 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.data.DataSet +import space.kscience.dataforge.data.forEach +import space.kscience.dataforge.data.map import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.toMutableMeta +import space.kscience.dataforge.misc.DFExperimental +import space.kscience.dataforge.names.Name +/** + * Select data using given [selector] + */ public suspend fun TaskResultBuilder<*>.from( selector: DataSelector, - meta: Meta = Meta.EMPTY, -): DataSet = selector.select(workspace, meta) +): DataSet = selector.select(workspace, taskMeta) + +public val TaskResultBuilder<*>.allData: DataSelector<*> + get() = object : DataSelector { + override suspend fun select(workspace: Workspace, meta: Meta): DataSet = workspace.data + } + +/** + * Perform a lazy mapping task using given [selector] and [action]. The meta of resulting + * TODO move selector to receiver with multi-receivers + */ +@DFExperimental +public suspend inline fun TaskResultBuilder.pipeFrom( + selector: DataSelector, + crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R +) { + from(selector).forEach { data -> + val meta = data.meta.toMutableMeta().apply { + taskName put taskMeta + } + + val res = data.map(workspace.context.coroutineContext, meta) { + action(it, data.name, meta) + } + + emit(data.name, res) + } +} + + diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceExtensions.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt similarity index 63% rename from dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceExtensions.kt rename to dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt index 2690a8f6..61e1dbec 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceExtensions.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt @@ -11,6 +11,10 @@ public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder.() -> Unit buildData(builder) } +public inline fun TaskResultBuilder<*>.selectData(namePattern: Name? = null): DataSelector = object : DataSelector { + override suspend fun select(workspace: Workspace, meta: Meta): DataSet = workspace.data.select(namePattern) +} + public suspend inline fun TaskResultBuilder<*>.from( task: Name, taskMeta: Meta = Meta.EMPTY, diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index e5657444..edbe3a4b 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -71,21 +71,32 @@ class SimpleWorkspaceTest { } val square by task { - workspace.data.select().forEach { data -> - if (data.meta["testFlag"].boolean == true) { + pipeFrom(selectData()) { arg, name, meta -> + if (meta["testFlag"].boolean == true) { println("flag") } - val value = data.await() - workspace.logger.info { "Starting square on $value" } - emit(data.name, data.map { it * it }) + workspace.logger.info { "Starting square on $name" } + arg * arg } +// workspace.data.select().forEach { data -> +// if (data.meta["testFlag"].boolean == true) { +// println("flag") +// } +// val value = data.await() +// workspace.logger.info { "Starting square on $value" } +// emit(data.name, data.map { it * it }) +// } } val linear by task { - workspace.data.select().forEach { data -> - workspace.logger.info { "Starting linear on $data" } - emit(data.name, data.data.map { it * 2 + 1 }) + pipeFrom(selectData()) { arg, name, _ -> + workspace.logger.info { "Starting linear on $name" } + arg * 2 + 1 } +// workspace.data.select().forEach { data -> +// workspace.logger.info { "Starting linear on $data" } +// emit(data.name, data.data.map { it * 2 + 1 }) +// } } val fullSquare by task {