diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt index c8a0f2a7..c9786244 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt @@ -8,6 +8,9 @@ import space.kscience.dataforge.names.* import kotlin.reflect.KType import kotlin.reflect.typeOf +/** + * A marker scope for data builders + */ public interface DataBuilderScope<in T> { public companion object : DataBuilderScope<Nothing> } @@ -30,21 +33,19 @@ public fun interface DataSink<in T> : DataBuilderScope<T> { * A mutable version of [DataTree] */ public interface MutableDataTree<T> : DataTree<T>, DataSink<T> { - override var data: Data<T>? - override val items: Map<NameToken, MutableDataTree<T>> - - public fun getOrCreateItem(token: NameToken): MutableDataTree<T> - - public suspend fun put(token: NameToken, data: Data<T>?) - - override suspend fun put(name: Name, data: Data<T>?): Unit { - when (name.length) { - 0 -> this.data = data - 1 -> put(name.first(), data) - else -> getOrCreateItem(name.first()).put(name.cutFirst(), data) - } - } +// +// public fun getOrCreateItem(token: NameToken): MutableDataTree<T> +// +// public suspend fun put(token: NameToken, data: Data<T>?) +// +// override suspend fun put(name: Name, data: Data<T>?): Unit { +// when (name.length) { +// 0 -> this.data = data +// 1 -> put(name.first(), data) +// else -> getOrCreateItem(name.first()).put(name.cutFirst(), data) +// } +// } } /** @@ -62,11 +63,12 @@ private class MutableDataTreeRoot<T>( ) : MutableDataTree<T> { override val items = HashMap<NameToken, MutableDataTree<T>>() - override val updates = MutableSharedFlow<Name>(extraBufferCapacity = 100) + override val updates = MutableSharedFlow<Name>() inner class MutableDataTreeBranch(val branchName: Name) : MutableDataTree<T> { override var data: Data<T>? = null + private set override val items = HashMap<NameToken, MutableDataTree<T>>() @@ -75,26 +77,43 @@ private class MutableDataTreeRoot<T>( } override val dataType: KType get() = this@MutableDataTreeRoot.dataType + override suspend fun put( + name: Name, + data: Data<T>? + ) { + when (name.length) { + 0 -> { + this.data = data + this@MutableDataTreeRoot.updates.emit(branchName) + } - override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = - items.getOrPut(token) { MutableDataTreeBranch(branchName + token) } + else -> { + val token = name.first() + items.getOrPut(token) { MutableDataTreeBranch(branchName + token) }.put(name.cutFirst(), data) + } + } + } + } + override var data: Data<T>? = null + private set - override suspend fun put(token: NameToken, data: Data<T>?) { - this.data = data - this@MutableDataTreeRoot.updates.emit(branchName + token) + override suspend fun put( + name: Name, + data: Data<T>? + ) { + when (name.length) { + 0 -> { + this.data = data + this@MutableDataTreeRoot.updates.emit(Name.EMPTY) + } + + else -> { + val token = name.first() + items.getOrPut(token) { MutableDataTreeBranch(token.asName()) }.put(name.cutFirst(), data) + } } } - override var data: Data<T>? = null - - override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = items.getOrPut(token) { - MutableDataTreeBranch(token.asName()) - } - - override suspend fun put(token: NameToken, data: Data<T>?) { - this.data = data - updates.emit(token.asName()) - } } /** @@ -106,7 +125,7 @@ public fun <T> MutableDataTree( ): MutableDataTree<T> = MutableDataTreeRoot<T>(type) /** - * Create and initialize a observable mutable data tree. + * Create and initialize an observable mutable data tree. */ @OptIn(UnsafeKType::class) public inline fun <reified T> MutableDataTree( diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt index 7ee87180..531d37fd 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt @@ -38,9 +38,8 @@ public interface ObservableDataSource<out T> : DataSource<T> { public val updates: Flow<Name> } -public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> { - return read(name) ?: updates.filter { it == name }.map { read(name) }.filterNotNull().first() -} +public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> = + read(name) ?: updates.filter { it == name }.mapNotNull { read(name) }.first() public suspend fun <T> ObservableDataSource<T>.awaitData(name: String): Data<T> = awaitData(name.parseAsName()) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt index 0c1fe0b9..54bcf19c 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt @@ -11,13 +11,13 @@ import kotlin.reflect.typeOf public fun interface StaticDataBuilder<T> : DataBuilderScope<T> { - public fun put(name: Name, data: Data<T>) + public fun data(name: Name, data: Data<T>) } private class DataMapBuilder<T> : StaticDataBuilder<T> { val map = mutableMapOf<Name, Data<T>>() - override fun put(name: Name, data: Data<T>) { + override fun data(name: Name, data: Data<T>) { if (map.containsKey(name)) { error("Duplicate key '$name'") } else { @@ -26,31 +26,31 @@ private class DataMapBuilder<T> : StaticDataBuilder<T> { } } -public fun <T> StaticDataBuilder<T>.put(name: String, data: Data<T>) { - put(name.parseAsName(), data) +public fun <T> StaticDataBuilder<T>.data(name: String, data: Data<T>) { + data(name.parseAsName(), data) } -public inline fun <T, reified T1 : T> StaticDataBuilder<T>.putValue( +public inline fun <T, reified T1 : T> StaticDataBuilder<T>.value( name: String, value: T1, metaBuilder: MutableMeta.() -> Unit = {} ) { - put(name, Data(value, Meta(metaBuilder))) + data(name, Data(value, Meta(metaBuilder))) } -public fun <T> StaticDataBuilder<T>.putAll(prefix: Name, block: StaticDataBuilder<T>.() -> Unit) { +public fun <T> StaticDataBuilder<T>.node(prefix: Name, block: StaticDataBuilder<T>.() -> Unit) { val map = DataMapBuilder<T>().apply(block).map map.forEach { (name, data) -> - put(prefix + name, data) + data(prefix + name, data) } } -public fun <T> StaticDataBuilder<T>.putAll(prefix: String, block: StaticDataBuilder<T>.() -> Unit) = - putAll(prefix.parseAsName(), block) +public fun <T> StaticDataBuilder<T>.node(prefix: String, block: StaticDataBuilder<T>.() -> Unit) = + node(prefix.parseAsName(), block) -public fun <T> StaticDataBuilder<T>.putAll(prefix: String, tree: DataTree<T>) { +public fun <T> StaticDataBuilder<T>.node(prefix: String, tree: DataTree<T>) { tree.forEach { data -> - put(prefix + data.name, data) + data(prefix.parseAsName() + data.name, data) } } diff --git a/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt index 1d4d2ea4..6f1a7ed1 100644 --- a/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt @@ -13,12 +13,12 @@ internal class DataTreeBuilderTest { @Test fun testTreeBuild() = runTest(timeout = 500.milliseconds) { val node = DataTree.static<Any> { - putAll("primary") { - putValue("a", "a") - putValue("b", "b") + node("primary") { + value("a", "a") + value("b", "b") } - putValue("c.d", "c.d") - putValue("c.f", "c.f") + value("c.d", "c.d") + value("c.f", "c.f") } assertEquals("a", node["primary.a"]?.await()) assertEquals("b", node["primary.b"]?.await()) @@ -30,17 +30,17 @@ internal class DataTreeBuilderTest { @Test fun testDataUpdate() = runTest(timeout = 500.milliseconds) { val updateData = DataTree.static<Any> { - put("a", Data.wrapValue("a")) - put("b", Data.wrapValue("b")) + data("a", Data.wrapValue("a")) + data("b", Data.wrapValue("b")) } val node = DataTree.static<Any> { - putAll("primary") { - putValue("a", "a") - putValue("b", "b") + node("primary") { + value("a", "a") + value("b", "b") } - putValue("root", "root") - putAll("update", updateData) + value("root", "root") + node("update", updateData) } assertEquals("a", node["update.a"]?.await()) @@ -54,7 +54,9 @@ internal class DataTreeBuilderTest { val subNode = MutableDataTree<Int>() val rootNode = MutableDataTree<Int>() { - job = launch { putAllAndWatch(subNode, "sub".asName()) } + job = launch { + putAllAndWatch(subNode, "sub".asName()) + } } repeat(10) { diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt index 6828b674..1789237e 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt @@ -1,7 +1,6 @@ package space.kscience.dataforge.data import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import space.kscience.dataforge.actions.Action import space.kscience.dataforge.actions.invoke @@ -21,14 +20,13 @@ internal class ActionsTest { val data: DataTree<Int> = DataTree.static { repeat(10) { - putValue(it.toString(), it) + value(it.toString(), it) } } val result = plusOne(data) - advanceUntilIdle() - assertEquals(2, result["1"]?.await()) + assertEquals(2, result.awaitData("1").await()) } @Test diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index f1e9130a..06134ce6 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -62,7 +62,7 @@ public interface TaskWithSpec<T, C : Any> : Task<T> { // block: C.() -> Unit = {}, //): TaskResult<T> = execute(workspace, taskName, spec(block)) -public class TaskResultScope<T>( +public class TaskResultScope<in T>( public val resultType: KType, public val workspace: Workspace, public val taskName: Name, diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskResultBuilders.kt similarity index 87% rename from dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt rename to dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskResultBuilders.kt index 49b485e5..9df49aba 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskResultBuilders.kt @@ -1,14 +1,13 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.context.PluginFactory -import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.NamedValueWithMeta -import space.kscience.dataforge.data.transformEach +import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.copy import space.kscience.dataforge.meta.remove import space.kscience.dataforge.misc.DFExperimental +import space.kscience.dataforge.misc.UnsafeKType import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.plus @@ -77,13 +76,15 @@ public val TaskResultScope<*>.allData: DataSelector<*> * @param dataMetaTransform additional transformation of individual data meta. * @param action process individual data asynchronously. */ +@OptIn(UnsafeKType::class) @DFExperimental -public suspend inline fun <T, reified R> TaskResultScope<R>.transformEach( +public suspend fun <T, R> TaskResultScope<R>.transformEach( selector: DataSelector<T>, dependencyMeta: Meta = defaultDependencyMeta, - crossinline dataMetaTransform: MutableMeta.(name: Name) -> Unit = {}, - crossinline action: suspend (NamedValueWithMeta<T>) -> R, + dataMetaTransform: MutableMeta.(name: Name) -> Unit = {}, + action: suspend NamedValueWithMeta<T>.() -> R, ): DataTree<R> = from(selector, dependencyMeta).transformEach<T, R>( + resultType, workspace.context, metaTransform = { name -> taskMeta[taskName]?.let { taskName put it } @@ -93,6 +94,15 @@ public suspend inline fun <T, reified R> TaskResultScope<R>.transformEach( action(it) } +@OptIn(UnsafeKType::class) +public fun <R> TaskResultScope<R>.result(data: Data<R>): DataTree<R> = DataTree.static(resultType) { + data(Name.EMPTY, data) +} + +@OptIn(UnsafeKType::class) +public fun <R> TaskResultScope<R>.result(builder: StaticDataBuilder<R>.() -> Unit): DataTree<R> = + DataTree.static(resultType, builder) + ///** // * Set given [dataSet] as a task result. // */ diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt index c3cd3a0b..049ec2ec 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt @@ -68,10 +68,18 @@ public class FileDataTree( } path.isDirectory() -> { - val dataBinary: Binary? = path.resolve(IOPlugin.DATA_FILE_NAME)?.asBinary() - val meta: Meta? = path.find { it.fileName.startsWith(IOPlugin.META_FILE_NAME) }?.let { + //FIXME find data and meta in a single pass instead of two + + val dataBinary: Binary? = path.listDirectoryEntries().find { + it.fileName.nameWithoutExtension == IOPlugin.DATA_FILE_NAME + }?.asBinary() + + val meta: Meta? = path.listDirectoryEntries().find { + it.fileName.nameWithoutExtension == IOPlugin.META_FILE_NAME + }?.let { io.readMetaFileOrNull(it) } + if (dataBinary != null || meta != null) { StaticData( typeOf<Binary>(), @@ -156,6 +164,9 @@ public class FileDataTree( } } +public fun IOPlugin.readDirectory(path: Path, monitor: Boolean = false): FileDataTree = + FileDataTree(this, path, monitor) + ///** // * @param resources The names of the resources to read. diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt index 1c43fba0..eb705e56 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt @@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test -import space.kscience.dataforge.data.putValue +import space.kscience.dataforge.data.value import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.boolean import space.kscience.dataforge.meta.get @@ -22,14 +22,14 @@ internal class CachingWorkspaceTest { data { //statically initialize data repeat(5) { - putValue("myData[$it]", it) + value("myData[$it]", it) } } inMemoryCache() val doFirst by task<Any> { - transformEach(allData) { (name, _, _) -> + transformEach(allData) { firstCounter++ println("Done first on $name with flag=${taskMeta["flag"].boolean}") } @@ -39,7 +39,7 @@ internal class CachingWorkspaceTest { transformEach( doFirst, dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY - ) { (name, _, _) -> + ) { secondCounter++ println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}") } diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt index cd38f809..9cb040be 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt @@ -20,14 +20,12 @@ class DataPropagationTestPlugin : WorkspacePlugin() { val result: Data<Int> = selectedData.foldToData(0) { result, data -> result + data.value } - put("result", result) + result(result) } val singleData by task<Int> { - workspace.data.filterByType<Int>()["myData[12]"]?.let { - put("result", it) - } + result(workspace.data.filterByType<Int>()["myData[12]"]!!) } @@ -47,7 +45,7 @@ class DataPropagationTest { } data { repeat(100) { - putValue("myData[$it]", it) + value("myData[$it]", it) } } } @@ -55,12 +53,12 @@ class DataPropagationTest { @Test fun testAllData() = runTest { val node = testWorkspace.produce("Test.allData") - assertEquals(4950, node.content.asSequence().single().await()) + assertEquals(4950, node.content.data?.await()) } @Test fun testSingleData() = runTest { val node = testWorkspace.produce("Test.singleData") - assertEquals(12, node.content.asSequence().single().await()) + assertEquals(12, node.content.data?.await()) } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt index d9fa9ae4..c9e5ea5a 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt @@ -12,7 +12,6 @@ import space.kscience.dataforge.io.* import space.kscience.dataforge.io.yaml.YamlPlugin import space.kscience.dataforge.meta.get import space.kscience.dataforge.misc.DFExperimental -import space.kscience.dataforge.names.Name import java.nio.file.Files import kotlin.io.path.deleteExisting import kotlin.io.path.fileSize @@ -22,13 +21,13 @@ import kotlin.test.assertEquals class FileDataTest { - val dataNode = DataTree<String> { - putAll("dir") { - putValue("a", "Some string") { + val dataNode = DataTree.static<String> { + node("dir") { + value("a", "Some string") { "content" put "Some string" } } - putValue("b", "root data") + value("b", "root data") // meta { // "content" put "This is root meta node" // } @@ -51,10 +50,10 @@ class FileDataTest { val dir = Files.createTempDirectory("df_data_node") io.writeDataDirectory(dir, dataNode, StringIOFormat) println(dir.toUri().toString()) - val data = DataTree { - io.readAsDataTree(Name.EMPTY, dir) + val data = io.readDirectory(dir) + val reconstructed = data.transformEach(this) { (_, value) -> + value.toByteArray().decodeToString() } - val reconstructed = data.map { (_, value) -> value.toByteArray().decodeToString() } assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) } @@ -68,8 +67,9 @@ class FileDataTest { zip.deleteExisting() io.writeZip(zip, dataNode, StringIOFormat) println(zip.toUri().toString()) - val reconstructed = DataTree { io.readAsDataTree(Name.EMPTY, zip) } - .map { (_, value) -> value.toByteArray().decodeToString() } + val reconstructed = io.readDirectory(zip).transformEach(this) { (_, value) -> + value.toByteArray().decodeToString() + } assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt index 7d07481c..7aa1fb0e 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt @@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test -import space.kscience.dataforge.data.putValue +import space.kscience.dataforge.data.value import space.kscience.dataforge.misc.DFExperimental import java.nio.file.Files @@ -16,13 +16,13 @@ class FileWorkspaceCacheTest { data { //statically initialize data repeat(5) { - putValue("myData[$it]", it) + value("myData[$it]", it) } } fileCache(Files.createTempDirectory("dataforge-temporary-cache")) val echo by task<String> { - transformEach(dataByType<String>()) { arg, _, _ -> arg } + transformEach(dataByType<String>()) { value } } } diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index 39837c15..111b3b89 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -37,9 +37,9 @@ internal object TestPlugin : WorkspacePlugin() { val test by task { // type is inferred - transformEach(dataByType<Int>()) { arg, _, _ -> - logger.info { "Test: $arg" } - arg + transformEach(dataByType<Int>()) { + logger.info { "Test: $value" } + value } } @@ -62,42 +62,42 @@ internal class SimpleWorkspaceTest { data { //statically initialize data repeat(100) { - putValue("myData[$it]", it) + value("myData[$it]", it) } } val filterOne by task<Int> { val name by taskMeta.string { error("Name field not defined") } - from(testPluginFactory) { test }[name]?.let { source: Data<Int> -> - put(name, source) - } + result(from(testPluginFactory) { test }[name]!!) } val square by task<Int> { - transformEach(dataByType<Int>()) { arg, name, meta -> + transformEach(dataByType<Int>()) { if (meta["testFlag"].boolean == true) { println("Side effect") } workspace.logger.info { "Starting square on $name" } - arg * arg + value * value } } val linear by task<Int> { - transformEach(dataByType<Int>()) { arg, name, _ -> + transformEach(dataByType<Int>()) { workspace.logger.info { "Starting linear on $name" } - arg * 2 + 1 + value * 2 + 1 } } val fullSquare by task<Int> { val squareData = from(square) val linearData = from(linear) - squareData.forEach { data -> - val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r -> - l + r + result { + squareData.forEach { data -> + val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r -> + l + r + } + data(data.name, newData) } - put(data.name, newData) } } @@ -106,7 +106,7 @@ internal class SimpleWorkspaceTest { val res = from(square).foldToData(0) { l, r -> l + r.value } - put("sum", res) + result(res) } val averageByGroup by task<Int> { @@ -116,13 +116,15 @@ internal class SimpleWorkspaceTest { l + r.value } - put("even", evenSum) val oddSum = workspace.data.filterByType<Int> { name, _, _ -> name.toString().toInt() % 2 == 1 }.foldToData(0) { l, r -> l + r.value } - put("odd", oddSum) + result { + data("even", evenSum) + data("odd", oddSum) + } } val delta by task<Int> { @@ -132,15 +134,17 @@ internal class SimpleWorkspaceTest { val res = even.combine(odd) { l, r -> l - r } - put("res", res) + result(res) } val customPipe by task<Int> { - workspace.data.filterByType<Int>().forEach { data -> - val meta = data.meta.toMutableMeta().apply { - "newValue" put 22 + result { + workspace.data.filterByType<Int>().forEach { data -> + val meta = data.meta.toMutableMeta().apply { + "newValue" put 22 + } + data(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it }) } - put(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it }) } } @@ -157,7 +161,7 @@ internal class SimpleWorkspaceTest { @Test fun testMetaPropagation() = runTest(timeout = 100.milliseconds) { val node = workspace.produce("sum") { "testFlag" put true } - val res = node["sum"]!!.await() + val res = node.data?.await() } @Test @@ -175,7 +179,7 @@ internal class SimpleWorkspaceTest { """ Name: ${it.name} Meta: ${it.meta} - Data: ${it.data.await()} + Data: ${it.await()} """.trimIndent() ) }