Fix implementation of MutableDataTree.

All tests pass
This commit is contained in:
Alexander Nozik 2025-01-02 10:09:00 +03:00
parent 6634ece349
commit 531f95d55f
13 changed files with 162 additions and 121 deletions
dataforge-data/src
commonMain/kotlin/space/kscience/dataforge/data
commonTest/kotlin/space/kscience/dataforge/data
jvmTest/kotlin/space/kscience/dataforge/data
dataforge-workspace/src
commonMain/kotlin/space/kscience/dataforge/workspace
jvmMain/kotlin/space/kscience/dataforge/workspace
jvmTest/kotlin/space/kscience/dataforge/workspace

@ -8,6 +8,9 @@ import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A marker scope for data builders
*/
public interface DataBuilderScope<in T> {
public companion object : DataBuilderScope<Nothing>
}
@ -30,21 +33,19 @@ public fun interface DataSink<in T> : DataBuilderScope<T> {
* A mutable version of [DataTree]
*/
public interface MutableDataTree<T> : DataTree<T>, DataSink<T> {
override var data: Data<T>?
override val items: Map<NameToken, MutableDataTree<T>>
public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
public suspend fun put(token: NameToken, data: Data<T>?)
override suspend fun put(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> put(name.first(), data)
else -> getOrCreateItem(name.first()).put(name.cutFirst(), data)
}
}
//
// public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
//
// public suspend fun put(token: NameToken, data: Data<T>?)
//
// override suspend fun put(name: Name, data: Data<T>?): Unit {
// when (name.length) {
// 0 -> this.data = data
// 1 -> put(name.first(), data)
// else -> getOrCreateItem(name.first()).put(name.cutFirst(), data)
// }
// }
}
/**
@ -62,11 +63,12 @@ private class MutableDataTreeRoot<T>(
) : MutableDataTree<T> {
override val items = HashMap<NameToken, MutableDataTree<T>>()
override val updates = MutableSharedFlow<Name>(extraBufferCapacity = 100)
override val updates = MutableSharedFlow<Name>()
inner class MutableDataTreeBranch(val branchName: Name) : MutableDataTree<T> {
override var data: Data<T>? = null
private set
override val items = HashMap<NameToken, MutableDataTree<T>>()
@ -75,26 +77,43 @@ private class MutableDataTreeRoot<T>(
}
override val dataType: KType get() = this@MutableDataTreeRoot.dataType
override suspend fun put(
name: Name,
data: Data<T>?
) {
when (name.length) {
0 -> {
this.data = data
this@MutableDataTreeRoot.updates.emit(branchName)
}
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> =
items.getOrPut(token) { MutableDataTreeBranch(branchName + token) }
else -> {
val token = name.first()
items.getOrPut(token) { MutableDataTreeBranch(branchName + token) }.put(name.cutFirst(), data)
}
}
}
}
override var data: Data<T>? = null
private set
override suspend fun put(token: NameToken, data: Data<T>?) {
this.data = data
this@MutableDataTreeRoot.updates.emit(branchName + token)
override suspend fun put(
name: Name,
data: Data<T>?
) {
when (name.length) {
0 -> {
this.data = data
this@MutableDataTreeRoot.updates.emit(Name.EMPTY)
}
else -> {
val token = name.first()
items.getOrPut(token) { MutableDataTreeBranch(token.asName()) }.put(name.cutFirst(), data)
}
}
}
override var data: Data<T>? = null
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = items.getOrPut(token) {
MutableDataTreeBranch(token.asName())
}
override suspend fun put(token: NameToken, data: Data<T>?) {
this.data = data
updates.emit(token.asName())
}
}
/**
@ -106,7 +125,7 @@ public fun <T> MutableDataTree(
): MutableDataTree<T> = MutableDataTreeRoot<T>(type)
/**
* Create and initialize a observable mutable data tree.
* Create and initialize an observable mutable data tree.
*/
@OptIn(UnsafeKType::class)
public inline fun <reified T> MutableDataTree(

@ -38,9 +38,8 @@ public interface ObservableDataSource<out T> : DataSource<T> {
public val updates: Flow<Name>
}
public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> {
return read(name) ?: updates.filter { it == name }.map { read(name) }.filterNotNull().first()
}
public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> =
read(name) ?: updates.filter { it == name }.mapNotNull { read(name) }.first()
public suspend fun <T> ObservableDataSource<T>.awaitData(name: String): Data<T> =
awaitData(name.parseAsName())

@ -11,13 +11,13 @@ import kotlin.reflect.typeOf
public fun interface StaticDataBuilder<T> : DataBuilderScope<T> {
public fun put(name: Name, data: Data<T>)
public fun data(name: Name, data: Data<T>)
}
private class DataMapBuilder<T> : StaticDataBuilder<T> {
val map = mutableMapOf<Name, Data<T>>()
override fun put(name: Name, data: Data<T>) {
override fun data(name: Name, data: Data<T>) {
if (map.containsKey(name)) {
error("Duplicate key '$name'")
} else {
@ -26,31 +26,31 @@ private class DataMapBuilder<T> : StaticDataBuilder<T> {
}
}
public fun <T> StaticDataBuilder<T>.put(name: String, data: Data<T>) {
put(name.parseAsName(), data)
public fun <T> StaticDataBuilder<T>.data(name: String, data: Data<T>) {
data(name.parseAsName(), data)
}
public inline fun <T, reified T1 : T> StaticDataBuilder<T>.putValue(
public inline fun <T, reified T1 : T> StaticDataBuilder<T>.value(
name: String,
value: T1,
metaBuilder: MutableMeta.() -> Unit = {}
) {
put(name, Data(value, Meta(metaBuilder)))
data(name, Data(value, Meta(metaBuilder)))
}
public fun <T> StaticDataBuilder<T>.putAll(prefix: Name, block: StaticDataBuilder<T>.() -> Unit) {
public fun <T> StaticDataBuilder<T>.node(prefix: Name, block: StaticDataBuilder<T>.() -> Unit) {
val map = DataMapBuilder<T>().apply(block).map
map.forEach { (name, data) ->
put(prefix + name, data)
data(prefix + name, data)
}
}
public fun <T> StaticDataBuilder<T>.putAll(prefix: String, block: StaticDataBuilder<T>.() -> Unit) =
putAll(prefix.parseAsName(), block)
public fun <T> StaticDataBuilder<T>.node(prefix: String, block: StaticDataBuilder<T>.() -> Unit) =
node(prefix.parseAsName(), block)
public fun <T> StaticDataBuilder<T>.putAll(prefix: String, tree: DataTree<T>) {
public fun <T> StaticDataBuilder<T>.node(prefix: String, tree: DataTree<T>) {
tree.forEach { data ->
put(prefix + data.name, data)
data(prefix.parseAsName() + data.name, data)
}
}

@ -13,12 +13,12 @@ internal class DataTreeBuilderTest {
@Test
fun testTreeBuild() = runTest(timeout = 500.milliseconds) {
val node = DataTree.static<Any> {
putAll("primary") {
putValue("a", "a")
putValue("b", "b")
node("primary") {
value("a", "a")
value("b", "b")
}
putValue("c.d", "c.d")
putValue("c.f", "c.f")
value("c.d", "c.d")
value("c.f", "c.f")
}
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
@ -30,17 +30,17 @@ internal class DataTreeBuilderTest {
@Test
fun testDataUpdate() = runTest(timeout = 500.milliseconds) {
val updateData = DataTree.static<Any> {
put("a", Data.wrapValue("a"))
put("b", Data.wrapValue("b"))
data("a", Data.wrapValue("a"))
data("b", Data.wrapValue("b"))
}
val node = DataTree.static<Any> {
putAll("primary") {
putValue("a", "a")
putValue("b", "b")
node("primary") {
value("a", "a")
value("b", "b")
}
putValue("root", "root")
putAll("update", updateData)
value("root", "root")
node("update", updateData)
}
assertEquals("a", node["update.a"]?.await())
@ -54,7 +54,9 @@ internal class DataTreeBuilderTest {
val subNode = MutableDataTree<Int>()
val rootNode = MutableDataTree<Int>() {
job = launch { putAllAndWatch(subNode, "sub".asName()) }
job = launch {
putAllAndWatch(subNode, "sub".asName())
}
}
repeat(10) {

@ -1,7 +1,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.invoke
@ -21,14 +20,13 @@ internal class ActionsTest {
val data: DataTree<Int> = DataTree.static {
repeat(10) {
putValue(it.toString(), it)
value(it.toString(), it)
}
}
val result = plusOne(data)
advanceUntilIdle()
assertEquals(2, result["1"]?.await())
assertEquals(2, result.awaitData("1").await())
}
@Test

@ -62,7 +62,7 @@ public interface TaskWithSpec<T, C : Any> : Task<T> {
// block: C.() -> Unit = {},
//): TaskResult<T> = execute(workspace, taskName, spec(block))
public class TaskResultScope<T>(
public class TaskResultScope<in T>(
public val resultType: KType,
public val workspace: Workspace,
public val taskName: Name,

@ -1,14 +1,13 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.NamedValueWithMeta
import space.kscience.dataforge.data.transformEach
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.meta.remove
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
@ -77,13 +76,15 @@ public val TaskResultScope<*>.allData: DataSelector<*>
* @param dataMetaTransform additional transformation of individual data meta.
* @param action process individual data asynchronously.
*/
@OptIn(UnsafeKType::class)
@DFExperimental
public suspend inline fun <T, reified R> TaskResultScope<R>.transformEach(
public suspend fun <T, R> TaskResultScope<R>.transformEach(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
crossinline dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
crossinline action: suspend (NamedValueWithMeta<T>) -> R,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
action: suspend NamedValueWithMeta<T>.() -> R,
): DataTree<R> = from(selector, dependencyMeta).transformEach<T, R>(
resultType,
workspace.context,
metaTransform = { name ->
taskMeta[taskName]?.let { taskName put it }
@ -93,6 +94,15 @@ public suspend inline fun <T, reified R> TaskResultScope<R>.transformEach(
action(it)
}
@OptIn(UnsafeKType::class)
public fun <R> TaskResultScope<R>.result(data: Data<R>): DataTree<R> = DataTree.static(resultType) {
data(Name.EMPTY, data)
}
@OptIn(UnsafeKType::class)
public fun <R> TaskResultScope<R>.result(builder: StaticDataBuilder<R>.() -> Unit): DataTree<R> =
DataTree.static(resultType, builder)
///**
// * Set given [dataSet] as a task result.
// */

@ -68,10 +68,18 @@ public class FileDataTree(
}
path.isDirectory() -> {
val dataBinary: Binary? = path.resolve(IOPlugin.DATA_FILE_NAME)?.asBinary()
val meta: Meta? = path.find { it.fileName.startsWith(IOPlugin.META_FILE_NAME) }?.let {
//FIXME find data and meta in a single pass instead of two
val dataBinary: Binary? = path.listDirectoryEntries().find {
it.fileName.nameWithoutExtension == IOPlugin.DATA_FILE_NAME
}?.asBinary()
val meta: Meta? = path.listDirectoryEntries().find {
it.fileName.nameWithoutExtension == IOPlugin.META_FILE_NAME
}?.let {
io.readMetaFileOrNull(it)
}
if (dataBinary != null || meta != null) {
StaticData(
typeOf<Binary>(),
@ -156,6 +164,9 @@ public class FileDataTree(
}
}
public fun IOPlugin.readDirectory(path: Path, monitor: Boolean = false): FileDataTree =
FileDataTree(this, path, monitor)
///**
// * @param resources The names of the resources to read.

@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.putValue
import space.kscience.dataforge.data.value
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get
@ -22,14 +22,14 @@ internal class CachingWorkspaceTest {
data {
//statically initialize data
repeat(5) {
putValue("myData[$it]", it)
value("myData[$it]", it)
}
}
inMemoryCache()
val doFirst by task<Any> {
transformEach(allData) { (name, _, _) ->
transformEach(allData) {
firstCounter++
println("Done first on $name with flag=${taskMeta["flag"].boolean}")
}
@ -39,7 +39,7 @@ internal class CachingWorkspaceTest {
transformEach(
doFirst,
dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
) { (name, _, _) ->
) {
secondCounter++
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
}

@ -20,14 +20,12 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val result: Data<Int> = selectedData.foldToData(0) { result, data ->
result + data.value
}
put("result", result)
result(result)
}
val singleData by task<Int> {
workspace.data.filterByType<Int>()["myData[12]"]?.let {
put("result", it)
}
result(workspace.data.filterByType<Int>()["myData[12]"]!!)
}
@ -47,7 +45,7 @@ class DataPropagationTest {
}
data {
repeat(100) {
putValue("myData[$it]", it)
value("myData[$it]", it)
}
}
}
@ -55,12 +53,12 @@ class DataPropagationTest {
@Test
fun testAllData() = runTest {
val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.content.asSequence().single().await())
assertEquals(4950, node.content.data?.await())
}
@Test
fun testSingleData() = runTest {
val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.content.asSequence().single().await())
assertEquals(12, node.content.data?.await())
}
}

@ -12,7 +12,6 @@ import space.kscience.dataforge.io.*
import space.kscience.dataforge.io.yaml.YamlPlugin
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import java.nio.file.Files
import kotlin.io.path.deleteExisting
import kotlin.io.path.fileSize
@ -22,13 +21,13 @@ import kotlin.test.assertEquals
class FileDataTest {
val dataNode = DataTree<String> {
putAll("dir") {
putValue("a", "Some string") {
val dataNode = DataTree.static<String> {
node("dir") {
value("a", "Some string") {
"content" put "Some string"
}
}
putValue("b", "root data")
value("b", "root data")
// meta {
// "content" put "This is root meta node"
// }
@ -51,10 +50,10 @@ class FileDataTest {
val dir = Files.createTempDirectory("df_data_node")
io.writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val data = DataTree {
io.readAsDataTree(Name.EMPTY, dir)
val data = io.readDirectory(dir)
val reconstructed = data.transformEach(this) { (_, value) ->
value.toByteArray().decodeToString()
}
val reconstructed = data.map { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
@ -68,8 +67,9 @@ class FileDataTest {
zip.deleteExisting()
io.writeZip(zip, dataNode, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = DataTree { io.readAsDataTree(Name.EMPTY, zip) }
.map { (_, value) -> value.toByteArray().decodeToString() }
val reconstructed = io.readDirectory(zip).transformEach(this) { (_, value) ->
value.toByteArray().decodeToString()
}
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())

@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.putValue
import space.kscience.dataforge.data.value
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
@ -16,13 +16,13 @@ class FileWorkspaceCacheTest {
data {
//statically initialize data
repeat(5) {
putValue("myData[$it]", it)
value("myData[$it]", it)
}
}
fileCache(Files.createTempDirectory("dataforge-temporary-cache"))
val echo by task<String> {
transformEach(dataByType<String>()) { arg, _, _ -> arg }
transformEach(dataByType<String>()) { value }
}
}

@ -37,9 +37,9 @@ internal object TestPlugin : WorkspacePlugin() {
val test by task {
// type is inferred
transformEach(dataByType<Int>()) { arg, _, _ ->
logger.info { "Test: $arg" }
arg
transformEach(dataByType<Int>()) {
logger.info { "Test: $value" }
value
}
}
@ -62,42 +62,42 @@ internal class SimpleWorkspaceTest {
data {
//statically initialize data
repeat(100) {
putValue("myData[$it]", it)
value("myData[$it]", it)
}
}
val filterOne by task<Int> {
val name by taskMeta.string { error("Name field not defined") }
from(testPluginFactory) { test }[name]?.let { source: Data<Int> ->
put(name, source)
}
result(from(testPluginFactory) { test }[name]!!)
}
val square by task<Int> {
transformEach(dataByType<Int>()) { arg, name, meta ->
transformEach(dataByType<Int>()) {
if (meta["testFlag"].boolean == true) {
println("Side effect")
}
workspace.logger.info { "Starting square on $name" }
arg * arg
value * value
}
}
val linear by task<Int> {
transformEach(dataByType<Int>()) { arg, name, _ ->
transformEach(dataByType<Int>()) {
workspace.logger.info { "Starting linear on $name" }
arg * 2 + 1
value * 2 + 1
}
}
val fullSquare by task<Int> {
val squareData = from(square)
val linearData = from(linear)
squareData.forEach { data ->
val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r ->
l + r
result {
squareData.forEach { data ->
val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r ->
l + r
}
data(data.name, newData)
}
put(data.name, newData)
}
}
@ -106,7 +106,7 @@ internal class SimpleWorkspaceTest {
val res = from(square).foldToData(0) { l, r ->
l + r.value
}
put("sum", res)
result(res)
}
val averageByGroup by task<Int> {
@ -116,13 +116,15 @@ internal class SimpleWorkspaceTest {
l + r.value
}
put("even", evenSum)
val oddSum = workspace.data.filterByType<Int> { name, _, _ ->
name.toString().toInt() % 2 == 1
}.foldToData(0) { l, r ->
l + r.value
}
put("odd", oddSum)
result {
data("even", evenSum)
data("odd", oddSum)
}
}
val delta by task<Int> {
@ -132,15 +134,17 @@ internal class SimpleWorkspaceTest {
val res = even.combine(odd) { l, r ->
l - r
}
put("res", res)
result(res)
}
val customPipe by task<Int> {
workspace.data.filterByType<Int>().forEach { data ->
val meta = data.meta.toMutableMeta().apply {
"newValue" put 22
result {
workspace.data.filterByType<Int>().forEach { data ->
val meta = data.meta.toMutableMeta().apply {
"newValue" put 22
}
data(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it })
}
put(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it })
}
}
@ -157,7 +161,7 @@ internal class SimpleWorkspaceTest {
@Test
fun testMetaPropagation() = runTest(timeout = 100.milliseconds) {
val node = workspace.produce("sum") { "testFlag" put true }
val res = node["sum"]!!.await()
val res = node.data?.await()
}
@Test
@ -175,7 +179,7 @@ internal class SimpleWorkspaceTest {
"""
Name: ${it.name}
Meta: ${it.meta}
Data: ${it.data.await()}
Data: ${it.await()}
""".trimIndent()
)
}