[WIP] redo DataSet fill and updates

This commit is contained in:
Alexander Nozik 2021-01-16 12:44:39 +03:00
parent 3334380693
commit 7e4d1af55f
21 changed files with 301 additions and 200 deletions

View File

@ -5,7 +5,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
/** /**
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [run]. * A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute].
*/ */
public interface Action<in T : Any, out R : Any> { public interface Action<in T : Any, out R : Any> {
/** /**
@ -14,7 +14,7 @@ public interface Action<in T : Any, out R : Any> {
* *
* [scope] context used to compute the initial result, also it is used for updates propagation * [scope] context used to compute the initial result, also it is used for updates propagation
*/ */
public suspend fun run(set: DataSet<T>, meta: Meta, scope: CoroutineScope): DataSet<R> public suspend fun execute(dataSet: DataSet<T>, meta: Meta = Meta.EMPTY, scope: CoroutineScope? = null): DataSet<R>
public companion object public companion object
} }
@ -25,8 +25,8 @@ public interface Action<in T : Any, out R : Any> {
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> { public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> {
// TODO introduce composite action and add optimize by adding action to the list // TODO introduce composite action and add optimize by adding action to the list
return object : Action<T, R> { return object : Action<T, R> {
override suspend fun run(set: DataSet<T>, meta: Meta, scope: CoroutineScope): DataSet<R> { override suspend fun execute(dataSet: DataSet<T>, meta: Meta, scope: CoroutineScope?): DataSet<R> {
return action.run(this@then.run(set, meta, scope), meta, scope) return action.execute(this@then.execute(dataSet, meta, scope), meta, scope)
} }
} }
} }

View File

@ -4,6 +4,7 @@ import hep.dataforge.meta.Meta
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.startsWith import hep.dataforge.names.startsWith
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -29,20 +30,22 @@ public abstract class CachingAction<in T : Any, out R : Any>(
key: Name = Name.EMPTY, key: Name = Name.EMPTY,
): Flow<NamedData<R>> ): Flow<NamedData<R>>
override suspend fun run( override suspend fun execute(
set: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
scope: CoroutineScope, scope: CoroutineScope?,
): DataSet<R> = DataTree.dynamic(outputType,scope) { ): DataSet<R> = DataTree.dynamic(outputType) {
collectFrom(scope.transform(set, meta)) coroutineScope {
scope.let { collectFrom(transform(dataSet, meta))
set.updates.collect { }
scope?.let {
dataSet.updates.collect {
//clear old nodes //clear old nodes
remove(it) remove(it)
//collect new items //collect new items
collectFrom(scope.transform(set, meta, it)) collectFrom(scope.transform(dataSet, meta, it))
//FIXME if the target is data, updates are fired twice //FIXME if the target is data, updates are fired twice
} }
} }
} }
} }

View File

@ -44,13 +44,15 @@ public interface Data<out T : Any> : Goal<T>, MetaRepr {
/** /**
* An empty data containing only meta * An empty data containing only meta
*/ */
public fun <T> empty(meta: Meta): Data<Nothing> = object : Data<Nothing> { public fun empty(meta: Meta): Data<Nothing> = object : Data<Nothing> {
override val type: KClass<out Nothing> = Nothing::class override val type: KClass<out Nothing> = Nothing::class
override val meta: Meta = meta override val meta: Meta = meta
override val dependencies: Collection<Goal<*>> = emptyList() override val dependencies: Collection<Goal<*>> = emptyList()
override val deferred: Deferred<Nothing> get() = GlobalScope.async(start = CoroutineStart.LAZY) { override val deferred: Deferred<Nothing>
error("The Data is empty and could not be computed") get() = GlobalScope.async(start = CoroutineStart.LAZY) {
} error("The Data is empty and could not be computed")
}
override fun async(coroutineScope: CoroutineScope): Deferred<Nothing> = deferred override fun async(coroutineScope: CoroutineScope): Deferred<Nothing> = deferred
override fun reset() {} override fun reset() {}
} }
@ -92,7 +94,18 @@ public inline fun <reified T : Any> Data(
public class NamedData<out T : Any> internal constructor( public class NamedData<out T : Any> internal constructor(
override val name: Name, override val name: Name,
public val data: Data<T>, public val data: Data<T>,
) : Data<T> by data, Named ) : Data<T> by data, Named {
override fun toString(): String = buildString {
append("NamedData(name=\"$name\"")
if(data is StaticData){
append(", value=${data.value}")
}
if(!data.meta.isEmpty()){
append(", meta=${data.meta}")
}
append(")")
}
}
public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) { public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedData(name, this.data) NamedData(name, this.data)

View File

@ -76,4 +76,6 @@ public suspend fun DataSet<*>.toMeta(): Meta = Meta {
} }
} }
} }
} }
public val <T : Any> DataSet<T>.updatesWithData: Flow<NamedData<T>> get() = updates.mapNotNull { getData(it)?.named(it) }

View File

@ -4,50 +4,81 @@ import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.MetaBuilder
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.plus
import hep.dataforge.names.toName import hep.dataforge.names.toName
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
public interface DataSetBuilder<in T : Any> { public interface DataSetBuilder<in T : Any> {
public fun remove(name: Name) /**
* Remove all data items starting with [name]
*/
public suspend fun remove(name: Name)
public operator fun set(name: Name, data: Data<T>?) public suspend fun set(name: Name, data: Data<T>?)
public suspend fun set(name: Name, dataSet: DataSet<T>) /**
* Set a current state of given [dataSet] into a branch [name]. Does not propagate updates
public operator fun set(name: Name, block: DataSetBuilder<T>.() -> Unit) */
public suspend fun set(name: Name, dataSet: DataSet<T>){
//remove previous items
remove(name)
//Set new items
dataSet.flow().collect {
set(name + it.name, it.data)
}
}
/** /**
* Append data to node * Append data to node
*/ */
public infix fun String.put(data: Data<T>): Unit = set(toName(), data) public suspend infix fun String.put(data: Data<T>): Unit = set(toName(), data)
/** /**
* Append node * Append node
*/ */
public suspend infix fun String.put(tree: DataSet<T>): Unit = set(toName(), tree) public suspend infix fun String.put(dataSet: DataSet<T>): Unit = set(toName(), dataSet)
/** /**
* Build and append node * Build and append node
*/ */
public infix fun String.put(block: DataSetBuilder<T>.() -> Unit): Unit = set(toName(), block) public suspend infix fun String.put(block: suspend DataSetBuilder<T>.() -> Unit): Unit = set(toName(), block)
}
private class SubSetBuilder<in T : Any>(private val parent: DataSetBuilder<T>, private val branch: Name) :
DataSetBuilder<T> {
override suspend fun remove(name: Name) {
parent.remove(branch + name)
}
override suspend fun set(name: Name, data: Data<T>?) {
parent.set(branch + name, data)
}
override suspend fun set(name: Name, dataSet: DataSet<T>) {
parent.set(branch + name, dataSet)
}
}
public suspend fun <T : Any> DataSetBuilder<T>.set(name: Name, block: suspend DataSetBuilder<T>.() -> Unit){
SubSetBuilder(this,name).apply { block() }
} }
public operator fun <T : Any> DataSetBuilder<T>.set(name: String, data: Data<T>) { public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, data: Data<T>) {
this@set[name.toName()] = data set(name.toName(), data)
} }
public fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, meta: Meta = Meta.EMPTY) { public suspend fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, meta: Meta = Meta.EMPTY) {
set(name, Data.static(data, meta)) set(name, Data.static(data, meta))
} }
public fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) { public suspend fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) {
set(name, Data.static(data, Meta(block))) set(name, Data.static(data, Meta(block)))
} }
public fun <T : Any> DataSetBuilder<T>.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) { public suspend fun <T : Any> DataSetBuilder<T>.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) {
set(name.toName(), Data.static(data, Meta(block))) set(name.toName(), Data.static(data, Meta(block)))
} }
@ -55,7 +86,7 @@ public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, set: DataSet<T>
this.set(name.toName(), set) this.set(name.toName(), set)
} }
public operator fun <T : Any> DataSetBuilder<T>.set(name: String, block: DataSetBuilder<T>.() -> Unit): Unit = public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, block: suspend DataSetBuilder<T>.() -> Unit): Unit =
this@set.set(name.toName(), block) this@set.set(name.toName(), block)
@ -63,9 +94,15 @@ public operator fun <T : Any> DataSetBuilder<T>.set(name: String, block: DataSet
* Update data with given node data and meta with node meta. * Update data with given node data and meta with node meta.
*/ */
@DFExperimental @DFExperimental
public suspend fun <T: Any> DataSetBuilder<T>.update(tree: DataSet<T>): Unit = coroutineScope{ public suspend fun <T : Any> DataSetBuilder<T>.update(tree: DataSet<T>): Unit = coroutineScope {
tree.flow().collect { tree.flow().collect {
//TODO check if the place is occupied //TODO check if the place is occupied
set(it.name, it.data) set(it.name, it.data)
} }
} }
public suspend fun <T : Any> DataSetBuilder<T>.collectFrom(flow: Flow<NamedData<T>>) {
flow.collect {
set(it.name, it.data)
}
}

View File

@ -46,9 +46,11 @@ public interface DataTree<out T : Any> : DataSet<T> {
override fun flow(): Flow<NamedData<T>> = flow { override fun flow(): Flow<NamedData<T>> = flow {
items().forEach { (token, childItem: DataTreeItem<T>) -> items().forEach { (token, childItem: DataTreeItem<T>) ->
when (childItem) { if(!token.body.startsWith("@")) {
is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName())) when (childItem) {
is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(token + it.name) }) is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName()))
is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(token + it.name) })
}
} }
} }
} }

View File

@ -41,7 +41,7 @@ public interface GroupRule {
set.flow().collect { data -> set.flow().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { MutableDataTree(dataType, scope) }.set(data.name, data.data) map.getOrPut(tagValue) { MutableDataTree(dataType) }.set(data.name, data.data)
} }
return map return map

View File

@ -33,16 +33,16 @@ public class MapActionBuilder<T, R>(public var name: Name, public var meta: Meta
} }
public class MapAction<T : Any, out R : Any>( public class MapAction<in T : Any, out R : Any>(
public val outputType: KClass<out R>, public val outputType: KClass<out R>,
private val block: MapActionBuilder<T, R>.() -> Unit, private val block: MapActionBuilder<T, R>.() -> Unit,
) : Action<T, R> { ) : Action<T, R> {
override suspend fun run( override suspend fun execute(
set: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
scope: CoroutineScope, scope: CoroutineScope?,
): DataSet<R> = DataTree.dynamic(outputType, scope) { ): DataSet<R> {
suspend fun mapOne(data: NamedData<T>): NamedData<R> { suspend fun mapOne(data: NamedData<T>): NamedData<R> {
// Creating a new environment for action using **old** name, old meta and task meta // Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(data.name, data.meta, meta) val env = ActionEnv(data.name, data.meta, meta)
@ -65,23 +65,26 @@ public class MapAction<T : Any, out R : Any>(
return newData.named(newName) return newData.named(newName)
} }
collectFrom(set.flow().map(::mapOne)) val flow = dataSet.flow().map(::mapOne)
scope.launch {
set.updates.collect { name -> return DataTree.dynamic(outputType) {
//clear old nodes collectFrom(flow)
remove(name) scope?.launch {
//collect new items dataSet.updates.collect { name ->
collectFrom(set.flowChildren(name).map(::mapOne)) //clear old nodes
remove(name)
//collect new items
collectFrom(dataSet.flowChildren(name).map(::mapOne))
}
} }
} }
} }
} }
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
meta: Meta, @Suppress("FunctionName")
updatesScope: CoroutineScope, public inline fun <T : Any, reified R : Any> MapAction(
noinline action: MapActionBuilder<in T, out R>.() -> Unit, noinline builder: MapActionBuilder<T, R>.() -> Unit,
): DataSet<R> = MapAction(R::class, action).run(this, meta, updatesScope) ): MapAction<T, R> = MapAction(R::class, builder)

View File

@ -3,7 +3,7 @@ package hep.dataforge.data
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.* import hep.dataforge.names.*
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
@ -15,12 +15,13 @@ import kotlin.reflect.KClass
*/ */
public class MutableDataTree<T : Any>( public class MutableDataTree<T : Any>(
override val dataType: KClass<out T>, override val dataType: KClass<out T>,
public val scope: CoroutineScope,
) : DataTree<T>, DataSetBuilder<T> { ) : DataTree<T>, DataSetBuilder<T> {
private val mutex = Mutex() private val mutex = Mutex()
private val treeItems = HashMap<NameToken, DataTreeItem<T>>() private val treeItems = HashMap<NameToken, DataTreeItem<T>>()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = mutex.withLock { treeItems } override suspend fun items(): Map<NameToken, DataTreeItem<T>> = mutex.withLock {
treeItems.filter { !it.key.body.startsWith("@") }
}
private val _updates = MutableSharedFlow<Name>() private val _updates = MutableSharedFlow<Name>()
@ -35,36 +36,37 @@ public class MutableDataTree<T : Any>(
} }
} }
override fun remove(name: Name) { override suspend fun remove(name: Name) {
scope.launch { if (name.isEmpty()) error("Can't remove the root node")
if (name.isEmpty()) error("Can't remove the root node") (getItem(name.cutLast()).tree as? MutableDataTree)?.remove(name.lastOrNull()!!)
(getItem(name.cutLast()).tree as? MutableDataTree)?.remove(name.lastOrNull()!!)
}
} }
private suspend fun set(token: NameToken, node: DataSet<T>) { // private suspend fun set(token: NameToken, node: DataSet<T>) {
//if (_map.containsKey(token)) error("Tree entry with name $token is not empty") // //if (_map.containsKey(token)) error("Tree entry with name $token is not empty")
mutex.withLock { // mutex.withLock {
treeItems[token] = DataTreeItem.Node(node.toMutableTree(scope)) // treeItems[token] = DataTreeItem.Node(node.toMutableTree())
coroutineScope { // coroutineScope {
node.updates.onEach { // node.updates.onEach {
_updates.emit(token + it) // _updates.emit(token + it)
}.launchIn(this) // }.launchIn(this)
} // }
_updates.emit(token.asName()) // _updates.emit(token.asName())
} // }
} // }
private suspend fun set(token: NameToken, data: Data<T>) { private suspend fun set(token: NameToken, data: Data<T>) {
mutex.withLock { mutex.withLock {
treeItems[token] = DataTreeItem.Leaf(data) treeItems[token] = DataTreeItem.Leaf(data)
_updates.emit(token.asName())
} }
} }
private suspend fun getOrCreateNode(token: NameToken): MutableDataTree<T> = private suspend fun getOrCreateNode(token: NameToken): MutableDataTree<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? MutableDataTree<T> (treeItems[token] as? DataTreeItem.Node<T>)?.tree as? MutableDataTree<T>
?: MutableDataTree(dataType, scope).also { set(token, it) } ?: MutableDataTree(dataType).also {
mutex.withLock {
treeItems[token] = DataTreeItem.Node(it)
}
}
private suspend fun getOrCreateNode(name: Name): MutableDataTree<T> { private suspend fun getOrCreateNode(name: Name): MutableDataTree<T> {
return when (name.length) { return when (name.length) {
@ -74,83 +76,53 @@ public class MutableDataTree<T : Any>(
} }
} }
override fun set(name: Name, data: Data<T>?) { override suspend fun set(name: Name, data: Data<T>?) {
if (data == null) { if (data == null) {
remove(name) remove(name)
} else { } else {
scope.launch { when (name.length) {
when (name.length) { 0 -> error("Can't add data with empty name")
0 -> error("Can't add data with empty name") 1 -> set(name.firstOrNull()!!, data)
1 -> set(name.firstOrNull()!!, data) 2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
}
} }
} }
_updates.emit(name)
} }
private suspend fun setTree(name: Name, node: MutableDataTree<out T>) { /**
when (name.length) { * Copy given data set and mirror its changes to this [MutableDataTree] in [this@setAndObserve]. Returns an update [Job]
0 -> error("Can't add data with empty name") */
1 -> set(name.firstOrNull()!!, node) public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet<T>): Job = launch {
2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, node) set(name, dataSet)
dataSet.updates.collect { nameInBranch ->
set(name + nameInBranch, dataSet.getData(nameInBranch))
} }
} }
override suspend fun set(name: Name, dataSet: DataSet<T>): Unit {
if (dataSet is MutableDataTree) {
setTree(name, dataSet)
} else {
setTree(name, dataSet.toMutableTree(scope))
}
}
override fun set(name: Name, block: DataSetBuilder<T>.() -> Unit) {
scope.launch {
setTree(name, MutableDataTree(dataType, scope).apply(block))
}
}
public fun collectFrom(flow: Flow<NamedData<T>>) {
flow.onEach {
set(it.name, it.data)
}.launchIn(scope)
}
} }
/**
* Create a dynamic tree. Initial data is placed synchronously. Updates are propagated via [updatesScope]
*/
public suspend fun <T : Any> DataTree.Companion.dynamic( public suspend fun <T : Any> DataTree.Companion.dynamic(
type: KClass<out T>, type: KClass<out T>,
updatesScope: CoroutineScope,
block: suspend MutableDataTree<T>.() -> Unit, block: suspend MutableDataTree<T>.() -> Unit,
): DataTree<T> { ): DataTree<T> {
val tree = MutableDataTree(type, updatesScope) val tree = MutableDataTree(type)
tree.block() tree.block()
return tree return tree
} }
public suspend inline fun <reified T : Any> DataTree.Companion.dynamic( public suspend inline fun <reified T : Any> DataTree.Companion.dynamic(
updatesScope: CoroutineScope,
crossinline block: suspend MutableDataTree<T>.() -> Unit, crossinline block: suspend MutableDataTree<T>.() -> Unit,
): DataTree<T> = MutableDataTree(T::class, updatesScope).apply { block() } ): DataTree<T> = MutableDataTree(T::class).apply { block() }
public suspend inline fun <reified T : Any> MutableDataTree<T>.set( public suspend inline fun <reified T : Any> MutableDataTree<T>.set(
name: Name, name: Name,
noinline block: suspend MutableDataTree<T>.() -> Unit, noinline block: suspend MutableDataTree<T>.() -> Unit,
): Unit = set(name, DataTree.dynamic(T::class, scope, block)) ): Unit = set(name, DataTree.dynamic(T::class, block))
public suspend inline fun <reified T : Any> MutableDataTree<T>.set( public suspend inline fun <reified T : Any> MutableDataTree<T>.set(
name: String, name: String,
noinline block: suspend MutableDataTree<T>.() -> Unit, noinline block: suspend MutableDataTree<T>.() -> Unit,
): Unit = set(name.toName(), DataTree.dynamic(T::class, scope, block)) ): Unit = set(name.toName(), DataTree.dynamic(T::class, block))
/**
* Generate a mutable builder from this node. Node content is not changed
*/
public suspend fun <T : Any> DataSet<T>.toMutableTree(
scope: CoroutineScope,
): MutableDataTree<T> = MutableDataTree(dataType, scope).apply {
flow().collect { set(it.name, it.data) }
this@toMutableTree.updates.onEach {
set(it, getData(it))
}.launchIn(scope)
}

View File

@ -13,7 +13,6 @@ import kotlin.collections.set
import kotlin.reflect.KClass import kotlin.reflect.KClass
public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val meta: Meta) { public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val meta: Meta) {
public class FragmentRule<T : Any, R : Any>(public val name: Name, public var meta: MetaBuilder) { public class FragmentRule<T : Any, R : Any>(public val name: Name, public var meta: MetaBuilder) {
@ -44,11 +43,11 @@ public class SplitAction<T : Any, R : Any>(
private val action: SplitBuilder<T, R>.() -> Unit, private val action: SplitBuilder<T, R>.() -> Unit,
) : Action<T, R> { ) : Action<T, R> {
override suspend fun run( override suspend fun execute(
set: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
scope: CoroutineScope, scope: CoroutineScope?,
): DataSet<R> = DataTree.dynamic(outputType, scope) { ): DataSet<R> {
suspend fun splitOne(data: NamedData<T>): Flow<NamedData<R>> { suspend fun splitOne(data: NamedData<T>): Flow<NamedData<R>> {
val laminate = Laminate(data.meta, meta) val laminate = Laminate(data.meta, meta)
@ -63,13 +62,15 @@ public class SplitAction<T : Any, R : Any>(
} }
} }
collectFrom(set.flow().flatMapConcat(transform = ::splitOne)) return DataTree.dynamic(outputType) {
scope.launch { collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne))
set.updates.collect { name -> scope?.launch {
//clear old nodes dataSet.updates.collect { name ->
remove(name) //clear old nodes
//collect new items remove(name)
collectFrom(set.flowChildren(name).flatMapConcat(transform = ::splitOne)) //collect new items
collectFrom(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne))
}
} }
} }
} }

View File

@ -5,7 +5,8 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlin.reflect.KClass import kotlin.reflect.KClass
private class StaticDataTree<T : Any>( @PublishedApi
internal class StaticDataTree<T : Any>(
override val dataType: KClass<out T>, override val dataType: KClass<out T>,
) : DataSetBuilder<T>, DataTree<T> { ) : DataSetBuilder<T>, DataTree<T> {
@ -13,9 +14,9 @@ private class StaticDataTree<T : Any>(
override val updates: Flow<Name> = emptyFlow() override val updates: Flow<Name> = emptyFlow()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = items override suspend fun items(): Map<NameToken, DataTreeItem<T>> = items.filter { !it.key.body.startsWith("@") }
override fun remove(name: Name) { override suspend fun remove(name: Name) {
when (name.length) { when (name.length) {
0 -> error("Can't remove root tree node") 0 -> error("Can't remove root tree node")
1 -> items.remove(name.firstOrNull()!!) 1 -> items.remove(name.firstOrNull()!!)
@ -34,7 +35,7 @@ private class StaticDataTree<T : Any>(
else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName()) else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName())
} }
private operator fun set(name: Name, item: DataTreeItem<T>?) { private suspend fun set(name: Name, item: DataTreeItem<T>?) {
if (name.isEmpty()) error("Can't set top level tree node") if (name.isEmpty()) error("Can't set top level tree node")
if (item == null) { if (item == null) {
remove(name) remove(name)
@ -43,7 +44,7 @@ private class StaticDataTree<T : Any>(
} }
} }
override fun set(name: Name, data: Data<T>?) { override suspend fun set(name: Name, data: Data<T>?) {
set(name, data?.let { DataTreeItem.Leaf(it) }) set(name, data?.let { DataTreeItem.Leaf(it) })
} }
@ -58,20 +59,15 @@ private class StaticDataTree<T : Any>(
} }
} }
} }
override fun set(name: Name, block: DataSetBuilder<T>.() -> Unit) {
val tree = StaticDataTree(dataType).apply(block)
set(name, DataTreeItem.Node(tree))
}
} }
public fun <T : Any> DataTree.Companion.static( public suspend fun <T : Any> DataTree.Companion.static(
dataType: KClass<out T>, dataType: KClass<out T>,
block: DataSetBuilder<T>.() -> Unit, block: suspend DataSetBuilder<T>.() -> Unit,
): DataTree<T> = StaticDataTree(dataType).apply(block) ): DataTree<T> = StaticDataTree(dataType).apply { block() }
public inline fun <reified T : Any> DataTree.Companion.static( public suspend inline fun <reified T : Any> DataTree.Companion.static(
noinline block: DataSetBuilder<T>.() -> Unit, noinline block: suspend DataSetBuilder<T>.() -> Unit,
): DataTree<T> = static(T::class, block) ): DataTree<T> = static(T::class, block)
public suspend fun <T : Any> DataSet<T>.toStaticTree(): DataTree<T> = StaticDataTree(dataType).apply { public suspend fun <T : Any> DataSet<T>.toStaticTree(): DataTree<T> = StaticDataTree(dataType).apply {

View File

@ -12,9 +12,9 @@ public suspend fun DataSet<*>.getMeta(): Meta? = getData(DataSet.META_KEY)?.meta
/** /**
* Add meta-data node to a [DataSet] * Add meta-data node to a [DataSet]
*/ */
public fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta)) public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta))
/** /**
* Add meta-data node to a [DataSet] * Add meta-data node to a [DataSet]
*/ */
public fun DataSetBuilder<*>.meta(metaBuilder: MetaBuilder.() -> Unit): Unit = meta(Meta(metaBuilder)) public suspend fun DataSetBuilder<*>.meta(metaBuilder: MetaBuilder.() -> Unit): Unit = meta(Meta(metaBuilder))

View File

@ -0,0 +1,39 @@
package hep.dataforge.data
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
class ActionsTest {
val data: DataTree<Int> = runBlocking {
DataTree.static {
repeat(10) {
data(it.toString(), it)
}
}
}
@Test
fun testStaticMapAction() {
val plusOne = MapAction<Int, Int> {
result { it + 1 }
}
runBlocking {
val result = plusOne.execute(data)
assertEquals(2, result.getData("1")?.value())
}
}
@Test
fun testDynamicMapAction() {
val plusOne = MapAction<Int, Int> {
result { it + 1 }
}
val datum = runBlocking {
val result = plusOne.execute(data, scope = this)
result.getData("1")?.value()
}
assertEquals(2, datum)
}
}

View File

@ -1,34 +1,69 @@
package hep.dataforge.data package hep.dataforge.data
import kotlinx.coroutines.runBlocking import hep.dataforge.names.toName
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertTrue import kotlin.test.assertEquals
internal class DataTreeBuilderTest{ internal class DataTreeBuilderTest {
@Test @Test
fun testDataUpdate(){ fun testDataUpdate() = runBlocking {
val updateData: DataTree<Any> = DataTree.static{ val updateData: DataTree<Any> = DataTree.static {
"update" put { "update" put {
"a" put Data.static("a") "a" put Data.static("a")
"b" put Data.static("b") "b" put Data.static("b")
} }
} }
val node = DataTree.static<Any>{ val node = DataTree.static<Any> {
set("primary"){ set("primary") {
data("a","a") data("a", "a")
data("b","b") data("b", "b")
}
data("root","root")
runBlocking {
update(updateData)
} }
data("root", "root")
update(updateData)
} }
assertTrue { node.branch("update.a") != null } assertEquals("a", node.getData("update.a")?.value())
assertTrue { node.branch("primary.a") != null } assertEquals("a", node.getData("primary.a")?.value())
}
@Test
fun testDynamicUpdates() = runBlocking {
try {
supervisorScope {
val subNode = DataTree.dynamic<Int> {
launch {
repeat(10) {
delay(10)
data("value", it)
}
}
}
launch {
subNode.updatesWithData.collect {
println(it)
}
}
val rootNode = DataTree.dynamic<Int> {
setAndObserve("sub".toName(), subNode)
}
launch {
rootNode.updatesWithData.collect {
println(it)
}
}
delay(200)
assertEquals(9, rootNode.getData("sub.value")?.value())
cancel()
}
} catch (t: Throwable) {
if (t !is CancellationException) throw t
}
} }
} }

View File

@ -193,7 +193,7 @@ public fun Name.endsWith(name: Name): Boolean =
* if [this] starts with given [head] name, returns the reminder of the name (could be empty). Otherwise returns null * if [this] starts with given [head] name, returns the reminder of the name (could be empty). Otherwise returns null
*/ */
public fun Name.removeHeadOrNull(head: Name): Name? = if (startsWith(head)) { public fun Name.removeHeadOrNull(head: Name): Name? = if (startsWith(head)) {
Name(tokens.subList(head.length, head.length)) Name(tokens.subList(head.length, length))
} else { } else {
null null
} }

View File

@ -3,13 +3,12 @@ package hep.dataforge.workspace
import hep.dataforge.data.DataSet import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.Described import hep.dataforge.meta.descriptors.Described
import hep.dataforge.misc.Named
import hep.dataforge.misc.Type import hep.dataforge.misc.Type
import hep.dataforge.workspace.Task.Companion.TYPE import hep.dataforge.workspace.Task.Companion.TYPE
import kotlin.reflect.KClass import kotlin.reflect.KClass
@Type(TYPE) @Type(TYPE)
public interface Task<out R : Any> : Named, Described { public interface Task<out R : Any> : Described {
/** /**
* The explicit type of the node returned by the task * The explicit type of the node returned by the task

View File

@ -104,12 +104,11 @@ public fun TaskDependencyContainer.data(
pattern: String? = null, pattern: String? = null,
from: String? = null, from: String? = null,
to: String? = null, to: String? = null,
): DataDependency = ): DataDependency = data {
data { pattern?.let { this.pattern = it }
pattern?.let { this.pattern = it } from?.let { this.from = it }
from?.let { this.from = it } to?.let { this.to = it }
to?.let { this.to = it } }
}
///** ///**
// * Add all data as root node // * Add all data as root node

View File

@ -35,14 +35,7 @@ public class TaskBuilder<R : Any>(private val name: Name, public val type: KClas
public val meta: Meta, public val meta: Meta,
public val context: Context, public val context: Context,
public val data: DataSet<Any>, public val data: DataSet<Any>,
) { )
// public operator fun <T : Any> DirectTaskDependency<T>.invoke(): DataSet<T> = if (placement.isEmpty()) {
// data.cast(task.type)
// } else {
// data[placement].tree?.cast(task.type)
// ?: error("Could not find results of direct task dependency $this at \"$placement\"")
// }
}
/** /**
* Add a transformation on untyped data * Add a transformation on untyped data
@ -86,7 +79,7 @@ public class TaskBuilder<R : Any>(private val name: Name, public val type: KClas
crossinline block: TaskEnv.() -> Action<T, R>, crossinline block: TaskEnv.() -> Action<T, R>,
) { ) {
transform { data: DataSet<T> -> transform { data: DataSet<T> ->
block().run(data, meta, context) block().execute(data, meta, context)
} }
} }
@ -194,15 +187,19 @@ public class TaskBuilder<R : Any>(private val name: Name, public val type: KClas
logger.warn { "No transformation present, returning input data" } logger.warn { "No transformation present, returning input data" }
dataSet.castOrNull(type) ?: error("$type expected, but $type received") dataSet.castOrNull(type) ?: error("$type expected, but $type received")
} else { } else {
val builder = MutableDataTree(type, workspace.context) DataTree.dynamic(type, workspace.context){
dataTransforms.forEach { transformation -> dataTransforms.forEach { transformation ->
val res = transformation(workspace.context, model, dataSet) val res = transformation(workspace.context, model, dataSet)
builder.update(res) update(res)
}
} }
builder
} }
} }
} }
} }
} }
@DFExperimental
public suspend inline fun <reified T : Any> TaskBuilder.TaskEnv.dataTree(
crossinline block: suspend MutableDataTree<T>.() -> Unit,
): DataTree<T> = DataTree.dynamic(context, block)

View File

@ -37,6 +37,8 @@ public fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit)
targets[name] = Meta(block).seal() targets[name] = Meta(block).seal()
} }
class WorkspaceTask(val workspace: Workspace, val name: String)
/** /**
* Use existing target as a base updating it with the block * Use existing target as a base updating it with the block
*/ */
@ -52,7 +54,7 @@ public fun <T : Any> WorkspaceBuilder.task(
name: String, name: String,
type: KClass<out T>, type: KClass<out T>,
builder: TaskBuilder<T>.() -> Unit, builder: TaskBuilder<T>.() -> Unit,
): Task<T> = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) } ): WorkspaceTask = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) }
public inline fun <reified T : Any> WorkspaceBuilder.task( public inline fun <reified T : Any> WorkspaceBuilder.task(
name: String, name: String,

View File

@ -33,7 +33,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val testSingleData = task("singleData", Int::class) { val testSingleData = task("singleData", Int::class) {
model { model {
data("myData\\[12\\]") data(pattern = "myData\\[12\\]")
} }
transform<Int> { data -> transform<Int> { data ->
DataTree.dynamic(context) { DataTree.dynamic(context) {

View File

@ -85,7 +85,7 @@ class SimpleWorkspaceTest {
transform<Int> { data -> transform<Int> { data ->
val squareNode = data.branch("square").filterIsInstance<Int>() //squareDep() val squareNode = data.branch("square").filterIsInstance<Int>() //squareDep()
val linearNode = data.branch("linear").filterIsInstance<Int>() //linearDep() val linearNode = data.branch("linear").filterIsInstance<Int>() //linearDep()
DataTree.dynamic<Int>(context) { dataTree {
squareNode.flow().collect { squareNode.flow().collect {
val newData: Data<Int> = Data { val newData: Data<Int> = Data {
val squareValue = squareNode.getData(it.name)!!.value() val squareValue = squareNode.getData(it.name)!!.value()
@ -150,6 +150,7 @@ class SimpleWorkspaceTest {
} }
@Test @Test
@Timeout(1)
fun testWorkspace() { fun testWorkspace() {
val node = workspace.runBlocking("sum") val node = workspace.runBlocking("sum")
val res = node.first() val res = node.first()
@ -157,7 +158,7 @@ class SimpleWorkspaceTest {
} }
@Test @Test
@Timeout(400) @Timeout(1)
fun testMetaPropagation() { fun testMetaPropagation() {
val node = workspace.runBlocking("sum") { "testFlag" put true } val node = workspace.runBlocking("sum") { "testFlag" put true }
val res = node.first()?.value() val res = node.first()?.value()
@ -179,7 +180,7 @@ class SimpleWorkspaceTest {
} }
@Test @Test
fun testGather() { fun testFilter() {
val node = workspace.runBlocking("filterOne") val node = workspace.runBlocking("filterOne")
runBlocking { runBlocking {
assertEquals(12, node.first()?.value()) assertEquals(12, node.first()?.value())