Refactor DataSet. Remove `suspend` modifiers where possible.

This commit is contained in:
Alexander Nozik 2022-05-04 17:27:56 +03:00
parent bedab0dc86
commit 0622bacc4d
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
26 changed files with 418 additions and 352 deletions

View File

@ -14,6 +14,7 @@
- DataSet `getData` is no longer suspended and renamed to `get` - DataSet `getData` is no longer suspended and renamed to `get`
- DataSet operates with sequences of data instead of flows - DataSet operates with sequences of data instead of flows
- PartialEnvelope uses `Int` instead of `UInt`. - PartialEnvelope uses `Int` instead of `UInt`.
- `ActiveDataSet` renamed to `DataSource`
### Deprecated ### Deprecated

View File

@ -4,10 +4,7 @@ plugins {
allprojects { allprojects {
group = "space.kscience" group = "space.kscience"
version = "0.6.0-dev-4" version = "0.6.0-dev-5"
repositories{
mavenCentral()
}
} }
subprojects { subprojects {

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
@ -9,13 +8,12 @@ import space.kscience.dataforge.misc.DFExperimental
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute]. * A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute].
*/ */
public interface Action<in T : Any, out R : Any> { public interface Action<in T : Any, out R : Any> {
/** /**
* Transform the data in the node, producing a new node. By default it is assumed that all calculations are lazy * Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
* so no actual computation is started at this moment. * so no actual computation is started at this moment.
*
* [scope] context used to compute the initial result, also it is used for updates propagation
*/ */
public suspend fun execute(dataSet: DataSet<T>, meta: Meta = Meta.EMPTY, scope: CoroutineScope? = null): DataSet<R> public fun execute(dataSet: DataSet<T>, meta: Meta = Meta.EMPTY): DataSet<R>
public companion object public companion object
} }
@ -26,16 +24,17 @@ public interface Action<in T : Any, out R : Any> {
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> { public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> {
// TODO introduce composite action and add optimize by adding action to the list // TODO introduce composite action and add optimize by adding action to the list
return object : Action<T, R> { return object : Action<T, R> {
override suspend fun execute(dataSet: DataSet<T>, meta: Meta, scope: CoroutineScope?): DataSet<R> {
return action.execute(this@then.execute(dataSet, meta, scope), meta, scope) override fun execute(
} dataSet: DataSet<T>,
meta: Meta,
): DataSet<R> = action.execute(this@then.execute(dataSet, meta), meta)
} }
} }
@DFExperimental @DFExperimental
public suspend fun <T : Any, R : Any> DataSet<T>.transformWith( public operator fun <T : Any, R : Any> Action<T, R>.invoke(
action: Action<T, R>, dataSet: DataSet<T>,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
scope: CoroutineScope? = null, ): DataSet<R> = execute(dataSet, meta)
): DataSet<R> = action.execute(this, meta, scope)

View File

@ -0,0 +1,53 @@
package space.kscience.dataforge.actions
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.startsWith
import kotlin.reflect.KType
/**
* Remove all values with keys starting with [name]
*/
internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
    // `filter` materializes a snapshot of the keys, so removing entries
    // from the map while iterating that snapshot is safe.
    keys.filter { it.startsWith(name) }.forEach { key -> remove(key) }
}
/**
* An action that caches results on-demand and recalculates them on source push
*/
public abstract class CachingAction<in T : Any, out R : Any>(
    public val outputType: KType,
) : Action<T, R> {

    /**
     * Produce transformed items for [set]. When [key] is non-empty, only the branch
     * under [key] is expected to be (re)computed — used for incremental updates below.
     */
    protected abstract fun transform(
        set: DataSet<T>,
        meta: Meta,
        key: Name = Name.EMPTY,
    ): Sequence<NamedData<R>>

    override fun execute(
        dataSet: DataSet<T>,
        meta: Meta,
    ): DataSet<R> = if (dataSet is DataSource) {
        // Dynamic source: build a mutable tree bound to the source's coroutine context
        // and keep it in sync with incoming updates.
        DataSourceBuilder<R>(outputType, dataSet.coroutineContext).apply {
            populateFrom(transform(dataSet, meta))
            // Subscribe to source updates; recompute only the affected branch.
            launch {
                dataSet.updates.collect {
                    //clear old nodes
                    remove(it)
                    //collect new items
                    populateFrom(transform(dataSet, meta, it))
                    //FIXME if the target is data, updates are fired twice
                }
            }
        }
    } else {
        // Static source: a one-shot eager transformation is sufficient.
        DataTree<R>(outputType) {
            populateFrom(transform(dataSet, meta))
        }
    }
}

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -59,11 +58,11 @@ internal class MapAction<in T : Any, out R : Any>(
private val block: MapActionBuilder<T, R>.() -> Unit, private val block: MapActionBuilder<T, R>.() -> Unit,
) : Action<T, R> { ) : Action<T, R> {
override suspend fun execute( override fun execute(
dataSet: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
scope: CoroutineScope?,
): DataSet<R> { ): DataSet<R> {
fun mapOne(data: NamedData<T>): NamedData<R> { fun mapOne(data: NamedData<T>): NamedData<R> {
// Creating a new environment for action using **old** name, old meta and task meta // Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(data.name, data.meta, meta) val env = ActionEnv(data.name, data.meta, meta)
@ -92,17 +91,23 @@ internal class MapAction<in T : Any, out R : Any>(
val sequence = dataSet.dataSequence().map(::mapOne) val sequence = dataSet.dataSequence().map(::mapOne)
return ActiveDataTree(outputType) { return if (dataSet is DataSource ) {
populateWith(sequence) ActiveDataTree(outputType, dataSet) {
scope?.launch { populateFrom(sequence)
launch {
dataSet.updates.collect { name -> dataSet.updates.collect { name ->
//clear old nodes //clear old nodes
remove(name) remove(name)
//collect new items //collect new items
populateWith(dataSet.children(name).map(::mapOne)) populateFrom(dataSet.children(name).map(::mapOne))
} }
} }
} }
} else {
DataTree(outputType) {
populateFrom(sequence)
}
}
} }
} }

View File

@ -1,8 +1,5 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
@ -38,18 +35,17 @@ public class JoinGroup<T : Any, R : Any>(
@DFBuilder @DFBuilder
public class ReduceGroupBuilder<T : Any, R : Any>( public class ReduceGroupBuilder<T : Any, R : Any>(
private val scope: CoroutineScope,
public val actionMeta: Meta, public val actionMeta: Meta,
private val outputType: KType private val outputType: KType,
) { ) {
private val groupRules: MutableList<suspend (DataSet<T>) -> List<JoinGroup<T, R>>> = ArrayList(); private val groupRules: MutableList<(DataSet<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/** /**
* introduce grouping by meta value * introduce grouping by meta value
*/ */
public fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) { public fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node -> groupRules += { node ->
GroupRule.byMetaValue(scope, tag, defaultTag).gather(node).map { GroupRule.byMetaValue(tag, defaultTag).gather(node).map {
JoinGroup<T, R>(it.key, it.value, outputType).apply(action) JoinGroup<T, R>(it.key, it.value, outputType).apply(action)
} }
} }
@ -57,12 +53,12 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
public fun group( public fun group(
groupName: String, groupName: String,
filter: (Name, Data<T>) -> Boolean, predicate: (Name, Meta) -> Boolean,
action: JoinGroup<T, R>.() -> Unit, action: JoinGroup<T, R>.() -> Unit,
) { ) {
groupRules += { source -> groupRules += { source ->
listOf( listOf(
JoinGroup<T, R>(groupName, source.filter(filter), outputType).apply(action) JoinGroup<T, R>(groupName, source.filter(predicate), outputType).apply(action)
) )
} }
} }
@ -76,7 +72,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
} }
} }
internal suspend fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> = internal fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> =
groupRules.flatMap { it.invoke(input) } groupRules.flatMap { it.invoke(input) }
} }
@ -89,8 +85,8 @@ internal class ReduceAction<T : Any, R : Any>(
//TODO optimize reduction. Currently the whole action recalculates on push //TODO optimize reduction. Currently the whole action recalculates on push
override fun CoroutineScope.transform(set: DataSet<T>, meta: Meta, key: Name): Flow<NamedData<R>> = flow { override fun transform(set: DataSet<T>, meta: Meta, key: Name): Sequence<NamedData<R>> = sequence {
ReduceGroupBuilder<T, R>(this@transform, meta, outputType).apply(action).buildGroups(set).forEach { group -> ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(set).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.dataSequence().fold(HashMap()) { acc, value -> val dataFlow: Map<Name, Data<T>> = group.set.dataSequence().fold(HashMap()) { acc, value ->
acc.apply { acc.apply {
acc[value.name] = value.data acc[value.name] = value.data
@ -107,7 +103,7 @@ internal class ReduceAction<T : Any, R : Any>(
meta = groupMeta meta = groupMeta
) { group.result.invoke(env, it) } ) { group.result.invoke(env, it) }
emit(res.named(env.name)) yield(res.named(env.name))
} }
} }
} }

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Laminate import space.kscience.dataforge.meta.Laminate
@ -51,10 +50,9 @@ internal class SplitAction<T : Any, R : Any>(
private val action: SplitBuilder<T, R>.() -> Unit, private val action: SplitBuilder<T, R>.() -> Unit,
) : Action<T, R> { ) : Action<T, R> {
override suspend fun execute( override fun execute(
dataSet: DataSet<T>, dataSet: DataSet<T>,
meta: Meta, meta: Meta,
scope: CoroutineScope?,
): DataSet<R> { ): DataSet<R> {
fun splitOne(data: NamedData<T>): Sequence<NamedData<R>> { fun splitOne(data: NamedData<T>): Sequence<NamedData<R>> {
@ -77,17 +75,23 @@ internal class SplitAction<T : Any, R : Any>(
} }
} }
return ActiveDataTree<R>(outputType) { return if (dataSet is DataSource) {
populateWith(dataSet.dataSequence().flatMap(transform = ::splitOne)) ActiveDataTree<R>(outputType, dataSet) {
scope?.launch { populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne))
launch {
dataSet.updates.collect { name -> dataSet.updates.collect { name ->
//clear old nodes //clear old nodes
remove(name) remove(name)
//collect new items //collect new items
populateWith(dataSet.children(name).flatMap(transform = ::splitOne)) populateFrom(dataSet.children(name).flatMap(transform = ::splitOne))
} }
} }
} }
} else {
DataTree<R>(outputType) {
populateFrom(dataSet.dataSequence().flatMap(transform = ::splitOne))
}
}
} }
} }

View File

@ -1,103 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A mutable [DataTree].
*/
public class ActiveDataTree<T : Any>(
override val dataType: KType,
) : DataTree<T>, DataSetBuilder<T>, ActiveDataSet<T> {
private val mutex = Mutex()
private val treeItems = HashMap<NameToken, DataTreeItem<T>>()
override val items: Map<NameToken, DataTreeItem<T>>
get() = treeItems.filter { !it.key.body.startsWith("@") }
private val _updates = MutableSharedFlow<Name>()
override val updates: Flow<Name>
get() = _updates
private suspend fun remove(token: NameToken) = mutex.withLock {
if (treeItems.remove(token) != null) {
_updates.emit(token.asName())
}
}
override suspend fun remove(name: Name) {
if (name.isEmpty()) error("Can't remove the root node")
(getItem(name.cutLast()).tree as? ActiveDataTree)?.remove(name.lastOrNull()!!)
}
private suspend fun set(token: NameToken, data: Data<T>) = mutex.withLock {
treeItems[token] = DataTreeItem.Leaf(data)
}
private suspend fun getOrCreateNode(token: NameToken): ActiveDataTree<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? ActiveDataTree<T>
?: ActiveDataTree<T>(dataType).also {
mutex.withLock {
treeItems[token] = DataTreeItem.Node(it)
}
}
private suspend fun getOrCreateNode(name: Name): ActiveDataTree<T> = when (name.length) {
0 -> this
1 -> getOrCreateNode(name.firstOrNull()!!)
else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
}
override suspend fun data(name: Name, data: Data<T>?) {
if (data == null) {
remove(name)
} else {
when (name.length) {
0 -> error("Can't add data with empty name")
1 -> set(name.firstOrNull()!!, data)
2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
}
}
_updates.emit(name)
}
override suspend fun meta(name: Name, meta: Meta) {
val item = getItem(name)
if(item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.")
data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
}
}
/**
* Create a dynamic tree. Initial data is placed synchronously. Updates are propagated via [updatesScope]
*/
@Suppress("FunctionName")
public suspend fun <T : Any> ActiveDataTree(
type: KType,
block: suspend ActiveDataTree<T>.() -> Unit,
): ActiveDataTree<T> {
val tree = ActiveDataTree<T>(type)
tree.block()
return tree
}
@Suppress("FunctionName")
public suspend inline fun <reified T : Any> ActiveDataTree(
crossinline block: suspend ActiveDataTree<T>.() -> Unit,
): ActiveDataTree<T> = ActiveDataTree<T>(typeOf<T>()).apply { block() }
public suspend inline fun <reified T : Any> ActiveDataTree<T>.emit(
name: Name,
noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = node(name, ActiveDataTree(typeOf<T>(), block))
public suspend inline fun <reified T : Any> ActiveDataTree<T>.emit(
name: String,
noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = node(Name.parse(name), ActiveDataTree(typeOf<T>(), block))

View File

@ -1,51 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.startsWith
import kotlin.reflect.KType
/**
* Remove all values with keys starting with [name]
*/
internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
val toRemove = keys.filter { it.startsWith(name) }
toRemove.forEach(::remove)
}
/**
* An action that caches results on-demand and recalculates them on source push
*/
public abstract class CachingAction<in T : Any, out R : Any>(
public val outputType: KType,
) : Action<T, R> {
protected abstract fun CoroutineScope.transform(
set: DataSet<T>,
meta: Meta,
key: Name = Name.EMPTY,
): Flow<NamedData<R>>
override suspend fun execute(
dataSet: DataSet<T>,
meta: Meta,
scope: CoroutineScope?,
): DataSet<R> = ActiveDataTree<R>(outputType) {
coroutineScope {
populateWith(transform(dataSet, meta))
}
scope?.let {
dataSet.updates.collect {
//clear old nodes
remove(it)
//collect new items
populateWith(scope.transform(dataSet, meta, it))
//FIXME if the target is data, updates are fired twice
}
}
}
}

View File

@ -62,7 +62,11 @@ DataSet<out T : Any> {
public operator fun <T: Any> DataSet<T>.get(name:String): Data<T>? = get(name.parseAsName()) public operator fun <T: Any> DataSet<T>.get(name:String): Data<T>? = get(name.parseAsName())
public interface ActiveDataSet<T : Any> : DataSet<T> { /**
* A [DataSet] with propagated updates.
*/
public interface DataSource<T : Any> : DataSet<T>, CoroutineScope {
/** /**
* A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
* Those can include new data items and replacement of existing ones. The replaced items could update existing data content * Those can include new data items and replacement of existing ones. The replaced items could update existing data content
@ -70,9 +74,16 @@ public interface ActiveDataSet<T : Any> : DataSet<T> {
* *
*/ */
public val updates: Flow<Name> public val updates: Flow<Name>
/**
* Stop generating updates from this [DataSource]
*/
public fun close(){
coroutineContext[Job]?.cancel()
}
} }
public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is ActiveDataSet) updates else emptyFlow() public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is DataSource) updates else emptyFlow()
/** /**
* Flow all data nodes with names starting with [branchName] * Flow all data nodes with names starting with [branchName]

View File

@ -1,7 +1,5 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
@ -16,14 +14,14 @@ public interface DataSetBuilder<in T : Any> {
/** /**
* Remove all data items starting with [name] * Remove all data items starting with [name]
*/ */
public suspend fun remove(name: Name) public fun remove(name: Name)
public suspend fun data(name: Name, data: Data<T>?) public fun data(name: Name, data: Data<T>?)
/** /**
* Set a current state of given [dataSet] into a branch [name]. Does not propagate updates * Set a current state of given [dataSet] into a branch [name]. Does not propagate updates
*/ */
public suspend fun node(name: Name, dataSet: DataSet<T>) { public fun node(name: Name, dataSet: DataSet<T>) {
//remove previous items //remove previous items
if (name != Name.EMPTY) { if (name != Name.EMPTY) {
remove(name) remove(name)
@ -38,19 +36,19 @@ public interface DataSetBuilder<in T : Any> {
/** /**
* Set meta for the given node * Set meta for the given node
*/ */
public suspend fun meta(name: Name, meta: Meta) public fun meta(name: Name, meta: Meta)
} }
/** /**
* Define meta in this [DataSet] * Define meta in this [DataSet]
*/ */
public suspend fun <T : Any> DataSetBuilder<T>.meta(value: Meta): Unit = meta(Name.EMPTY, value) public fun <T : Any> DataSetBuilder<T>.meta(value: Meta): Unit = meta(Name.EMPTY, value)
/** /**
* Define meta in this [DataSet] * Define meta in this [DataSet]
*/ */
public suspend fun <T : Any> DataSetBuilder<T>.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta)) public fun <T : Any> DataSetBuilder<T>.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta))
@PublishedApi @PublishedApi
internal class SubSetBuilder<in T : Any>( internal class SubSetBuilder<in T : Any>(
@ -59,52 +57,52 @@ internal class SubSetBuilder<in T : Any>(
) : DataSetBuilder<T> { ) : DataSetBuilder<T> {
override val dataType: KType get() = parent.dataType override val dataType: KType get() = parent.dataType
override suspend fun remove(name: Name) { override fun remove(name: Name) {
parent.remove(branch + name) parent.remove(branch + name)
} }
override suspend fun data(name: Name, data: Data<T>?) { override fun data(name: Name, data: Data<T>?) {
parent.data(branch + name, data) parent.data(branch + name, data)
} }
override suspend fun node(name: Name, dataSet: DataSet<T>) { override fun node(name: Name, dataSet: DataSet<T>) {
parent.node(branch + name, dataSet) parent.node(branch + name, dataSet)
} }
override suspend fun meta(name: Name, meta: Meta) { override fun meta(name: Name, meta: Meta) {
parent.meta(branch + name, meta) parent.meta(branch + name, meta)
} }
} }
public suspend inline fun <T : Any> DataSetBuilder<T>.node( public inline fun <T : Any> DataSetBuilder<T>.node(
name: Name, name: Name,
crossinline block: suspend DataSetBuilder<T>.() -> Unit, crossinline block: DataSetBuilder<T>.() -> Unit,
) { ) {
if (name.isEmpty()) block() else SubSetBuilder(this, name).block() if (name.isEmpty()) block() else SubSetBuilder(this, name).block()
} }
public suspend fun <T : Any> DataSetBuilder<T>.data(name: String, value: Data<T>) { public fun <T : Any> DataSetBuilder<T>.data(name: String, value: Data<T>) {
data(Name.parse(name), value) data(Name.parse(name), value)
} }
public suspend fun <T : Any> DataSetBuilder<T>.node(name: String, set: DataSet<T>) { public fun <T : Any> DataSetBuilder<T>.node(name: String, set: DataSet<T>) {
node(Name.parse(name), set) node(Name.parse(name), set)
} }
public suspend inline fun <T : Any> DataSetBuilder<T>.node( public inline fun <T : Any> DataSetBuilder<T>.node(
name: String, name: String,
crossinline block: suspend DataSetBuilder<T>.() -> Unit, crossinline block: DataSetBuilder<T>.() -> Unit,
): Unit = node(Name.parse(name), block) ): Unit = node(Name.parse(name), block)
public suspend fun <T : Any> DataSetBuilder<T>.set(value: NamedData<T>) { public fun <T : Any> DataSetBuilder<T>.set(value: NamedData<T>) {
data(value.name, value.data) data(value.name, value.data)
} }
/** /**
* Produce lazy [Data] and emit it into the [DataSetBuilder] * Produce lazy [Data] and emit it into the [DataSetBuilder]
*/ */
public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce( public inline fun <reified T : Any> DataSetBuilder<T>.produce(
name: String, name: String,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T, noinline producer: suspend () -> T,
@ -113,7 +111,7 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce(
data(name, data) data(name, data)
} }
public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce( public inline fun <reified T : Any> DataSetBuilder<T>.produce(
name: Name, name: Name,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T, noinline producer: suspend () -> T,
@ -125,19 +123,19 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce(
/** /**
* Emit a static data with the fixed value * Emit a static data with the fixed value
*/ */
public suspend inline fun <reified T : Any> DataSetBuilder<T>.static( public inline fun <reified T : Any> DataSetBuilder<T>.static(
name: String, name: String,
data: T, data: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Unit = data(name, Data.static(data, meta)) ): Unit = data(name, Data.static(data, meta))
public suspend inline fun <reified T : Any> DataSetBuilder<T>.static( public inline fun <reified T : Any> DataSetBuilder<T>.static(
name: Name, name: Name,
data: T, data: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Unit = data(name, Data.static(data, meta)) ): Unit = data(name, Data.static(data, meta))
public suspend inline fun <reified T : Any> DataSetBuilder<T>.static( public inline fun <reified T : Any> DataSetBuilder<T>.static(
name: String, name: String,
data: T, data: T,
mutableMeta: MutableMeta.() -> Unit, mutableMeta: MutableMeta.() -> Unit,
@ -147,20 +145,20 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.static(
* Update data with given node data and meta with node meta. * Update data with given node data and meta with node meta.
*/ */
@DFExperimental @DFExperimental
public suspend fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit = coroutineScope { public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit {
tree.dataSequence().forEach { tree.dataSequence().forEach {
//TODO check if the place is occupied //TODO check if the place is occupied
data(it.name, it.data) data(it.name, it.data)
} }
} }
public suspend fun <T : Any> DataSetBuilder<T>.populateWith(flow: Flow<NamedData<T>>) { //public fun <T : Any> DataSetBuilder<T>.populateFrom(flow: Flow<NamedData<T>>) {
flow.collect { // flow.collect {
data(it.name, it.data) // data(it.name, it.data)
} // }
} //}
public suspend fun <T : Any> DataSetBuilder<T>.populateWith(sequence: Sequence<NamedData<T>>) { public fun <T : Any> DataSetBuilder<T>.populateFrom(sequence: Sequence<NamedData<T>>) {
sequence.forEach { sequence.forEach {
data(it.name, it.data) data(it.name, it.data)
} }

View File

@ -0,0 +1,122 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.*
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.jvm.Synchronized
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A mutable [DataTree] that propagates updates
*/
public class DataSourceBuilder<T : Any>(
    override val dataType: KType,
    coroutineContext: CoroutineContext,
) : DataTree<T>, DataSetBuilder<T>, DataSource<T> {
    // Own a child Job so close() cancels only this source, not the parent scope.
    override val coroutineContext: CoroutineContext =
        coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction()

    private val treeItems = HashMap<NameToken, DataTreeItem<T>>()

    // Tokens starting with "@" (e.g. meta items) are hidden from public listing.
    override val items: Map<NameToken, DataTreeItem<T>>
        get() = treeItems.filter { !it.key.body.startsWith("@") }

    private val _updates = MutableSharedFlow<Name>()

    override val updates: SharedFlow<Name>
        get() = _updates

    @Synchronized
    private fun remove(token: NameToken) {
        if (treeItems.remove(token) != null) {
            // Emit asynchronously: MutableSharedFlow.emit suspends.
            launch {
                _updates.emit(token.asName())
            }
        }
    }

    override fun remove(name: Name) {
        if (name.isEmpty()) error("Can't remove the root node")
        (getItem(name.cutLast()).tree as? DataSourceBuilder)?.remove(name.lastOrNull()!!)
    }

    @Synchronized
    private fun set(token: NameToken, data: Data<T>) {
        treeItems[token] = DataTreeItem.Leaf(data)
    }

    @Synchronized
    private fun set(token: NameToken, node: DataTree<T>) {
        treeItems[token] = DataTreeItem.Node(node)
    }

    private fun getOrCreateNode(token: NameToken): DataSourceBuilder<T> =
        (treeItems[token] as? DataTreeItem.Node<T>)?.tree as? DataSourceBuilder<T>
            ?: DataSourceBuilder<T>(dataType, coroutineContext).also { set(token, it) }

    private fun getOrCreateNode(name: Name): DataSourceBuilder<T> = when (name.length) {
        0 -> this
        1 -> getOrCreateNode(name.firstOrNull()!!)
        else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
    }

    /**
     * Put [data] under [name], or remove the item when [data] is null.
     * Fires an update for [name] in both cases.
     */
    override fun data(name: Name, data: Data<T>?) {
        if (data == null) {
            remove(name)
        } else {
            when (name.length) {
                0 -> error("Can't add data with empty name")
                1 -> set(name.firstOrNull()!!, data)
                // BUG FIX: this branch was `2 ->`, so data with names of three or more
                // tokens fell through the non-exhaustive `when` and was silently dropped.
                // getOrCreateNode handles prefixes of any length, so `else` is correct.
                else -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
            }
        }
        launch {
            _updates.emit(name)
        }
    }

    override fun meta(name: Name, meta: Meta) {
        val item = getItem(name)
        if (item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.")
        data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
    }
}
/**
* Create a dynamic tree. Initial data is placed synchronously.
*/
/**
 * Create a dynamic tree bound to [parent]'s coroutine context.
 * Initial data from [block] is placed synchronously.
 */
@Suppress("FunctionName")
public fun <T : Any> ActiveDataTree(
    type: KType,
    parent: CoroutineScope,
    block: DataSourceBuilder<T>.() -> Unit,
): DataSourceBuilder<T> = DataSourceBuilder<T>(type, parent.coroutineContext).apply(block)
/**
 * Create a dynamic tree of [T] bound to the caller's coroutine context.
 * Initial data from [block] is placed synchronously.
 */
@Suppress("FunctionName")
public suspend inline fun <reified T : Any> ActiveDataTree(
    crossinline block: DataSourceBuilder<T>.() -> Unit = {},
): DataSourceBuilder<T> = DataSourceBuilder<T>(typeOf<T>(), coroutineContext).also { it.block() }
/** Build a dynamic subtree with [block] and attach it under [name]. */
public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
    name: Name,
    parent: CoroutineScope,
    noinline block: DataSourceBuilder<T>.() -> Unit,
) {
    val child = ActiveDataTree(typeOf<T>(), parent, block)
    node(name, child)
}
/** Build a dynamic subtree with [block] and attach it under the parsed [name]. */
public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
    name: String,
    parent: CoroutineScope,
    noinline block: DataSourceBuilder<T>.() -> Unit,
): Unit = node(Name.parse(name), ActiveDataTree(typeOf<T>(), parent, block))

View File

@ -15,13 +15,12 @@
*/ */
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
public interface GroupRule { public interface GroupRule {
public suspend fun <T : Any> gather(set: DataSet<T>): Map<String, DataSet<T>> public fun <T : Any> gather(set: DataSet<T>): Map<String, DataSet<T>>
public companion object { public companion object {
/** /**
@ -33,30 +32,42 @@ public interface GroupRule {
* @return * @return
*/ */
public fun byMetaValue( public fun byMetaValue(
scope: CoroutineScope,
key: String, key: String,
defaultTagValue: String, defaultTagValue: String,
): GroupRule = object : GroupRule { ): GroupRule = object : GroupRule {
override suspend fun <T : Any> gather( override fun <T : Any> gather(
set: DataSet<T>, set: DataSet<T>,
): Map<String, DataSet<T>> { ): Map<String, DataSet<T>> {
val map = HashMap<String, ActiveDataTree<T>>() val map = HashMap<String, DataSet<T>>()
if (set is DataSource) {
set.dataSequence().forEach { data -> set.dataSequence().forEach { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(data.name, data.data) (map.getOrPut(tagValue) { DataSourceBuilder(set.dataType, set.coroutineContext) } as DataSourceBuilder<T>)
} .data(data.name, data.data)
scope.launch { set.launch {
set.updates.collect { name -> set.updates.collect { name ->
val data = set.get(name) val dataUpdate = set[name]
@Suppress("NULLABLE_EXTENSION_OPERATOR_WITH_SAFE_CALL_RECEIVER") val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue
val tagValue = data?.meta?.get(key)?.string ?: defaultTagValue map.getOrPut(updateTagValue) {
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(name, data) ActiveDataTree(set.dataType, this) {
data(name, dataUpdate)
} }
} }
}
}
}
} else {
set.dataSequence().forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>)
.data(data.name, data.data)
}
}
return map return map
} }

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.coroutineScope
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.*
@ -17,7 +16,7 @@ internal class StaticDataTree<T : Any>(
override val items: Map<NameToken, DataTreeItem<T>> override val items: Map<NameToken, DataTreeItem<T>>
get() = _items.filter { !it.key.body.startsWith("@") } get() = _items.filter { !it.key.body.startsWith("@") }
override suspend fun remove(name: Name) { override fun remove(name: Name) {
when (name.length) { when (name.length) {
0 -> error("Can't remove root tree node") 0 -> error("Can't remove root tree node")
1 -> _items.remove(name.firstOrNull()!!) 1 -> _items.remove(name.firstOrNull()!!)
@ -36,7 +35,7 @@ internal class StaticDataTree<T : Any>(
else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName()) else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName())
} }
private suspend fun set(name: Name, item: DataTreeItem<T>?) { private fun set(name: Name, item: DataTreeItem<T>?) {
if (name.isEmpty()) error("Can't set top level tree node") if (name.isEmpty()) error("Can't set top level tree node")
if (item == null) { if (item == null) {
remove(name) remove(name)
@ -45,23 +44,21 @@ internal class StaticDataTree<T : Any>(
} }
} }
override suspend fun data(name: Name, data: Data<T>?) { override fun data(name: Name, data: Data<T>?) {
set(name, data?.let { DataTreeItem.Leaf(it) }) set(name, data?.let { DataTreeItem.Leaf(it) })
} }
override suspend fun node(name: Name, dataSet: DataSet<T>) { override fun node(name: Name, dataSet: DataSet<T>) {
if (dataSet is StaticDataTree) { if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet)) set(name, DataTreeItem.Node(dataSet))
} else { } else {
coroutineScope {
dataSet.dataSequence().forEach { dataSet.dataSequence().forEach {
data(name + it.name, it.data) data(name + it.name, it.data)
} }
} }
} }
}
override suspend fun meta(name: Name, meta: Meta) { override fun meta(name: Name, meta: Meta) {
val item = getItem(name) val item = getItem(name)
if (item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.") if (item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.")
data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta)) data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
@ -69,17 +66,17 @@ internal class StaticDataTree<T : Any>(
} }
@Suppress("FunctionName") @Suppress("FunctionName")
public suspend fun <T : Any> DataTree( public inline fun <T : Any> DataTree(
dataType: KType, dataType: KType,
block: suspend DataSetBuilder<T>.() -> Unit, block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> = StaticDataTree<T>(dataType).apply { block() } ): DataTree<T> = StaticDataTree<T>(dataType).apply { block() }
@Suppress("FunctionName") @Suppress("FunctionName")
public suspend inline fun <reified T : Any> DataTree( public inline fun <reified T : Any> DataTree(
noinline block: suspend DataSetBuilder<T>.() -> Unit, noinline block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> = DataTree(typeOf<T>(), block) ): DataTree<T> = DataTree(typeOf<T>(), block)
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
public suspend fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) { public fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) {
populateFrom(this@seal) populateFrom(this@seal)
} }

View File

@ -10,6 +10,8 @@ import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.isEmpty import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus import space.kscience.dataforge.names.plus
import space.kscience.dataforge.names.removeHeadOrNull import space.kscience.dataforge.names.removeHeadOrNull
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
@ -17,34 +19,42 @@ import kotlin.reflect.KType
* A stateless filtered [DataSet] * A stateless filtered [DataSet]
*/ */
public fun <T : Any> DataSet<T>.filter( public fun <T : Any> DataSet<T>.filter(
predicate: (Name, Data<T>) -> Boolean, predicate: (Name, Meta) -> Boolean,
): ActiveDataSet<T> = object : ActiveDataSet<T> { ): DataSource<T> = object : DataSource<T> {
override val dataType: KType get() = this@filter.dataType override val dataType: KType get() = this@filter.dataType
override val coroutineContext: CoroutineContext
get() = (this@filter as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@filter.meta override val meta: Meta get() = this@filter.meta
override fun dataSequence(): Sequence<NamedData<T>> = override fun dataSequence(): Sequence<NamedData<T>> =
this@filter.dataSequence().filter { predicate(it.name, it.data) } this@filter.dataSequence().filter { predicate(it.name, it.meta) }
override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf { override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf {
predicate(name, it) predicate(name, it.meta)
} }
override val updates: Flow<Name> = this@filter.updates.filter flowFilter@{ name -> override val updates: Flow<Name> = this@filter.updates.filter flowFilter@{ name ->
val theData = this@filter.get(name) ?: return@flowFilter false val theData = this@filter[name] ?: return@flowFilter false
predicate(name, theData) predicate(name, theData.meta)
} }
} }
/** /**
* Generate a wrapper data set with a given name prefix appended to all names * Generate a wrapper data set with a given name prefix appended to all names
*/ */
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) this public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) {
else object : ActiveDataSet<T> { this
} else object : DataSource<T> {
override val dataType: KType get() = this@withNamePrefix.dataType override val dataType: KType get() = this@withNamePrefix.dataType
override val coroutineContext: CoroutineContext
get() = (this@withNamePrefix as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@withNamePrefix.meta override val meta: Meta get() = this@withNamePrefix.meta
@ -62,9 +72,12 @@ else object : ActiveDataSet<T> {
*/ */
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) { public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) {
this this
} else object : ActiveDataSet<T> { } else object : DataSource<T> {
override val dataType: KType get() = this@branch.dataType override val dataType: KType get() = this@branch.dataType
override val coroutineContext: CoroutineContext
get() = (this@branch as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@branch.meta override val meta: Meta get() = this@branch.meta
override fun dataSequence(): Sequence<NamedData<T>> = this@branch.dataSequence().mapNotNull { override fun dataSequence(): Sequence<NamedData<T>> = this@branch.dataSequence().mapNotNull {

View File

@ -144,7 +144,7 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
metaTransform: MutableMeta.() -> Unit = {}, metaTransform: MutableMeta.() -> Unit = {},
block: suspend (T) -> R, block: suspend (T) -> R,
): DataTree<R> = DataTree<R>(outputType) { ): DataTree<R> = DataTree<R>(outputType) {
populateWith( populateFrom(
dataSequence().map { dataSequence().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
Data(outputType, newMeta, coroutineContext, listOf(it)) { Data(outputType, newMeta, coroutineContext, listOf(it)) {

View File

@ -0,0 +1,2 @@
package space.kscience.dataforge.data

View File

@ -6,6 +6,8 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.matches import space.kscience.dataforge.names.matches
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.full.isSubtypeOf import kotlin.reflect.full.isSubtypeOf
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
@ -28,46 +30,47 @@ private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
* Select all data matching given type and filters. Does not modify paths * Select all data matching given type and filters. Does not modify paths
* *
* @param namePattern a name match patter according to [Name.matches] * @param namePattern a name match patter according to [Name.matches]
* @param filter addition filtering condition based on item name and meta. By default, accepts all * @param predicate addition filtering condition based on item name and meta. By default, accepts all
*/ */
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
public fun <R : Any> DataSet<*>.select( public fun <R : Any> DataSet<*>.filterIsInstance(
type: KType, type: KType,
namePattern: Name? = null, predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
filter: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, ): DataSource<R> = object : DataSource<R> {
): ActiveDataSet<R> = object : ActiveDataSet<R> {
override val dataType = type override val dataType = type
override val meta: Meta get() = this@select.meta override val coroutineContext: CoroutineContext
get() = (this@filterIsInstance as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@filterIsInstance.meta
private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type) private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type)
&& (namePattern == null || name.matches(namePattern)) && predicate(name, datum.meta)
&& filter(name, datum.meta)
override fun dataSequence(): Sequence<NamedData<R>> = this@select.dataSequence().filter { override fun dataSequence(): Sequence<NamedData<R>> = this@filterIsInstance.dataSequence().filter {
checkDatum(it.name, it.data) checkDatum(it.name, it.data)
}.map { }.map {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
it as NamedData<R> it as NamedData<R>
} }
override fun get(name: Name): Data<R>? = this@select[name]?.let { datum -> override fun get(name: Name): Data<R>? = this@filterIsInstance[name]?.let { datum ->
if (checkDatum(name, datum)) datum.castOrNull(type) else null if (checkDatum(name, datum)) datum.castOrNull(type) else null
} }
override val updates: Flow<Name> = this@select.updates.filter { override val updates: Flow<Name> = this@filterIsInstance.updates.filter { name ->
val datum = this@select[it] ?: return@filter false get(name)?.let { datum ->
checkDatum(it, datum) checkDatum(name, datum)
} ?: false
} }
} }
/** /**
* Select a single datum of the appropriate type * Select a single datum of the appropriate type
*/ */
public inline fun <reified R : Any> DataSet<*>.select( public inline fun <reified R : Any> DataSet<*>.filterIsInstance(
namePattern: Name? = null, noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
noinline filter: (name: Name, meta: Meta) -> Boolean = { _, _ -> true }, ): DataSet<R> = filterIsInstance(typeOf<R>(), predicate)
): DataSet<R> = select(typeOf<R>(), namePattern, filter)
/** /**
* Select a single datum if it is present and of given [type] * Select a single datum if it is present and of given [type]

View File

@ -10,24 +10,24 @@ import space.kscience.dataforge.names.plus
/** /**
* Append data to node * Append data to node
*/ */
context(DataSetBuilder<T>) public suspend infix fun <T : Any> String.put(data: Data<T>): Unit = context(DataSetBuilder<T>) public infix fun <T : Any> String.put(data: Data<T>): Unit =
data(Name.parse(this), data) data(Name.parse(this), data)
/** /**
* Append node * Append node
*/ */
context(DataSetBuilder<T>) public suspend infix fun <T : Any> String.put(dataSet: DataSet<T>): Unit = context(DataSetBuilder<T>) public infix fun <T : Any> String.put(dataSet: DataSet<T>): Unit =
node(Name.parse(this), dataSet) node(Name.parse(this), dataSet)
/** /**
* Build and append node * Build and append node
*/ */
context(DataSetBuilder<T>) public suspend infix fun <T : Any> String.put( context(DataSetBuilder<T>) public infix fun <T : Any> String.put(
block: suspend DataSetBuilder<T>.() -> Unit, block: DataSetBuilder<T>.() -> Unit,
): Unit = node(Name.parse(this), block) ): Unit = node(Name.parse(this), block)
/** /**
* Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job] * Copy given data set and mirror its changes to this [DataSourceBuilder] in [this@setAndObserve]. Returns an update [Job]
*/ */
context(DataSetBuilder<T>) public fun <T : Any> CoroutineScope.setAndWatch( context(DataSetBuilder<T>) public fun <T : Any> CoroutineScope.setAndWatch(
name: Name, name: Name,

View File

@ -1,44 +1,49 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.actions.Action import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.invoke
import space.kscience.dataforge.actions.map import space.kscience.dataforge.actions.map
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals import kotlin.test.assertEquals
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
internal class ActionsTest { internal class ActionsTest {
private val data: DataTree<Int> = runBlocking { @Test
DataTree { fun testStaticMapAction() = runTest {
val data: DataTree<Int> = DataTree {
repeat(10) { repeat(10) {
static(it.toString(), it) static(it.toString(), it)
} }
} }
}
@Test
fun testStaticMapAction() {
val plusOne = Action.map<Int, Int> { val plusOne = Action.map<Int, Int> {
result { it + 1 } result { it + 1 }
} }
runBlocking { val result = plusOne(data)
val result = plusOne.execute(data)
assertEquals(2, result["1"]?.await()) assertEquals(2, result["1"]?.await())
} }
}
@Test @Test
fun testDynamicMapAction() { fun testDynamicMapAction() = runTest {
val data: DataSourceBuilder<Int> = ActiveDataTree()
val plusOne = Action.map<Int, Int> { val plusOne = Action.map<Int, Int> {
result { it + 1 } result { it + 1 }
} }
val datum = runBlocking { val result = plusOne(data)
val result = plusOne.execute(data, scope = this)
result["1"]?.await() repeat(10) {
data.static(it.toString(), it)
} }
assertEquals(2, datum)
delay(20)
assertEquals(2, result["1"]?.await())
data.close()
} }
} }

View File

@ -46,8 +46,8 @@ internal class DataTreeBuilderTest {
} }
runBlocking { runBlocking {
assertEquals("a", node.get("update.a")?.await()) assertEquals("a", node["update.a"]?.await())
assertEquals("a", node.get("primary.a")?.await()) assertEquals("a", node["primary.a"]?.await())
} }
} }

View File

@ -1,12 +1,10 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.ActiveDataTree import space.kscience.dataforge.data.*
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataSetBuilder
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaRepr import space.kscience.dataforge.meta.MetaRepr
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
@ -17,8 +15,11 @@ import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
import kotlin.collections.HashMap
import kotlin.collections.set
import kotlin.properties.PropertyDelegateProvider import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.typeOf
public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> { public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
@ -100,13 +101,13 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas
/** /**
* Define intrinsic data for the workspace * Define intrinsic data for the workspace
*/ */
public suspend fun buildData(builder: suspend DataSetBuilder<Any>.() -> Unit) { public fun data(builder: DataSetBuilder<Any>.() -> Unit) {
data = DataTree(builder) data = DataTree(builder)
} }
@DFExperimental @DFExperimental
public suspend fun buildActiveData(builder: suspend ActiveDataTree<Any>.() -> Unit) { public fun buildActiveData(scope: CoroutineScope, builder: DataSourceBuilder<Any>.() -> Unit) {
data = ActiveDataTree(builder) data = ActiveDataTree(typeOf<Any>(), scope, builder)
} }
/** /**

View File

@ -1,21 +1,24 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.runBlocking
import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataSetBuilder import space.kscience.dataforge.data.filterIsInstance
import space.kscience.dataforge.data.select
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.matches
public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder<Any>.() -> Unit): Unit = runBlocking { //public fun WorkspaceBuilder.data(builder: DataSetBuilder<Any>.() -> Unit): Unit = runBlocking {
buildData(builder) // data(builder)
//}
public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> =
object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> =
workspace.data.filterIsInstance { name, _ ->
namePattern == null || name.matches(namePattern)
} }
public inline fun <reified T: Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> = object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> = workspace.data.select(namePattern)
} }
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask( public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
task: Name, task: Name,
taskMeta: Meta = Meta.EMPTY, taskMeta: Meta = Meta.EMPTY,
): DataSet<T> = workspace.produce(task, taskMeta).select() ): DataSet<T> = workspace.produce(task, taskMeta).filterIsInstance()

View File

@ -14,7 +14,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
override val tag: PluginTag = Companion.tag override val tag: PluginTag = Companion.tag
val allData by task<Int> { val allData by task<Int> {
val selectedData = workspace.data.select<Int>() val selectedData = workspace.data.filterIsInstance<Int>()
val result: Data<Int> = selectedData.dataSequence().foldToData(0) { result, data -> val result: Data<Int> = selectedData.dataSequence().foldToData(0) { result, data ->
result + data.await() result + data.await()
} }
@ -23,7 +23,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val singleData by task<Int> { val singleData by task<Int> {
workspace.data.select<Int>()["myData[12]"]?.let { workspace.data.filterIsInstance<Int>()["myData[12]"]?.let {
data("result", it) data("result", it)
} }
} }

View File

@ -20,8 +20,7 @@ import kotlin.test.assertEquals
class FileDataTest { class FileDataTest {
val dataNode = runBlocking { val dataNode = DataTree<String> {
DataTree<String> {
node("dir") { node("dir") {
static("a", "Some string") { static("a", "Some string") {
"content" put "Some string" "content" put "Some string"
@ -32,7 +31,7 @@ class FileDataTest {
"content" put "This is root meta node" "content" put "This is root meta node"
} }
} }
}
object StringIOFormat : IOFormat<String> { object StringIOFormat : IOFormat<String> {

View File

@ -117,16 +117,16 @@ class SimpleWorkspaceTest {
} }
val averageByGroup by task<Int> { val averageByGroup by task<Int> {
val evenSum = workspace.data.filter { name, _ -> val evenSum = workspace.data.filterIsInstance<Int> { name, _ ->
name.toString().toInt() % 2 == 0 name.toString().toInt() % 2 == 0
}.select<Int>().foldToData(0) { l, r -> }.foldToData(0) { l, r ->
l + r.await() l + r.await()
} }
data("even", evenSum) data("even", evenSum)
val oddSum = workspace.data.filter { name, _ -> val oddSum = workspace.data.filterIsInstance<Int> { name, _ ->
name.toString().toInt() % 2 == 1 name.toString().toInt() % 2 == 1
}.select<Int>().foldToData(0) { l, r -> }.foldToData(0) { l, r ->
l + r.await() l + r.await()
} }
data("odd", oddSum) data("odd", oddSum)
@ -143,7 +143,7 @@ class SimpleWorkspaceTest {
} }
val customPipe by task<Int> { val customPipe by task<Int> {
workspace.data.select<Int>().forEach { data -> workspace.data.filterIsInstance<Int>().forEach { data ->
val meta = data.meta.toMutableMeta().apply { val meta = data.meta.toMutableMeta().apply {
"newValue" put 22 "newValue" put 22
} }