Refactor of data trees filtering

This commit is contained in:
Alexander Nozik 2024-04-02 10:04:46 +03:00
parent 104111f62d
commit e7db1cc763
37 changed files with 644 additions and 531 deletions

View File

@ -13,7 +13,9 @@
### Removed ### Removed
### Fixed ### Fixed
- `listOfScheme` and `listOfConvertable` delegates provides correct items order - `listOfScheme` and `listOfConvertable` delegates provides correct items order.
- Scheme meta setter works with proper sub-branch.
-
### Security ### Security

View File

@ -8,7 +8,7 @@ plugins {
allprojects { allprojects {
group = "space.kscience" group = "space.kscience"
version = "0.8.1-dev-1" version = "0.8.1"
} }
subprojects { subprojects {
@ -30,7 +30,7 @@ ksciencePublish {
useApache2Licence() useApache2Licence()
useSPCTeam() useSPCTeam()
} }
repository("spc","https://maven.sciprog.center/kscience") repository("spc", "https://maven.sciprog.center/kscience")
sonatype("https://oss.sonatype.org") sonatype("https://oss.sonatype.org")
} }

View File

@ -1,11 +1,14 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataUpdate
import space.kscience.dataforge.data.launchUpdate
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.startsWith import space.kscience.dataforge.names.startsWith
import kotlin.reflect.KType import kotlin.reflect.KType
@ -29,7 +32,7 @@ public abstract class AbstractAction<T, R>(
* Generate initial content of the output * Generate initial content of the output
*/ */
protected abstract fun DataSink<R>.generate( protected abstract fun DataSink<R>.generate(
data: DataTree<T>, source: DataTree<T>,
meta: Meta, meta: Meta,
) )
@ -40,35 +43,28 @@ public abstract class AbstractAction<T, R>(
* @param meta the metadata used for the whole data tree * @param meta the metadata used for the whole data tree
* @param updatedData an updated item * @param updatedData an updated item
*/ */
protected open fun DataSink<R>.update( protected open suspend fun DataSink<R>.update(
source: DataTree<T>, source: DataTree<T>,
meta: Meta, meta: Meta,
updatedData: NamedData<T>, updatedData: DataUpdate<T>,
) { ) {
//by default regenerate the whole data set //by default regenerate the whole data set
generate(source, meta) generate(source, meta)
} }
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
override fun execute( override fun execute(
dataSet: DataTree<T>, source: DataTree<T>,
meta: Meta, meta: Meta,
): DataTree<R> = if (dataSet.isObservable()) { updatesScope: CoroutineScope
MutableDataTree<R>(outputType, dataSet.updatesScope).apply { ): DataTree<R> = DataTree(outputType) {
generate(dataSet, meta) generate(source, meta)
dataSet.updates().onEach { //propagate updates
update(dataSet, meta, it) launchUpdate(updatesScope) {
}.launchIn(updatesScope) source.updates.onEach { update ->
update(source, meta, update)
//close updates when the source is closed }.collect()
updatesScope.launch {
dataSet.awaitClose()
close()
}
}
} else {
DataTree(outputType) {
generate(dataSet, meta)
} }
} }
} }

View File

@ -1,5 +1,8 @@
package space.kscience.dataforge.actions package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
@ -13,7 +16,7 @@ public fun interface Action<T, R> {
* Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy * Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
* so not actual computation is started at this moment. * so not actual computation is started at this moment.
*/ */
public fun execute(dataSet: DataTree<T>, meta: Meta): DataTree<R> public fun execute(source: DataTree<T>, meta: Meta, updatesScope: CoroutineScope): DataTree<R>
public companion object public companion object
} }
@ -21,23 +24,27 @@ public fun interface Action<T, R> {
/** /**
* A convenience method to transform data using given [action] * A convenience method to transform data using given [action]
*/ */
@OptIn(DelicateCoroutinesApi::class)
public fun <T, R> DataTree<T>.transform( public fun <T, R> DataTree<T>.transform(
action: Action<T, R>, action: Action<T, R>,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): DataTree<R> = action.execute(this, meta) updateScope: CoroutineScope = GlobalScope,
): DataTree<R> = action.execute(this, meta, updateScope)
/** /**
* Action composition. The result is terminal if one of its parts is terminal * Action composition. The result is terminal if one of its parts is terminal
*/ */
public infix fun <T, I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> = Action { dataSet, meta -> public infix fun <T, I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> = Action { dataSet, meta, scope ->
action.execute(this@then.execute(dataSet, meta), meta) action.execute(this@then.execute(dataSet, meta, scope), meta, scope)
} }
@DFExperimental @DFExperimental
@OptIn(DelicateCoroutinesApi::class)
public operator fun <T, R> Action<T, R>.invoke( public operator fun <T, R> Action<T, R>.invoke(
dataSet: DataTree<T>, dataSet: DataTree<T>,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): DataTree<R> = execute(dataSet, meta) updateScope: CoroutineScope = GlobalScope,
): DataTree<R> = execute(dataSet, meta, updateScope)

View File

@ -6,7 +6,7 @@ import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.seal import space.kscience.dataforge.meta.seal
import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.misc.DFBuilder import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
@ -54,7 +54,12 @@ internal class MapAction<T, R>(
private val block: MapActionBuilder<T, R>.() -> Unit, private val block: MapActionBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) { ) : AbstractAction<T, R>(outputType) {
private fun DataSink<R>.mapOne(name: Name, data: Data<T>, meta: Meta) { private fun DataSink<R>.mapOne(name: Name, data: Data<T>?, meta: Meta) {
//fast return for null data
if (data == null) {
put(name, null)
return
}
// Creating a new environment for action using **old** name, old meta and task meta // Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(name, data.meta, meta) val env = ActionEnv(name, data.meta, meta)
@ -73,7 +78,7 @@ internal class MapAction<T, R>(
//getting new meta //getting new meta
val newMeta = builder.meta.seal() val newMeta = builder.meta.seal()
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
val newData = Data(builder.outputType, newMeta, dependencies = listOf(data)) { val newData = Data(builder.outputType, newMeta, dependencies = listOf(data)) {
builder.result(env, data.await()) builder.result(env, data.await())
} }
@ -81,12 +86,18 @@ internal class MapAction<T, R>(
put(newName, newData) put(newName, newData)
} }
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) { override fun DataSink<R>.generate(source: DataTree<T>, meta: Meta) {
data.forEach { mapOne(it.name, it.data, meta) } source.forEach { mapOne(it.name, it.data, meta) }
} }
override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, updatedData: NamedData<T>) {
mapOne(updatedData.name, updatedData.data, updatedData.meta)
override suspend fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
) {
mapOne(updatedData.name, updatedData.data, meta)
} }
} }

View File

@ -4,7 +4,7 @@ import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFBuilder import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName import space.kscience.dataforge.names.parseAsName
import kotlin.reflect.KType import kotlin.reflect.KType
@ -84,8 +84,8 @@ internal class ReduceAction<T, R>(
) : AbstractAction<T, R>(outputType) { ) : AbstractAction<T, R>(outputType) {
//TODO optimize reduction. Currently, the whole action recalculates on push //TODO optimize reduction. Currently, the whole action recalculates on push
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) { override fun DataSink<R>.generate(source: DataTree<T>, meta: Meta) {
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(data).forEach { group -> ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(source).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.asSequence().fold(HashMap()) { acc, value -> val dataFlow: Map<Name, Data<T>> = group.set.asSequence().fold(HashMap()) { acc, value ->
acc.apply { acc.apply {
acc[value.name] = value.data acc[value.name] = value.data
@ -97,7 +97,7 @@ internal class ReduceAction<T, R>(
val groupMeta = group.meta val groupMeta = group.meta
val env = ActionEnv(groupName.parseAsName(), groupMeta, meta) val env = ActionEnv(groupName.parseAsName(), groupMeta, meta)
@OptIn(DFInternal::class) val res: Data<R> = dataFlow.reduceToData( @OptIn(UnsafeKType::class) val res: Data<R> = dataFlow.reduceToData(
group.outputType, group.outputType,
meta = groupMeta meta = groupMeta
) { group.result.invoke(env, it) } ) { group.result.invoke(env, it) }

View File

@ -48,10 +48,10 @@ internal class SplitAction<T, R>(
private val action: SplitBuilder<T, R>.() -> Unit, private val action: SplitBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) { ) : AbstractAction<T, R>(outputType) {
private fun DataSink<R>.splitOne(name: Name, data: Data<T>, meta: Meta) { private fun DataSink<R>.splitOne(name: Name, data: Data<T>?, meta: Meta) {
val laminate = Laminate(data.meta, meta) val laminate = Laminate(data?.meta, meta)
val split = SplitBuilder<T, R>(name, data.meta).apply(action) val split = SplitBuilder<T, R>(name, data?.meta ?: Meta.EMPTY).apply(action)
// apply individual fragment rules to result // apply individual fragment rules to result
@ -63,21 +63,29 @@ internal class SplitAction<T, R>(
).apply(rule) ).apply(rule)
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) //data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
put( if (data == null) {
fragmentName, put(fragmentName, null)
@Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) { } else {
env.result(data.await()) put(
} fragmentName,
) @Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) {
env.result(data.await())
}
)
}
} }
} }
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) { override fun DataSink<R>.generate(source: DataTree<T>, meta: Meta) {
data.forEach { splitOne(it.name, it.data, meta) } source.forEach { splitOne(it.name, it.data, meta) }
} }
override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, updatedData: NamedData<T>) { override suspend fun DataSink<R>.update(
splitOne(updatedData.name, updatedData.data, updatedData.meta) source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
) {
splitOne(updatedData.name, updatedData.data, meta)
} }
} }

View File

@ -4,8 +4,8 @@ import kotlinx.coroutines.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaRepr import space.kscience.dataforge.meta.MetaRepr
import space.kscience.dataforge.meta.isEmpty import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.DfType import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.misc.UnsafeKType
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
@ -41,7 +41,7 @@ public interface Data<out T> : Goal<T>, MetaRepr {
*/ */
internal val TYPE_OF_NOTHING: KType = typeOf<Unit>() internal val TYPE_OF_NOTHING: KType = typeOf<Unit>()
public inline fun <reified T> static( public inline fun <reified T> wrapValue(
value: T, value: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Data<T> = StaticData(typeOf<T>(), value, meta) ): Data<T> = StaticData(typeOf<T>(), value, meta)
@ -50,10 +50,10 @@ public interface Data<out T> : Goal<T>, MetaRepr {
* An empty data containing only meta * An empty data containing only meta
*/ */
@OptIn(DelicateCoroutinesApi::class) @OptIn(DelicateCoroutinesApi::class)
public fun empty(meta: Meta): Data<Nothing> = object : Data<Nothing> { public fun buildEmpty(meta: Meta): Data<Nothing> = object : Data<Nothing> {
override val type: KType = TYPE_OF_NOTHING override val type: KType get() = TYPE_OF_NOTHING
override val meta: Meta = meta override val meta: Meta = meta
override val dependencies: Collection<Goal<*>> = emptyList() override val dependencies: Collection<Goal<*>> get() = emptyList()
override val deferred: Deferred<Nothing> override val deferred: Deferred<Nothing>
get() = GlobalScope.async(start = CoroutineStart.LAZY) { get() = GlobalScope.async(start = CoroutineStart.LAZY) {
error("The Data is empty and could not be computed") error("The Data is empty and could not be computed")
@ -62,6 +62,8 @@ public interface Data<out T> : Goal<T>, MetaRepr {
override fun async(coroutineScope: CoroutineScope): Deferred<Nothing> = deferred override fun async(coroutineScope: CoroutineScope): Deferred<Nothing> = deferred
override fun reset() {} override fun reset() {}
} }
public val EMPTY: Data<Nothing> = buildEmpty(Meta.EMPTY)
} }
} }
@ -87,7 +89,7 @@ public class StaticData<T>(
public inline fun <reified T> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> = public inline fun <reified T> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
StaticData(typeOf<T>(), value, meta) StaticData(typeOf<T>(), value, meta)
@DFInternal @UnsafeKType
public fun <T> Data( public fun <T> Data(
type: KType, type: KType,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
@ -96,7 +98,7 @@ public fun <T> Data(
block: suspend () -> T, block: suspend () -> T,
): Data<T> = LazyData(type, meta, context, dependencies, block) ): Data<T> = LazyData(type, meta, context, dependencies, block)
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
public inline fun <reified T> Data( public inline fun <reified T> Data(
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext, context: CoroutineContext = EmptyCoroutineContext,

View File

@ -1,28 +1,30 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.plus
import kotlin.reflect.KType import kotlin.reflect.KType
public fun interface DataFilter { public fun interface DataFilter {
public fun accepts(name: Name, meta: Meta, type: KType): Boolean public fun accepts(name: Name, meta: Meta?, type: KType): Boolean
public companion object { public companion object {
public val EMPTY: DataFilter = DataFilter { _, _, _ -> true } public val EMPTY: DataFilter = DataFilter { _, _, _ -> true }
} }
} }
public fun DataFilter.accepts(data: NamedData<*>): Boolean = accepts(data.name, data.meta, data.type)
public fun <T> Sequence<NamedData<T>>.filterData(predicate: DataFilter): Sequence<NamedData<T>> = filter { data -> public fun DataFilter.accepts(update: DataUpdate<*>): Boolean = accepts(update.name, update.data?.meta, update.type)
public fun <T, DU : DataUpdate<T>> Sequence<DU>.filterData(predicate: DataFilter): Sequence<DU> = filter { data ->
predicate.accepts(data) predicate.accepts(data)
} }
public fun <T> Flow<NamedData<T>>.filterData(predicate: DataFilter): Flow<NamedData<T>> = filter { data -> public fun <T, DU : DataUpdate<T>> Flow<DU>.filterData(predicate: DataFilter): Flow<DU> = filter { data ->
predicate.accepts(data) predicate.accepts(data)
} }
@ -41,7 +43,8 @@ public fun <T> DataSource<T>.filterData(
public fun <T> ObservableDataSource<T>.filterData( public fun <T> ObservableDataSource<T>.filterData(
predicate: DataFilter, predicate: DataFilter,
): ObservableDataSource<T> = object : ObservableDataSource<T> { ): ObservableDataSource<T> = object : ObservableDataSource<T> {
override fun updates(): Flow<NamedData<T>> = this@filterData.updates().filter { predicate.accepts(it) } override val updates: Flow<DataUpdate<T>>
get() = this@filterData.updates.filter { predicate.accepts(it) }
override val dataType: KType get() = this@filterData.dataType override val dataType: KType get() = this@filterData.dataType
@ -49,14 +52,32 @@ public fun <T> ObservableDataSource<T>.filterData(
this@filterData.read(name)?.takeIf { predicate.accepts(name, it.meta, it.type) } this@filterData.read(name)?.takeIf { predicate.accepts(name, it.meta, it.type) }
} }
public fun <T> GenericDataTree<T, *>.filterData( internal class FilteredDataTree<T>(
predicate: DataFilter, val source: DataTree<T>,
): DataTree<T> = asSequence().filterData(predicate).toTree(dataType) val filter: DataFilter,
val branch: Name,
override val dataType: KType = source.dataType,
) : DataTree<T> {
public fun <T> GenericObservableDataTree<T, *>.filterData( override val data: Data<T>?
scope: CoroutineScope, get() = source[branch].takeIf {
filter.accepts(Name.EMPTY, data?.meta, data?.type ?: dataType)
}
override val items: Map<NameToken, DataTree<T>>
get() = source.branch(branch)?.items
?.mapValues { FilteredDataTree(source, filter, branch + it.key) }
?.filter { !it.value.isEmpty() }
?: emptyMap()
override val updates: Flow<DataUpdate<T>>
get() = source.updates.filter { filter.accepts(it) }
}
public fun <T> DataTree<T>.filterData(
predicate: DataFilter, predicate: DataFilter,
): ObservableDataTree<T> = asSequence().filterData(predicate).toObservableTree(dataType, scope, updates().filterData(predicate)) ): DataTree<T> = FilteredDataTree(this, predicate, Name.EMPTY)
///** ///**

View File

@ -0,0 +1,157 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public interface DataSink<in T> {
/**
* Put data without notification
*/
public fun put(name: Name, data: Data<T>?)
/**
* Put data and propagate changes downstream
*/
public suspend fun update(name: Name, data: Data<T>?)
}
/**
* Launch continuous update using
*/
public fun <T> DataSink<T>.launchUpdate(
scope: CoroutineScope,
updater: suspend DataSink<T>.() -> Unit,
): Job = scope.launch {
object : DataSink<T> {
override fun put(name: Name, data: Data<T>?) {
launch {
this@launchUpdate.update(name, data)
}
}
override suspend fun update(name: Name, data: Data<T>?) {
this@launchUpdate.update(name, data)
}
}.updater()
}
/**
* A mutable version of [DataTree]
*/
public interface MutableDataTree<T> : DataTree<T>, DataSink<T> {
override var data: Data<T>?
override val items: Map<NameToken, MutableDataTree<T>>
public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
public operator fun set(token: NameToken, data: Data<T>?)
override fun put(name: Name, data: Data<T>?): Unit = set(name, data)
}
public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> set(name.first(), data)
else -> getOrCreateItem(name.first())[name.cutFirst()] = data
}
}
/**
* Provide a mutable subtree if it exists
*/
public tailrec fun <T> MutableDataTree<T>.branch(name: Name): MutableDataTree<T>? =
when (name.length) {
0 -> this
1 -> items[name.first()]
else -> items[name.first()]?.branch(name.cutFirst())
}
private class MutableDataTreeRoot<T>(
override val dataType: KType,
) : MutableDataTree<T> {
override val updates = MutableSharedFlow<DataUpdate<T>>()
inner class MutableDataTreeBranch(val branchName: Name) : MutableDataTree<T> {
override var data: Data<T>? = null
override val items = HashMap<NameToken, MutableDataTree<T>>()
override val updates: Flow<DataUpdate<T>> = this@MutableDataTreeRoot.updates.mapNotNull { update ->
update.name.removeFirstOrNull(branchName)?.let {
DataUpdate(update.data?.type ?: dataType, it, update.data)
}
}
override val dataType: KType get() = this@MutableDataTreeRoot.dataType
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> =
items.getOrPut(token) { MutableDataTreeBranch(branchName + token) }
override fun set(token: NameToken, data: Data<T>?) {
val subTree = getOrCreateItem(token)
subTree.data = data
}
override suspend fun update(name: Name, data: Data<T>?) {
if (name.isEmpty()) {
this.data = data
} else {
getOrCreateItem(name.first()).update(name.cutFirst(), data)
}
this@MutableDataTreeRoot.updates.emit(DataUpdate(data?.type ?: dataType, branchName + name, data))
}
}
override var data: Data<T>? = null
override val items = HashMap<NameToken, MutableDataTree<T>>()
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = items.getOrPut(token) {
MutableDataTreeRoot(dataType)
}
override fun set(token: NameToken, data: Data<T>?) {
val subTree = getOrCreateItem(token)
subTree.data = data
}
override suspend fun update(name: Name, data: Data<T>?) {
if (name.isEmpty()) {
this.data = data
} else {
getOrCreateItem(name.first()).update(name.cutFirst(), data)
}
updates.emit(DataUpdate(data?.type ?: dataType, name, data))
}
}
/**
* Create a new [MutableDataTree]
*/
@UnsafeKType
public fun <T> MutableDataTree(
type: KType,
): MutableDataTree<T> = MutableDataTreeRoot<T>(type)
/**
* Create and initialize a observable mutable data tree.
*/
@OptIn(UnsafeKType::class)
public inline fun <reified T> MutableDataTree(
generator: MutableDataTree<T>.() -> Unit = {},
): MutableDataTree<T> = MutableDataTree<T>(typeOf<T>()).apply { generator() }

View File

@ -1,9 +1,8 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.emptyFlow
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.*
import kotlin.contracts.contract import kotlin.contracts.contract
import kotlin.reflect.KType import kotlin.reflect.KType
@ -33,46 +32,42 @@ public interface ObservableDataSource<out T> : DataSource<T> {
/** /**
* Flow updates made to the data * Flow updates made to the data
*/ */
public fun updates(): Flow<NamedData<T>> public val updates: Flow<DataUpdate<T>>
} }
/** /**
* A tree like structure for data holding * A tree like structure for data holding
*/ */
public interface GenericDataTree<out T, out TR : GenericDataTree<T, TR>> : DataSource<T> { public interface DataTree<out T> : ObservableDataSource<T> {
public val self: TR
public val data: Data<T>? public val data: Data<T>?
public val items: Map<NameToken, TR> public val items: Map<NameToken, DataTree<T>>
override fun read(name: Name): Data<T>? = when (name.length) { override fun read(name: Name): Data<T>? = when (name.length) {
0 -> data 0 -> data
else -> items[name.first()]?.read(name.cutFirst()) else -> items[name.first()]?.read(name.cutFirst())
} }
/**
* Flow updates made to the data
*/
override val updates: Flow<DataUpdate<T>>
public companion object { public companion object {
private object EmptyDataTree : GenericDataTree<Nothing, EmptyDataTree> { private object EmptyDataTree :
override val self: EmptyDataTree get() = this DataTree<Nothing> {
override val data: Data<Nothing>? = null override val data: Data<Nothing>? = null
override val items: Map<NameToken, EmptyDataTree> = emptyMap() override val items: Map<NameToken, EmptyDataTree> = emptyMap()
override val dataType: KType = typeOf<Unit>() override val dataType: KType = typeOf<Unit>()
override fun read(name: Name): Data<Nothing>? = null override fun read(name: Name): Data<Nothing>? = null
override val updates: Flow<DataUpdate<Nothing>> get() = emptyFlow()
} }
public val EMPTY: GenericDataTree<Nothing, *> = EmptyDataTree public val EMPTY: DataTree<Nothing> = EmptyDataTree
} }
} }
public typealias DataTree<T> = GenericDataTree<T, GenericDataTree<T, *>>
/**
* Return a single data in this tree. Throw error if it is not single.
*/
public fun <T> DataTree<T>.single(): NamedData<T> = asSequence().single()
/** /**
* An alias for easier access to tree values * An alias for easier access to tree values
*/ */
@ -94,11 +89,13 @@ public fun <T> DataTree<T>.asSequence(
} }
/** /**
* Walk the data tree depth-first * Walk the data tree depth-first.
*
* @return a [Sequence] of pairs [Name]-[DataTree] for all nodes including the root one.
*/ */
private fun <T, TR: GenericDataTree<T,TR>> TR.walk( public fun <T> DataTree<T>.walk(
namePrefix: Name = Name.EMPTY, namePrefix: Name = Name.EMPTY,
): Sequence<Pair<Name,TR>> = sequence { ): Sequence<Pair<Name, DataTree<T>>> = sequence {
yield(namePrefix to this@walk) yield(namePrefix to this@walk)
items.forEach { (token, tree) -> items.forEach { (token, tree) ->
yieldAll(tree.walk(namePrefix + token)) yieldAll(tree.walk(namePrefix + token))
@ -110,233 +107,25 @@ public val DataTree<*>.meta: Meta? get() = data?.meta
/** /**
* Provide subtree if it exists * Provide subtree if it exists
*/ */
public tailrec fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: Name): TR? = public tailrec fun <T> DataTree<T>.branch(name: Name): DataTree<T>? =
when (name.length) { when (name.length) {
0 -> self 0 -> this
1 -> items[name.first()] 1 -> items[name.first()]
else -> items[name.first()]?.branch(name.cutFirst()) else -> items[name.first()]?.branch(name.cutFirst())
} }
public fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: String): TR? = public fun <T> DataTree<T>.branch(name: String): DataTree<T>? =
branch(name.parseAsName()) branch(name.parseAsName())
public fun GenericDataTree<*, *>.isEmpty(): Boolean = data == null && items.isEmpty() public fun DataTree<*>.isEmpty(): Boolean = data == null && items.isEmpty()
@PublishedApi
internal class FlatDataTree<T>(
override val dataType: KType,
private val dataSet: Map<Name, Data<T>>,
private val prefix: Name,
) : GenericDataTree<T, FlatDataTree<T>> {
override val self: FlatDataTree<T> get() = this
override val data: Data<T>? get() = dataSet[prefix]
override val items: Map<NameToken, FlatDataTree<T>>
get() = dataSet.keys
.filter { it.startsWith(prefix) && it.length > prefix.length }
.map { it.tokens[prefix.length] }
.associateWith { FlatDataTree(dataType, dataSet, prefix + it) }
override fun read(name: Name): Data<T>? = dataSet[prefix + name]
}
/**
* Represent this flat data map as a [DataTree] without copying it
*/
public inline fun <reified T> Map<Name, Data<T>>.asTree(): DataTree<T> = FlatDataTree(typeOf<T>(), this, Name.EMPTY)
internal fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> =
FlatDataTree(type, associate { it.name to it.data }, Name.EMPTY)
/**
* Collect a sequence of [NamedData] to a [DataTree]
*/
public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> =
FlatDataTree(typeOf<T>(), associate { it.name to it.data }, Name.EMPTY)
public interface GenericObservableDataTree<out T, out TR : GenericObservableDataTree<T, TR>> :
GenericDataTree<T, TR>, ObservableDataSource<T>, AutoCloseable {
/**
* A scope that is used to propagate updates. When this scope is closed, no new updates could arrive.
*/
public val updatesScope: CoroutineScope
/**
* Close this data tree updates channel
*/
override fun close() {
updatesScope.cancel()
}
}
public typealias ObservableDataTree<T> = GenericObservableDataTree<T, GenericObservableDataTree<T, *>>
/** /**
* Check if the [DataTree] is observable * Check if the [DataTree] is observable
*/ */
public fun <T> DataTree<T>.isObservable(): Boolean { public fun <T> DataSource<T>.isObservable(): Boolean {
contract { contract {
returns(true) implies (this@isObservable is GenericObservableDataTree<T, *>) returns(true) implies (this@isObservable is ObservableDataSource<T>)
} }
return this is GenericObservableDataTree<T, *> return this is ObservableDataSource<T>
} }
/**
* Wait for this data tree to stop spawning updates (updatesScope is closed).
* If this [DataTree] is not observable, return immediately.
*/
public suspend fun <T> DataTree<T>.awaitClose() {
if (isObservable()) {
updatesScope.coroutineContext[Job]?.join()
}
}
public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> =
if (this is GenericObservableDataTree<T, *>) updates() else emptyFlow()
public fun interface DataSink<in T> {
public fun put(name: Name, data: Data<T>?)
}
@DFInternal
public class DataTreeBuilder<T>(private val type: KType) : DataSink<T> {
private val map = HashMap<Name, Data<T>>()
override fun put(name: Name, data: Data<T>?) {
if (data == null) {
map.remove(name)
} else {
map[name] = data
}
}
public fun build(): DataTree<T> = FlatDataTree(type, map, Name.EMPTY)
}
@DFInternal
public inline fun <T> DataTree(
dataType: KType,
generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(dataType).apply(generator).build()
/**
* Create and a data tree.
*/
@OptIn(DFInternal::class)
public inline fun <reified T> DataTree(
generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(typeOf<T>()).apply(generator).build()
/**
* A mutable version of [GenericDataTree]
*/
public interface MutableDataTree<T> : GenericObservableDataTree<T, MutableDataTree<T>>, DataSink<T> {
override var data: Data<T>?
override val items: Map<NameToken, MutableDataTree<T>>
public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
public operator fun set(token: NameToken, data: Data<T>?)
override fun put(name: Name, data: Data<T>?): Unit = set(name, data)
}
public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> set(name.first(), data)
else -> getOrCreateItem(name.first())[name.cutFirst()] = data
}
}
private class MutableDataTreeImpl<T>(
override val dataType: KType,
override val updatesScope: CoroutineScope,
) : MutableDataTree<T> {
private val updates = MutableSharedFlow<NamedData<T>>()
private val children = HashMap<NameToken, MutableDataTree<T>>()
override var data: Data<T>? = null
set(value) {
if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
field = value
if (value != null) {
updatesScope.launch {
updates.emit(value.named(Name.EMPTY))
}
}
}
override val items: Map<NameToken, MutableDataTree<T>> get() = children
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = children.getOrPut(token){
MutableDataTreeImpl(dataType, updatesScope)
}
override val self: MutableDataTree<T> get() = this
override fun set(token: NameToken, data: Data<T>?) {
if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
val subTree = getOrCreateItem(token)
subTree.updates().onEach {
updates.emit(it.named(token + it.name))
}.launchIn(updatesScope)
subTree.data = data
}
override fun updates(): Flow<NamedData<T>> = updates
}
/**
* Create a new [MutableDataTree]
*
* @param parentScope a [CoroutineScope] to control data propagation. By default uses [GlobalScope]
*/
@OptIn(DelicateCoroutinesApi::class)
public fun <T> MutableDataTree(
type: KType,
parentScope: CoroutineScope = GlobalScope,
): MutableDataTree<T> = MutableDataTreeImpl<T>(
type,
CoroutineScope(parentScope.coroutineContext + Job(parentScope.coroutineContext[Job]))
)
/**
* Create and initialize a observable mutable data tree.
*/
@OptIn(DelicateCoroutinesApi::class)
public inline fun <reified T> MutableDataTree(
parentScope: CoroutineScope = GlobalScope,
generator: MutableDataTree<T>.() -> Unit = {},
): MutableDataTree<T> = MutableDataTree<T>(typeOf<T>(), parentScope).apply { generator() }
//@DFInternal
//public fun <T> ObservableDataTree(
// type: KType,
// scope: CoroutineScope,
// generator: suspend MutableDataTree<T>.() -> Unit = {},
//): ObservableDataTree<T> = MutableDataTree<T>(type, scope.coroutineContext).apply(generator)
public inline fun <reified T> ObservableDataTree(
parentScope: CoroutineScope,
generator: MutableDataTree<T>.() -> Unit = {},
): ObservableDataTree<T> = MutableDataTree<T>(typeOf<T>(), parentScope).apply(generator)
/**
* Collect a [Sequence] into an observable tree with additional [updates]
*/
public fun <T> Sequence<NamedData<T>>.toObservableTree(
dataType: KType,
parentScope: CoroutineScope,
updates: Flow<NamedData<T>>,
): ObservableDataTree<T> = MutableDataTree<T>(dataType, parentScope).apply {
this.putAll(this@toObservableTree)
updates.onEach {
put(it.name, it.data)
}.launchIn(updatesScope)
}

View File

@ -17,7 +17,7 @@ package space.kscience.dataforge.data
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.UnsafeKType
public interface GroupRule { public interface GroupRule {
public fun <T> gather(set: DataTree<T>): Map<String, DataTree<T>> public fun <T> gather(set: DataTree<T>): Map<String, DataTree<T>>
@ -31,7 +31,7 @@ public interface GroupRule {
* @param defaultTagValue * @param defaultTagValue
* @return * @return
*/ */
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
public fun byMetaValue( public fun byMetaValue(
key: String, key: String,
defaultTagValue: String, defaultTagValue: String,
@ -40,15 +40,15 @@ public interface GroupRule {
override fun <T> gather( override fun <T> gather(
set: DataTree<T>, set: DataTree<T>,
): Map<String, DataTree<T>> { ): Map<String, DataTree<T>> {
val map = HashMap<String, DataTreeBuilder<T>>() val map = HashMap<String, MutableDataTree<T>>()
set.forEach { data -> set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.put(data.name, data.data) map.getOrPut(tagValue) { MutableDataTree(set.dataType) }.put(data.name, data.data)
} }
return map.mapValues { it.value.build() } return map
} }
} }
} }

View File

@ -20,4 +20,4 @@ public fun <T> Data<T>.withMeta(newMeta: Meta): Data<T> = if (this is MetaMaskDa
* Create a new [Data] with the same computation, but different meta. The meta is created by applying [block] to * Create a new [Data] with the same computation, but different meta. The meta is created by applying [block] to
* the existing data meta. * the existing data meta.
*/ */
public inline fun <T> Data<T>.mapMeta(block: MutableMeta.() -> Unit): Data<T> = withMeta(meta.copy(block)) public inline fun <T> Data<T>.withMeta(block: MutableMeta.() -> Unit): Data<T> = withMeta(meta.copy(block))

View File

@ -3,10 +3,30 @@ package space.kscience.dataforge.data
import space.kscience.dataforge.meta.isEmpty import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.misc.Named import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
public interface NamedData<out T> : Named, Data<T> { /**
* An interface implementing a data update event.
*
* If [data] is null, then corresponding element should be removed.
*/
public interface DataUpdate<out T> : Named {
public val type: KType
override val name: Name override val name: Name
public val data: Data<T> public val data: Data<T>?
}
public fun <T> DataUpdate(type: KType, name: Name, data: Data<T>?): DataUpdate<T> = object : DataUpdate<T> {
override val type: KType = type
override val name: Name = name
override val data: Data<T>? = data
}
/**
* A data coupled to a name.
*/
public interface NamedData<out T> : DataUpdate<T>, Data<T> {
override val data: Data<T>
} }
public operator fun NamedData<*>.component1(): Name = name public operator fun NamedData<*>.component1(): Name = name
@ -32,4 +52,6 @@ public fun <T> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData)
NamedDataImpl(name, this.data) NamedDataImpl(name, this.data)
} else { } else {
NamedDataImpl(name, this) NamedDataImpl(name, this)
} }
public fun <T> NamedData(name: Name, data: Data<T>): NamedData<T> = data.named(name)

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -22,31 +23,27 @@ public inline fun <T> DataSink<T>.putAll(
if (prefix.isEmpty()) { if (prefix.isEmpty()) {
apply(block) apply(block)
} else { } else {
val proxyDataSink = DataSink { nameWithoutPrefix, data -> val proxyDataSink = object :DataSink<T>{
this.put(prefix + nameWithoutPrefix, data) override fun put(name: Name, data: Data<T>?) {
this@putAll.put(prefix + name, data)
}
override suspend fun update(name: Name, data: Data<T>?) {
this@putAll.update(prefix + name, data)
}
} }
proxyDataSink.apply(block) proxyDataSink.apply(block)
} }
} }
@Deprecated("Use putAll", ReplaceWith("putAll(prefix, block)"))
public inline fun <T> DataSink<T>.branch(
prefix: Name,
block: DataSink<T>.() -> Unit,
): Unit = putAll(prefix, block)
public inline fun <T> DataSink<T>.putAll( public inline fun <T> DataSink<T>.putAll(
prefix: String, prefix: String,
block: DataSink<T>.() -> Unit, block: DataSink<T>.() -> Unit,
): Unit = putAll(prefix.asName(), block) ): Unit = putAll(prefix.asName(), block)
@Deprecated("Use putAll", ReplaceWith("putAll(prefix, block)"))
public inline fun <T> DataSink<T>.branch(
prefix: String,
block: DataSink<T>.() -> Unit,
): Unit = putAll(prefix, block)
public fun <T> DataSink<T>.put(name: String, value: Data<T>) { public fun <T> DataSink<T>.put(name: String, value: Data<T>) {
put(Name.parse(name), value) put(Name.parse(name), value)
@ -56,20 +53,15 @@ public fun <T> DataSink<T>.putAll(name: Name, tree: DataTree<T>) {
putAll(name) { putAll(tree.asSequence()) } putAll(name) { putAll(tree.asSequence()) }
} }
@Deprecated("Use putAll", ReplaceWith("putAll(name, tree)"))
public fun <T> DataSink<T>.branch(name: Name, tree: DataTree<T>): Unit = putAll(name, tree)
public fun <T> DataSink<T>.putAll(name: String, tree: DataTree<T>) { public fun <T> DataSink<T>.putAll(name: String, tree: DataTree<T>) {
putAll(Name.parse(name)) { putAll(tree.asSequence()) } putAll(Name.parse(name)) { putAll(tree.asSequence()) }
} }
@Deprecated("Use putAll", ReplaceWith("putAll(name, tree)"))
public fun <T> DataSink<T>.branch(name: String, tree: DataTree<T>): Unit = putAll(name, tree)
/** /**
* Produce lazy [Data] and emit it into the [MutableDataTree] * Produce lazy [Data] and emit it into the [MutableDataTree]
*/ */
public inline fun <reified T> DataSink<T>.put( public inline fun <reified T> DataSink<T>.putValue(
name: String, name: String,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T, noinline producer: suspend () -> T,
@ -78,7 +70,7 @@ public inline fun <reified T> DataSink<T>.put(
put(name, data) put(name, data)
} }
public inline fun <reified T> DataSink<T>.put( public inline fun <reified T> DataSink<T>.putValue(
name: Name, name: Name,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T, noinline producer: suspend () -> T,
@ -90,24 +82,23 @@ public inline fun <reified T> DataSink<T>.put(
/** /**
* Emit static data with the fixed value * Emit static data with the fixed value
*/ */
public inline fun <reified T> DataSink<T>.wrap( public inline fun <reified T> DataSink<T>.putValue(
name: String, name: String,
data: T, value: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Unit = put(name, Data.static(data, meta)) ): Unit = put(name, Data.wrapValue(value, meta))
public inline fun <reified T> DataSink<T>.wrap( public inline fun <reified T> DataSink<T>.putValue(
name: Name, name: Name,
data: T, value: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Unit = put(name, Data.static(data, meta)) ): Unit = put(name, Data.wrapValue(value, meta))
public inline fun <reified T> DataSink<T>.wrap( public inline fun <reified T> DataSink<T>.putValue(
name: String, name: String,
data: T, value: T,
mutableMeta: MutableMeta.() -> Unit, metaBuilder: MutableMeta.() -> Unit,
): Unit = put(Name.parse(name), Data.static(data, Meta(mutableMeta))) ): Unit = put(Name.parse(name), Data.wrapValue(value, Meta(metaBuilder)))
public fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) { public fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
sequence.forEach { sequence.forEach {
@ -123,18 +114,12 @@ public fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
* Copy given data set and mirror its changes to this [DataSink] in [this@setAndObserve]. Returns an update [Job] * Copy given data set and mirror its changes to this [DataSink] in [this@setAndObserve]. Returns an update [Job]
*/ */
public fun <T : Any> DataSink<T>.putAllAndWatch( public fun <T : Any> DataSink<T>.putAllAndWatch(
scope: CoroutineScope,
branchName: Name = Name.EMPTY, branchName: Name = Name.EMPTY,
dataSet: ObservableDataTree<T>, source: DataTree<T>,
): Job { ): Job {
putAll(branchName, dataSet) putAll(branchName, source)
return dataSet.updates().onEach { return source.updates.onEach {
put(branchName + it.name, it.data) put(branchName + it.name, it.data)
}.launchIn(dataSet.updatesScope) }.launchIn(scope)
} }
@Deprecated("Use putAllAndWatch", ReplaceWith("putAllAndWatch(name, dataSet)"))
public fun <T : Any> DataSink<T>.watchBranch(
name: Name,
dataSet: ObservableDataTree<T>,
): Job = putAllAndWatch(name, dataSet)

View File

@ -1,7 +1,7 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken import space.kscience.dataforge.names.NameToken
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -68,7 +68,7 @@ internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
} }
} }
@DFInternal @UnsafeKType
public fun <K, T, R> Map<K, Data<T>>.reduceToData( public fun <K, T, R> Map<K, Data<T>>.reduceToData(
outputType: KType, outputType: KType,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
@ -103,7 +103,7 @@ public inline fun <K, T, reified R> Map<K, Data<T>>.reduceToData(
//Iterable operations //Iterable operations
@DFInternal @UnsafeKType
public inline fun <T, R> Iterable<Data<T>>.reduceToData( public inline fun <T, R> Iterable<Data<T>>.reduceToData(
outputType: KType, outputType: KType,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
@ -118,7 +118,7 @@ public inline fun <T, R> Iterable<Data<T>>.reduceToData(
transformation(map { it.awaitWithMeta() }) transformation(map { it.awaitWithMeta() })
} }
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
public inline fun <T, reified R> Iterable<Data<T>>.reduceToData( public inline fun <T, reified R> Iterable<Data<T>>.reduceToData(
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -141,7 +141,7 @@ public inline fun <T, reified R> Iterable<Data<T>>.foldToData(
/** /**
* Transform an [Iterable] of [NamedData] to a single [Data]. * Transform an [Iterable] of [NamedData] to a single [Data].
*/ */
@DFInternal @UnsafeKType
public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData( public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData(
outputType: KType, outputType: KType,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
@ -156,7 +156,7 @@ public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData(
transformation(map { it.awaitWithMeta() }) transformation(map { it.awaitWithMeta() })
} }
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
public inline fun <T, reified R> Iterable<NamedData<T>>.reduceNamedToData( public inline fun <T, reified R> Iterable<NamedData<T>>.reduceNamedToData(
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -181,7 +181,8 @@ public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
//DataSet operations //DataSet operations
@DFInternal
@UnsafeKType
public suspend fun <T, R> DataTree<T>.transform( public suspend fun <T, R> DataTree<T>.transform(
outputType: KType, outputType: KType,
metaTransform: MutableMeta.() -> Unit = {}, metaTransform: MutableMeta.() -> Unit = {},
@ -198,7 +199,7 @@ public suspend fun <T, R> DataTree<T>.transform(
} }
} }
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
public suspend inline fun <T, reified R> DataTree<T>.transform( public suspend inline fun <T, reified R> DataTree<T>.transform(
noinline metaTransform: MutableMeta.() -> Unit = {}, noinline metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,

View File

@ -0,0 +1,112 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
private class FlatDataTree<T>(
override val dataType: KType,
private val dataSet: Map<Name, Data<T>>,
private val sourceUpdates: Flow<DataUpdate<T>>,
private val prefix: Name,
) : DataTree<T> {
override val data: Data<T>? get() = dataSet[prefix]
override val items: Map<NameToken, FlatDataTree<T>>
get() = dataSet.keys
.filter { it.startsWith(prefix) && it.length > prefix.length }
.map { it.tokens[prefix.length] }
.associateWith { FlatDataTree(dataType, dataSet, sourceUpdates, prefix + it) }
override fun read(name: Name): Data<T>? = dataSet[prefix + name]
override val updates: Flow<DataUpdate<T>> =
sourceUpdates.mapNotNull { update ->
update.name.removeFirstOrNull(prefix)?.let { DataUpdate(dataType, it, update.data) }
}
}
/**
* A builder for static [DataTree].
*/
private class DataTreeBuilder<T>(
private val type: KType,
initialData: Map<Name, Data<T>> = emptyMap(),
) : DataSink<T> {
private val map = HashMap<Name, Data<T>>(initialData)
private val mutex = Mutex()
private val updatesFlow = MutableSharedFlow<DataUpdate<T>>()
override fun put(name: Name, data: Data<T>?) {
if (data == null) {
map.remove(name)
} else {
map[name] = data
}
}
override suspend fun update(name: Name, data: Data<T>?) {
mutex.withLock {
if (data == null) {
map.remove(name)
} else {
map.put(name, data)
}
}
updatesFlow.emit(DataUpdate(data?.type ?: type, name, data))
}
public fun build(): DataTree<T> = FlatDataTree(type, map, updatesFlow, Name.EMPTY)
}
/**
* Create a static [DataTree]
*/
@UnsafeKType
public fun <T> DataTree(
dataType: KType,
generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(dataType).apply(generator).build()
/**
* Create and a data tree.
*/
@OptIn(UnsafeKType::class)
public inline fun <reified T> DataTree(
noinline generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTree(typeOf<T>(), generator)
/**
* Represent this flat data map as a [DataTree] without copying it
*/
@UnsafeKType
public fun <T> Map<Name, Data<T>>.asTree(type: KType): DataTree<T> =
DataTreeBuilder(type, this).build()
/**
* Represent this flat data map as a [DataTree] without copying it
*/
@OptIn(UnsafeKType::class)
public inline fun <reified T> Map<Name, Data<T>>.asTree(): DataTree<T> = asTree(typeOf<T>())
@UnsafeKType
public fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> =
DataTreeBuilder(type, associate { it.name to it.data }).build()
/**
* Collect a sequence of [NamedData] to a [DataTree]
*/
@OptIn(UnsafeKType::class)
public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> = toTree(typeOf<T>())

View File

@ -1,22 +1,24 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.actions.Action import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.invoke import space.kscience.dataforge.actions.invoke
import space.kscience.dataforge.actions.mapping import space.kscience.dataforge.actions.mapping
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
internal class ActionsTest { internal class ActionsTest {
@Test @Test
fun testStaticMapAction() = runTest { fun testStaticMapAction() = runTest(timeout = 500.milliseconds) {
val data: DataTree<Int> = DataTree { val data: DataTree<Int> = DataTree {
repeat(10) { repeat(10) {
wrap(it.toString(), it) putValue(it.toString(), it)
} }
} }
@ -28,7 +30,7 @@ internal class ActionsTest {
} }
@Test @Test
fun testDynamicMapAction() = runBlocking { fun testDynamicMapAction() = runTest(timeout = 500.milliseconds) {
val source: MutableDataTree<Int> = MutableDataTree() val source: MutableDataTree<Int> = MutableDataTree()
val plusOne = Action.mapping<Int, Int> { val plusOne = Action.mapping<Int, Int> {
@ -39,13 +41,9 @@ internal class ActionsTest {
repeat(10) { repeat(10) {
source.wrap(it.toString(), it) source.putValue(it.toString(), it)
} }
result.updates.take(10).onEach { println(it.name) }.collect()
delay(20)
source.close()
result.awaitClose()
assertEquals(2, result["1"]?.await()) assertEquals(2, result["1"]?.await())
} }

View File

@ -0,0 +1,71 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import space.kscience.dataforge.names.asName
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds
internal class DataTreeBuilderTest {
@Test
fun testTreeBuild() = runTest(timeout = 500.milliseconds) {
val node = DataTree<Any> {
putAll("primary"){
putValue("a", "a")
putValue("b", "b")
}
putValue("c.d", "c.d")
putValue("c.f", "c.f")
}
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
assertEquals("c.d", node["c.d"]?.await())
assertEquals("c.f", node["c.f"]?.await())
}
@Test
fun testDataUpdate() = runTest(timeout = 500.milliseconds) {
val updateData = DataTree<Any> {
putAll("update") {
put("a", Data.wrapValue("a"))
put("b", Data.wrapValue("b"))
}
}
val node = DataTree<Any> {
putAll("primary") {
putValue("a", "a")
putValue("b", "b")
}
putValue("root", "root")
putAll(updateData)
}
assertEquals("a", node["update.a"]?.await())
assertEquals("a", node["primary.a"]?.await())
}
@Test
fun testDynamicUpdates() = runTest(timeout = 500.milliseconds) {
launch {
val subNode = MutableDataTree<Int>()
val rootNode = MutableDataTree<Int>() {
putAllAndWatch(this@launch, "sub".asName(), subNode)
}
repeat(10) {
subNode.putValue("value[$it]", it)
}
subNode.updates.take(10).collect()
assertEquals(9, rootNode["sub.value[9]"]?.await())
cancel()
}.join()
}
}

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.data package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.DFInternal
@ -25,32 +24,40 @@ private fun <R> Data<*>.castOrNull(type: KType): Data<R>? =
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
@DFInternal @DFInternal
public fun <R> Sequence<NamedData<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> = public fun <R> Sequence<DataUpdate<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>> filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>>
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
@DFInternal @DFInternal
public fun <R> Flow<NamedData<*>>.filterByDataType(type: KType): Flow<NamedData<R>> = public fun <R> Flow<DataUpdate<*>>.filterByDataType(type: KType): Flow<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>> filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>>
/** /**
* Select all data matching given type and filters. Does not modify paths * Select all data matching given type and filters. Does not modify paths
* *
* @param predicate additional filtering condition based on item name and meta. By default, accepts all * @param filter additional filtering condition based on item name and meta. By default, accepts all
*/ */
@Suppress("UNCHECKED_CAST")
@DFInternal @DFInternal
public fun <R> DataTree<*>.filterByType( public fun <R> DataTree<*>.filterByType(
type: KType, type: KType,
predicate: DataFilter = DataFilter.EMPTY, branch: Name = Name.EMPTY,
): DataTree<R> = asSequence().filterByDataType<R>(type).filterData(predicate).toTree(type) filter: DataFilter = DataFilter.EMPTY,
): DataTree<R> {
val filterWithType = DataFilter { name, meta, dataType ->
filter.accepts(name, meta, dataType) && dataType.isSubtypeOf(type)
}
return FilteredDataTree(this, filterWithType, branch, type) as DataTree<R>
}
/** /**
* Select a single datum of the appropriate type * Select a single datum of the appropriate type
*/ */
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
public inline fun <reified R : Any> DataTree<*>.filterByType( public inline fun <reified R : Any> DataTree<*>.filterByType(
predicate: DataFilter = DataFilter.EMPTY, branch: Name = Name.EMPTY,
): DataTree<R> = filterByType(typeOf<R>(), predicate) filter: DataFilter = DataFilter.EMPTY,
): DataTree<R> = filterByType(typeOf<R>(), branch, filter = filter)
/** /**
* Select a single datum if it is present and of given [type] * Select a single datum if it is present and of given [type]
@ -63,25 +70,3 @@ public inline fun <reified R : Any> DataTree<*>.getByType(name: Name): NamedData
public inline fun <reified R : Any> DataTree<*>.getByType(name: String): NamedData<R>? = public inline fun <reified R : Any> DataTree<*>.getByType(name: String): NamedData<R>? =
this@getByType.getByType(typeOf<R>(), Name.parse(name)) this@getByType.getByType(typeOf<R>(), Name.parse(name))
/**
* Select all data matching given type and filters. Does not modify paths
*
* @param predicate additional filtering condition based on item name and meta. By default, accepts all
*/
@DFInternal
public fun <R> ObservableDataTree<*>.filterByType(
type: KType,
scope: CoroutineScope,
predicate: DataFilter = DataFilter.EMPTY,
): ObservableDataTree<R> = asSequence()
.filterByDataType<R>(type)
.filterData(predicate)
.toObservableTree(type, scope, updates().filterByDataType<R>(type).filterData(predicate))
@OptIn(DFInternal::class)
public inline fun <reified R> ObservableDataTree<*>.filterByType(
scope: CoroutineScope,
predicate: DataFilter = DataFilter.EMPTY,
): ObservableDataTree<R> = filterByType(typeOf<R>(), scope, predicate)

View File

@ -14,14 +14,14 @@ public infix fun <T : Any> String.put(data: Data<T>): Unit =
* Append node * Append node
*/ */
context(DataSink<T>) context(DataSink<T>)
public infix fun <T : Any> String.put(dataSet: DataTree<T>): Unit = public infix fun <T : Any> String.putAll(dataSet: DataTree<T>): Unit =
putAll(this, dataSet) putAll(this, dataSet)
/** /**
* Build and append node * Build and append node
*/ */
context(DataSink<T>) context(DataSink<T>)
public infix fun <T : Any> String.put( public infix fun <T : Any> String.putAll(
block: DataSink<T>.() -> Unit, block: DataSink<T>.() -> Unit,
): Unit = putAll(Name.parse(this), block) ): Unit = putAll(Name.parse(this), block)

View File

@ -1,68 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.asName
import kotlin.test.Test
import kotlin.test.assertEquals
internal class DataTreeBuilderTest {
@Test
fun testTreeBuild() = runTest {
val node = DataTree<Any> {
"primary" put {
wrap("a", "a")
wrap("b", "b")
}
wrap("c.d", "c.d")
wrap("c.f", "c.f")
}
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
assertEquals("c.d", node["c.d"]?.await())
assertEquals("c.f", node["c.f"]?.await())
}
@OptIn(DFExperimental::class)
@Test
fun testDataUpdate() = runTest {
val updateData = DataTree<Any> {
"update" put {
"a" put Data.static("a")
"b" put Data.static("b")
}
}
val node = DataTree<Any> {
"primary" put {
wrap("a", "a")
wrap("b", "b")
}
wrap("root", "root")
putAll(updateData)
}
assertEquals("a", node["update.a"]?.await())
assertEquals("a", node["primary.a"]?.await())
}
@Test
fun testDynamicUpdates() = runBlocking {
val subNode = MutableDataTree<Int>()
val rootNode = MutableDataTree<Int> {
putAllAndWatch("sub".asName(), subNode)
}
repeat(10) {
subNode.wrap("value[$it]", it)
}
delay(20)
assertEquals(9, rootNode["sub.value[9]"]?.await())
}
}

View File

@ -10,6 +10,7 @@ import space.kscience.dataforge.meta.MetaSpec
import space.kscience.dataforge.meta.descriptors.Described import space.kscience.dataforge.meta.descriptors.Described
import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.misc.DfType import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.Task.Companion.TYPE import space.kscience.dataforge.workspace.Task.Companion.TYPE
import kotlin.reflect.KType import kotlin.reflect.KType
@ -90,7 +91,8 @@ public fun <T : Any> Task(
taskMeta: Meta, taskMeta: Meta,
): TaskResult<T> { ): TaskResult<T> {
//TODO use safe builder and check for external data on add and detects cycles //TODO use safe builder and check for external data on add and detects cycles
val dataset = MutableDataTree<T>(resultType, workspace.context).apply { @OptIn(UnsafeKType::class)
val dataset = MutableDataTree<T>(resultType).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { TaskResultBuilder(workspace, taskName, taskMeta, this).apply {
withContext(GoalExecutionRestriction() + workspace.goalLogger) { withContext(GoalExecutionRestriction() + workspace.goalLogger) {
builder() builder()
@ -98,7 +100,6 @@ public fun <T : Any> Task(
} }
} }
return workspace.wrapResult(dataset, taskName, taskMeta) return workspace.wrapResult(dataset, taskName, taskMeta)
} }
} }
@ -117,6 +118,7 @@ public inline fun <reified T : Any> Task(
* @param builder for resulting data set * @param builder for resulting data set
*/ */
@Suppress("FunctionName") @Suppress("FunctionName")
public fun <T : Any, C : MetaRepr> Task( public fun <T : Any, C : MetaRepr> Task(
resultType: KType, resultType: KType,
@ -132,7 +134,8 @@ public fun <T : Any, C : MetaRepr> Task(
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) { ): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use safe builder and check for external data on add and detects cycles //TODO use safe builder and check for external data on add and detects cycles
val taskMeta = configuration.toMeta() val taskMeta = configuration.toMeta()
val dataset = MutableDataTree<T>(resultType, this).apply { @OptIn(UnsafeKType::class)
val dataset = MutableDataTree<T>(resultType).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder(configuration) } TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder(configuration) }
} }
workspace.wrapResult(dataset, taskName, taskMeta) workspace.wrapResult(dataset, taskName, taskMeta)

View File

@ -4,7 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.dataforge.data.ObservableDataTree import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.asSequence import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.launch import space.kscience.dataforge.data.launch
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -17,16 +17,16 @@ import space.kscience.dataforge.names.Name
* @param taskMeta The configuration of the task that produced the result * @param taskMeta The configuration of the task that produced the result
*/ */
public data class TaskResult<T>( public data class TaskResult<T>(
public val content: ObservableDataTree<T>, public val content: DataTree<T>,
public val workspace: Workspace, public val workspace: Workspace,
public val taskName: Name, public val taskName: Name,
public val taskMeta: Meta, public val taskMeta: Meta,
) : ObservableDataTree<T> by content ) : DataTree<T> by content
/** /**
* Wrap data into [TaskResult] * Wrap data into [TaskResult]
*/ */
public fun <T> Workspace.wrapResult(data: ObservableDataTree<T>, taskName: Name, taskMeta: Meta): TaskResult<T> = public fun <T> Workspace.wrapResult(data: DataTree<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
TaskResult(data, this, taskName, taskMeta) TaskResult(data, this, taskName, taskMeta)
/** /**

View File

@ -2,7 +2,10 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.get
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DfType import space.kscience.dataforge.misc.DfType
@ -26,7 +29,7 @@ public interface Workspace : ContextAware, Provider, CoroutineScope {
/** /**
* The whole data node for current workspace * The whole data node for current workspace
*/ */
public val data: ObservableDataTree<*> public val data: DataTree<*>
/** /**
* All targets associated with the workspace * All targets associated with the workspace

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.actions.Action import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.ContextBuilder
@ -12,6 +11,7 @@ import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.descriptors.MetaDescriptorBuilder import space.kscience.dataforge.meta.descriptors.MetaDescriptorBuilder
import space.kscience.dataforge.misc.DFBuilder import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
import kotlin.collections.set import kotlin.collections.set
@ -98,19 +98,19 @@ public inline fun <reified T : Any> TaskContainer.task(
public inline fun <T : Any, reified R : Any> TaskContainer.action( public inline fun <T : Any, reified R : Any> TaskContainer.action(
selector: DataSelector<T>, selector: DataSelector<T>,
action: Action<T, R>, action: Action<T, R>,
noinline metaTransform: MutableMeta.()-> Unit = {}, noinline metaTransform: MutableMeta.() -> Unit = {},
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<R>>> = ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<R>>> =
task(MetaDescriptor(descriptorBuilder)) { task(MetaDescriptor(descriptorBuilder)) {
result(action.execute(from(selector), taskMeta.copy(metaTransform))) result(action.execute(from(selector), taskMeta.copy(metaTransform), workspace))
} }
public class WorkspaceBuilder( public class WorkspaceBuilder(
private val parentContext: Context = Global, private val parentContext: Context = Global,
private val coroutineScope: CoroutineScope = parentContext,
) : TaskContainer { ) : TaskContainer {
private var context: Context? = null private var context: Context? = null
private val data = MutableDataTree<Any?>(typeOf<Any?>(), coroutineScope) @OptIn(UnsafeKType::class)
private val data = MutableDataTree<Any?>(typeOf<Any?>())
private val targets: HashMap<String, Meta> = HashMap() private val targets: HashMap<String, Meta> = HashMap()
private val tasks = HashMap<Name, Task<*>>() private val tasks = HashMap<Name, Task<*>>()
private var cache: WorkspaceCache? = null private var cache: WorkspaceCache? = null

View File

@ -2,14 +2,14 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.gather import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.ObservableDataTree import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
internal class WorkspaceImpl internal constructor( internal class WorkspaceImpl internal constructor(
override val context: Context, override val context: Context,
override val data: ObservableDataTree<*>, override val data: DataTree<*>,
override val targets: Map<String, Meta>, override val targets: Map<String, Meta>,
tasks: Map<Name, Task<*>>, tasks: Map<Name, Task<*>>,
private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>, private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>,

View File

@ -3,14 +3,14 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.Data import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.UnsafeKType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
/** /**
* Convert an [Envelope] to a data via given format. The actual parsing is done lazily. * Convert an [Envelope] to a data via given format. The actual parsing is done lazily.
*/ */
@OptIn(DFInternal::class) @OptIn(UnsafeKType::class)
public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(typeOf<T>(), meta) { public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(typeOf<T>(), meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data") data?.readWith(format) ?: error("Can't convert envelope without data to Data")
} }

View File

@ -113,7 +113,7 @@ public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom(
action: Action<T, R>, action: Action<T, R>,
dependencyMeta: Meta = defaultDependencyMeta, dependencyMeta: Meta = defaultDependencyMeta,
) { ) {
this.putAll(action.execute(from(selector, dependencyMeta), dependencyMeta)) putAll(action.execute(from(selector, dependencyMeta), dependencyMeta, workspace))
} }

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.io.* import kotlinx.io.*
import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.ExperimentalSerializationApi
@ -96,7 +97,7 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
val cachedTree = result.asSequence().map { cacheOne(it) } val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) }) .toTree(result.dataType, result.updates.filterIsInstance<NamedData<T>>().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta) return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
} }

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -31,7 +32,7 @@ public class InMemoryWorkspaceCache : WorkspaceCache {
val cachedTree = result.asSequence().map { cacheOne(it) } val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) }) .toTree(result.dataType, result.updates.filterIsInstance<NamedData<T>>().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta) return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
} }

View File

@ -7,14 +7,12 @@ import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus import space.kscience.dataforge.names.plus
import java.nio.file.Files import java.nio.file.*
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.* import kotlin.io.path.*
@ -166,15 +164,25 @@ public fun DataSink<Binary>.monitorFiles(
* @param resources The names of the resources to read. * @param resources The names of the resources to read.
* @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader. * @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
*/ */
@DFExperimental
public fun DataSink<Binary>.resources( public fun DataSink<Binary>.resources(
io: IOPlugin, io: IOPlugin,
vararg resources: String, resource: String,
vararg otherResources: String,
classLoader: ClassLoader = Thread.currentThread().contextClassLoader, classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
) { ) {
resources.forEach { resource -> //create a file system if necessary
val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error( val uri = Thread.currentThread().contextClassLoader.getResource("common")!!.toURI()
"Resource with name $resource is not resolved" try {
uri.toPath()
} catch (e: FileSystemNotFoundException) {
FileSystems.newFileSystem(uri, mapOf("create" to "true"))
}
listOf(resource,*otherResources).forEach { r ->
val path = classLoader.getResource(r)?.toURI()?.toPath() ?: error(
"Resource with name $r is not resolved"
) )
files(io, resource.asName(), path) files(io, r.asName(), path)
} }
} }

View File

@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.wrap import space.kscience.dataforge.data.putValue
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
@ -22,7 +22,7 @@ internal class CachingWorkspaceTest {
data { data {
//statically initialize data //statically initialize data
repeat(5) { repeat(5) {
wrap("myData[$it]", it) putValue("myData[$it]", it)
} }
} }

View File

@ -47,7 +47,7 @@ class DataPropagationTest {
} }
data { data {
repeat(100) { repeat(100) {
wrap("myData[$it]", it) putValue("myData[$it]", it)
} }
} }
} }

View File

@ -24,11 +24,11 @@ import kotlin.test.assertEquals
class FileDataTest { class FileDataTest {
val dataNode = DataTree<String> { val dataNode = DataTree<String> {
putAll("dir") { putAll("dir") {
wrap("a", "Some string") { putValue("a", "Some string") {
"content" put "Some string" "content" put "Some string"
} }
} }
wrap("b", "root data") putValue("b", "root data")
// meta { // meta {
// "content" put "This is root meta node" // "content" put "This is root meta node"
// } // }

View File

@ -3,11 +3,11 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.wrap import space.kscience.dataforge.data.putValue
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files
@OptIn(ExperimentalCoroutinesApi::class,DFExperimental::class) @OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
class FileWorkspaceCacheTest { class FileWorkspaceCacheTest {
@Test @Test
@ -16,7 +16,7 @@ class FileWorkspaceCacheTest {
data { data {
//statically initialize data //statically initialize data
repeat(5) { repeat(5) {
wrap("myData[$it]", it) putValue("myData[$it]", it)
} }
} }
fileCache(Files.createTempDirectory("dataforge-temporary-cache")) fileCache(Files.createTempDirectory("dataforge-temporary-cache"))

View File

@ -6,7 +6,6 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Timeout
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
@ -16,6 +15,7 @@ import space.kscience.dataforge.names.plus
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.milliseconds
/** /**
@ -62,7 +62,7 @@ internal class SimpleWorkspaceTest {
data { data {
//statically initialize data //statically initialize data
repeat(100) { repeat(100) {
wrap("myData[$it]", it) putValue("myData[$it]", it)
} }
} }
@ -148,18 +148,16 @@ internal class SimpleWorkspaceTest {
} }
@Test @Test
@Timeout(1) fun testWorkspace() = runTest(timeout = 10.milliseconds) {
fun testWorkspace() = runTest {
val node = workspace.produce("sum") val node = workspace.produce("sum")
val res = node.asSequence().single() val res = node.asSequence().single()
assertEquals(328350, res.await()) assertEquals(328350, res.await())
} }
@Test @Test
@Timeout(1) fun testMetaPropagation() = runTest(timeout = 10.milliseconds) {
fun testMetaPropagation() = runTest {
val node = workspace.produce("sum") { "testFlag" put true } val node = workspace.produce("sum") { "testFlag" put true }
val res = node.single().await() val res = node.data!!.await()
} }
@Test @Test
@ -188,7 +186,7 @@ internal class SimpleWorkspaceTest {
val node = workspace.produce("filterOne") { val node = workspace.produce("filterOne") {
"name" put "myData[12]" "name" put "myData[12]"
} }
assertEquals(12, node.single().await()) assertEquals(12, node.data!!.await())
} }
} }