Compare commits

...

7 Commits

55 changed files with 1262 additions and 1752 deletions

2
.gitignore vendored
View File

@ -5,5 +5,7 @@ out/
.gradle
build/
.kotlin
!gradle-wrapper.jar

View File

@ -12,7 +12,7 @@ kscience {
useCoroutines()
useSerialization()
commonMain {
api(project(":dataforge-meta"))
api(projects.dataforgeMeta)
api(spclibs.atomicfu)
}
jvmMain{

View File

@ -1,5 +1,7 @@
package space.kscience.dataforge.actions
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
@ -19,46 +21,51 @@ internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
/**
* An action that caches results on-demand and recalculates them on source push
*/
public abstract class AbstractAction<in T : Any, R : Any>(
public abstract class AbstractAction<T : Any, R : Any>(
public val outputType: KType,
) : Action<T, R> {
/**
* Generate initial content of the output
*/
protected abstract fun DataSetBuilder<R>.generate(
data: DataSet<T>,
protected abstract fun DataSink<R>.generate(
data: DataTree<T>,
meta: Meta,
)
/**
* Update part of the data set when given [updateKey] is triggered by the source
* Update part of the data set using provided data
*
* @param source the source data tree in case we need several data items to update
*/
protected open fun DataSourceBuilder<R>.update(
dataSet: DataSet<T>,
protected open fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updateKey: Name,
) {
// By default, recalculate the whole dataset
generate(dataSet, meta)
namedData: NamedData<T>,
){
//by default regenerate the whole data set
generate(source,meta)
}
@OptIn(DFInternal::class)
override fun execute(
dataSet: DataSet<T>,
dataSet: DataTree<T>,
meta: Meta,
): DataSet<R> = if (dataSet is DataSource) {
DataSource(outputType, dataSet){
): DataTree<R> = if(dataSet.isObservable()) {
MutableDataTree<R>(outputType, dataSet.updatesScope).apply {
generate(dataSet, meta)
dataSet.updates().onEach {
update(dataSet, meta, it)
}.launchIn(updatesScope)
launch {
dataSet.updates.collect { name ->
update(dataSet, meta, name)
}
//close updates when the source is closed
updatesScope.launch {
dataSet.awaitClose()
close()
}
}
} else {
DataTree<R>(outputType) {
} else{
DataTree(outputType){
generate(dataSet, meta)
}
}

View File

@ -1,19 +1,19 @@
package space.kscience.dataforge.actions
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
/**
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute].
*/
public fun interface Action<in T : Any, out R : Any> {
public fun interface Action<T, R> {
/**
* Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
* so not actual computation is started at this moment.
*/
public fun execute(dataSet: DataSet<T>, meta: Meta): DataSet<R>
public fun execute(dataSet: DataTree<T>, meta: Meta): DataTree<R>
public companion object
}
@ -21,18 +21,23 @@ public fun interface Action<in T : Any, out R : Any> {
/**
* A convenience method to transform data using given [action]
*/
public fun <T : Any, R : Any> DataSet<T>.transform(action: Action<T, R>, meta: Meta = Meta.EMPTY): DataSet<R> =
action.execute(this, meta)
public fun <T, R> DataTree<T>.transform(
action: Action<T, R>,
meta: Meta = Meta.EMPTY,
): DataTree<R> = action.execute(this, meta)
/**
* Action composition. The result is terminal if one of its parts is terminal
*/
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> =
Action<T, R> { dataSet, meta -> action.execute(this@then.execute(dataSet, meta), meta) }
public infix fun <T, I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> = Action { dataSet, meta ->
action.execute(this@then.execute(dataSet, meta), meta)
}
@DFExperimental
public operator fun <T : Any, R : Any> Action<T, R>.invoke(
dataSet: DataSet<T>,
public operator fun <T, R> Action<T, R>.invoke(
dataSet: DataTree<T>,
meta: Meta = Meta.EMPTY,
): DataSet<R> = execute(dataSet, meta)
): DataTree<R> = execute(dataSet, meta)

View File

@ -29,6 +29,7 @@ public class MapActionBuilder<T, R>(
public var name: Name,
public var meta: MutableMeta,
public val actionMeta: Meta,
public val dataType: KType,
@PublishedApi internal var outputType: KType,
) {
@ -45,19 +46,16 @@ public class MapActionBuilder<T, R>(
/**
* Calculate the result of goal
*/
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(T) -> R1) {
outputType = typeOf<R1>()
result = f;
}
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(T) -> R1): Unit = result(typeOf<R1>(), f)
}
@PublishedApi
internal class MapAction<in T : Any, R : Any>(
internal class MapAction<T : Any, R : Any>(
outputType: KType,
private val block: MapActionBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) {
private fun DataSetBuilder<R>.mapOne(name: Name, data: Data<T>, meta: Meta) {
private fun DataSink<R>.mapOne(name: Name, data: Data<T>, meta: Meta) {
// Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(name, data.meta, meta)
@ -66,6 +64,7 @@ internal class MapAction<in T : Any, R : Any>(
name,
data.meta.toMutableMeta(), // using data meta
meta,
data.type,
outputType
).apply(block)
@ -80,16 +79,15 @@ internal class MapAction<in T : Any, R : Any>(
builder.result(env, data.await())
}
//setting the data node
data(newName, newData)
put(newName, newData)
}
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
data.forEach { mapOne(it.name, it.data, meta) }
}
override fun DataSourceBuilder<R>.update(dataSet: DataSet<T>, meta: Meta, updateKey: Name) {
remove(updateKey)
dataSet[updateKey]?.let { mapOne(updateKey, it, meta) }
override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
mapOne(namedData.name, namedData.data, namedData.meta)
}
}

View File

@ -14,7 +14,7 @@ import kotlin.reflect.typeOf
public class JoinGroup<T : Any, R : Any>(
public var name: String,
internal val set: DataSet<T>,
internal val set: DataTree<T>,
@PublishedApi internal var outputType: KType,
) {
@ -39,7 +39,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
public val actionMeta: Meta,
private val outputType: KType,
) {
private val groupRules: MutableList<(DataSet<T>) -> List<JoinGroup<T, R>>> = ArrayList();
private val groupRules: MutableList<(DataTree<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/**
* introduce grouping by meta value
@ -54,12 +54,12 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
public fun group(
groupName: String,
predicate: (Name, Meta) -> Boolean,
predicate: DataFilter,
action: JoinGroup<T, R>.() -> Unit,
) {
groupRules += { source ->
listOf(
JoinGroup<T, R>(groupName, source.filter(predicate), outputType).apply(action)
JoinGroup<T, R>(groupName, source.filterData(predicate), outputType).apply(action)
)
}
}
@ -73,7 +73,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
}
}
internal fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> =
internal fun buildGroups(input: DataTree<T>): List<JoinGroup<T, R>> =
groupRules.flatMap { it.invoke(input) }
}
@ -85,7 +85,7 @@ internal class ReduceAction<T : Any, R : Any>(
) : AbstractAction<T, R>(outputType) {
//TODO optimize reduction. Currently, the whole action recalculates on push
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(data).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.asSequence().fold(HashMap()) { acc, value ->
acc.apply {
@ -103,7 +103,7 @@ internal class ReduceAction<T : Any, R : Any>(
meta = groupMeta
) { group.result.invoke(env, it) }
data(env.name, res)
put(env.name, res)
}
}
}

View File

@ -49,7 +49,7 @@ internal class SplitAction<T : Any, R : Any>(
private val action: SplitBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) {
private fun DataSetBuilder<R>.splitOne(name: Name, data: Data<T>, meta: Meta) {
private fun DataSink<R>.splitOne(name: Name, data: Data<T>, meta: Meta) {
val laminate = Laminate(data.meta, meta)
val split = SplitBuilder<T, R>(name, data.meta).apply(action)
@ -64,7 +64,7 @@ internal class SplitAction<T : Any, R : Any>(
).apply(rule)
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
data(
put(
fragmentName,
@Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) {
env.result(data.await())
@ -73,13 +73,12 @@ internal class SplitAction<T : Any, R : Any>(
}
}
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
data.forEach { splitOne(it.name, it.data, meta) }
}
override fun DataSourceBuilder<R>.update(dataSet: DataSet<T>, meta: Meta, updateKey: Name) {
remove(updateKey)
dataSet[updateKey]?.let { splitOne(updateKey, it, meta) }
override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
splitOne(namedData.name, namedData.data, namedData.meta)
}
}

View File

@ -41,7 +41,7 @@ public interface Data<out T> : Goal<T>, MetaRepr {
*/
internal val TYPE_OF_NOTHING: KType = typeOf<Unit>()
public inline fun <reified T : Any> static(
public inline fun <reified T> static(
value: T,
meta: Meta = Meta.EMPTY,
): Data<T> = StaticData(typeOf<T>(), value, meta)
@ -69,37 +69,37 @@ public interface Data<out T> : Goal<T>, MetaRepr {
* A lazily computed variant of [Data] based on [LazyGoal]
* One must ensure that proper [type] is used so this method should not be used
*/
private class LazyData<T : Any>(
private class LazyData<T>(
override val type: KType,
override val meta: Meta = Meta.EMPTY,
additionalContext: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Goal<*>> = emptyList(),
dependencies: Iterable<Goal<*>> = emptyList(),
block: suspend () -> T,
) : Data<T>, LazyGoal<T>(additionalContext, dependencies, block)
public class StaticData<T : Any>(
public class StaticData<T>(
override val type: KType,
value: T,
override val meta: Meta = Meta.EMPTY,
) : Data<T>, StaticGoal<T>(value)
@Suppress("FunctionName")
public inline fun <reified T : Any> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
public inline fun <reified T> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
StaticData(typeOf<T>(), value, meta)
@DFInternal
public fun <T : Any> Data(
public fun <T> Data(
type: KType,
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Goal<*>> = emptyList(),
dependencies: Iterable<Goal<*>> = emptyList(),
block: suspend () -> T,
): Data<T> = LazyData(type, meta, context, dependencies, block)
@OptIn(DFInternal::class)
public inline fun <reified T : Any> Data(
public inline fun <reified T> Data(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Goal<*>> = emptyList(),
dependencies: Iterable<Goal<*>> = emptyList(),
noinline block: suspend () -> T,
): Data<T> = Data(typeOf<T>(), meta, context, dependencies, block)

View File

@ -0,0 +1,89 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
public fun interface DataFilter {
public fun accepts(name: Name, meta: Meta, type: KType): Boolean
public companion object {
public val EMPTY: DataFilter = DataFilter { _, _, _ -> true }
}
}
public fun DataFilter.accepts(data: NamedData<*>): Boolean = accepts(data.name, data.meta, data.type)
public fun <T> Sequence<NamedData<T>>.filterData(predicate: DataFilter): Sequence<NamedData<T>> = filter { data ->
predicate.accepts(data)
}
public fun <T> Flow<NamedData<T>>.filterData(predicate: DataFilter): Flow<NamedData<T>> = filter { data ->
predicate.accepts(data)
}
public fun <T> DataSource<T>.filterData(
predicate: DataFilter,
): DataSource<T> = object : DataSource<T> {
override val dataType: KType get() = this@filterData.dataType
override fun read(name: Name): Data<T>? =
this@filterData.read(name)?.takeIf { predicate.accepts(name, it.meta, it.type) }
}
/**
* Stateless filtered [ObservableDataSource]
*/
public fun <T> ObservableDataSource<T>.filterData(
predicate: DataFilter,
): ObservableDataSource<T> = object : ObservableDataSource<T> {
override fun updates(): Flow<NamedData<T>> = this@filterData.updates().filter { predicate.accepts(it) }
override val dataType: KType get() = this@filterData.dataType
override fun read(name: Name): Data<T>? =
this@filterData.read(name)?.takeIf { predicate.accepts(name, it.meta, it.type) }
}
public fun <T> GenericDataTree<T, *>.filterData(
predicate: DataFilter,
): DataTree<T> = asSequence().filterData(predicate).toTree(dataType)
public fun <T> GenericObservableDataTree<T, *>.filterData(
scope: CoroutineScope,
predicate: DataFilter,
): ObservableDataTree<T> = asSequence().filterData(predicate).toObservableTree(dataType, scope, updates().filterData(predicate))
///**
// * Generate a wrapper data set with a given name prefix appended to all names
// */
//public fun <T : Any> DataTree<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) {
// this
//} else object : DataSource<T> {
//
// override val dataType: KType get() = this@withNamePrefix.dataType
//
// override val coroutineContext: CoroutineContext
// get() = (this@withNamePrefix as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
//
// override val meta: Meta get() = this@withNamePrefix.meta
//
//
// override fun iterator(): Iterator<NamedData<T>> = iterator {
// for (d in this@withNamePrefix) {
// yield(d.data.named(prefix + d.name))
// }
// }
//
// override fun get(name: Name): Data<T>? =
// name.removeFirstOrNull(name)?.let { this@withNamePrefix.get(it) }
//
// override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
//}
//

View File

@ -1,124 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.mapNotNull
import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.endsWith
import space.kscience.dataforge.names.parseAsName
import kotlin.reflect.KType
public interface DataSet<out T : Any> {
/**
* The minimal common ancestor to all data in the node
*/
public val dataType: KType
/**
* Meta-data associated with this node. If no meta is provided, returns [Meta.EMPTY].
*/
public val meta: Meta
/**
* Traverse this [DataSet] returning named data instances. The order is not guaranteed.
*/
public operator fun iterator(): Iterator<NamedData<T>>
/**
* Get data with given name.
*/
public operator fun get(name: Name): Data<T>?
public companion object {
public val META_KEY: Name = "@meta".asName()
/**
* An empty [DataSet] that suits all types
*/
public val EMPTY: DataSet<Nothing> = object : DataSet<Nothing> {
override val dataType: KType = TYPE_OF_NOTHING
override val meta: Meta get() = Meta.EMPTY
override fun iterator(): Iterator<NamedData<Nothing>> = emptySequence<NamedData<Nothing>>().iterator()
override fun get(name: Name): Data<Nothing>? = null
}
}
}
public fun <T : Any> DataSet<T>.asSequence(): Sequence<NamedData<T>> = object : Sequence<NamedData<T>> {
override fun iterator(): Iterator<NamedData<T>> = this@asSequence.iterator()
}
/**
* Return a single [Data] in this [DataSet]. Throw error if it is not single.
*/
public fun <T : Any> DataSet<T>.single(): NamedData<T> = asSequence().single()
public fun <T : Any> DataSet<T>.asIterable(): Iterable<NamedData<T>> = object : Iterable<NamedData<T>> {
override fun iterator(): Iterator<NamedData<T>> = this@asIterable.iterator()
}
public operator fun <T : Any> DataSet<T>.get(name: String): Data<T>? = get(name.parseAsName())
/**
* A [DataSet] with propagated updates.
*/
public interface DataSource<out T : Any> : DataSet<T>, CoroutineScope {
/**
* A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
* Those can include new data items and replacement of existing ones. The replaced items could update existing data content
* and replace it completely, so they should be pulled again.
*
*/
public val updates: Flow<Name>
/**
* Stop generating updates from this [DataSource]
*/
public fun close() {
coroutineContext[Job]?.cancel()
}
}
public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is DataSource) updates else emptyFlow()
//
///**
// * Flow all data nodes with names starting with [branchName]
// */
//public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> =
// this@children.asSequence().filter {
// it.name.startsWith(branchName)
// }
/**
* Start computation for all goals in data node and return a job for the whole node
*/
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
asIterable().map {
it.launch(this@launch)
}.joinAll()
}
public suspend fun <T : Any> DataSet<T>.computeAndJoinAll(): Unit = coroutineScope { startAll(this).join() }
public fun DataSet<*>.toMeta(): Meta = Meta {
forEach {
if (it.name.endsWith(DataSet.META_KEY)) {
set(it.name, it.meta)
} else {
it.name put {
"type" put it.type.toString()
"meta" put it.meta
}
}
}
}
public val <T : Any> DataSet<T>.updatesWithData: Flow<NamedData<T>> get() = updates.mapNotNull { get(it)?.named(it) }

View File

@ -1,165 +0,0 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus
import kotlin.reflect.KType
public interface DataSetBuilder<in T : Any> {
public val dataType: KType
/**
* Remove all data items starting with [name]
*/
public fun remove(name: Name)
public fun data(name: Name, data: Data<T>?)
/**
* Set a current state of given [dataSet] into a branch [name]. Does not propagate updates
*/
public fun node(name: Name, dataSet: DataSet<T>) {
//remove previous items
if (name != Name.EMPTY) {
remove(name)
}
//Set new items
dataSet.forEach {
data(name + it.name, it.data)
}
}
/**
* Set meta for the given node
*/
public fun meta(name: Name, meta: Meta)
}
/**
* Define meta in this [DataSet]
*/
public fun <T : Any> DataSetBuilder<T>.meta(value: Meta): Unit = meta(Name.EMPTY, value)
/**
* Define meta in this [DataSet]
*/
public fun <T : Any> DataSetBuilder<T>.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta))
@PublishedApi
internal class SubSetBuilder<in T : Any>(
private val parent: DataSetBuilder<T>,
private val branch: Name,
) : DataSetBuilder<T> {
override val dataType: KType get() = parent.dataType
override fun remove(name: Name) {
parent.remove(branch + name)
}
override fun data(name: Name, data: Data<T>?) {
parent.data(branch + name, data)
}
override fun node(name: Name, dataSet: DataSet<T>) {
parent.node(branch + name, dataSet)
}
override fun meta(name: Name, meta: Meta) {
parent.meta(branch + name, meta)
}
}
public inline fun <T : Any> DataSetBuilder<T>.node(
name: Name,
crossinline block: DataSetBuilder<T>.() -> Unit,
) {
if (name.isEmpty()) block() else SubSetBuilder(this, name).block()
}
public fun <T : Any> DataSetBuilder<T>.data(name: String, value: Data<T>) {
data(Name.parse(name), value)
}
public fun <T : Any> DataSetBuilder<T>.node(name: String, set: DataSet<T>) {
node(Name.parse(name), set)
}
public inline fun <T : Any> DataSetBuilder<T>.node(
name: String,
crossinline block: DataSetBuilder<T>.() -> Unit,
): Unit = node(Name.parse(name), block)
public fun <T : Any> DataSetBuilder<T>.set(value: NamedData<T>) {
data(value.name, value.data)
}
/**
* Produce lazy [Data] and emit it into the [DataSetBuilder]
*/
public inline fun <reified T : Any> DataSetBuilder<T>.produce(
name: String,
meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T,
) {
val data = Data(meta, block = producer)
data(name, data)
}
public inline fun <reified T : Any> DataSetBuilder<T>.produce(
name: Name,
meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T,
) {
val data = Data(meta, block = producer)
data(name, data)
}
/**
* Emit a static data with the fixed value
*/
public inline fun <reified T : Any> DataSetBuilder<T>.static(
name: String,
data: T,
meta: Meta = Meta.EMPTY,
): Unit = data(name, Data.static(data, meta))
public inline fun <reified T : Any> DataSetBuilder<T>.static(
name: Name,
data: T,
meta: Meta = Meta.EMPTY,
): Unit = data(name, Data.static(data, meta))
public inline fun <reified T : Any> DataSetBuilder<T>.static(
name: String,
data: T,
mutableMeta: MutableMeta.() -> Unit,
): Unit = data(Name.parse(name), Data.static(data, Meta(mutableMeta)))
/**
* Update data with given node data and meta with node meta.
*/
@DFExperimental
public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit {
tree.forEach {
//TODO check if the place is occupied
data(it.name, it.data)
}
}
//public fun <T : Any> DataSetBuilder<T>.populateFrom(flow: Flow<NamedData<T>>) {
// flow.collect {
// data(it.name, it.data)
// }
//}
public fun <T : Any> DataSetBuilder<T>.populateFrom(sequence: Sequence<NamedData<T>>) {
sequence.forEach {
data(it.name, it.data)
}
}

View File

@ -0,0 +1,330 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.*
import kotlin.contracts.contract
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A generic data provider
*/
public interface DataSource<out T> {
/**
* The minimal common ancestor to all data in the node
*/
public val dataType: KType
/**
* Get data with given name. Or null if it is not present
*/
public fun read(name: Name): Data<T>?
}
/**
* A data provider with possible dynamic updates
*/
public interface ObservableDataSource<out T> : DataSource<T> {
/**
* Flow updates made to the data
*/
public fun updates(): Flow<NamedData<T>>
}
/**
* A tree like structure for data holding
*/
public interface GenericDataTree<out T, out TR : GenericDataTree<T, TR>> : DataSource<T> {
public val self: TR
public val data: Data<T>?
public val items: Map<NameToken, TR>
override fun read(name: Name): Data<T>? = when (name.length) {
0 -> data
else -> items[name.first()]?.read(name.cutFirst())
}
public companion object {
private object EmptyDataTree : GenericDataTree<Nothing, EmptyDataTree> {
override val self: EmptyDataTree get() = this
override val data: Data<Nothing>? = null
override val items: Map<NameToken, EmptyDataTree> = emptyMap()
override val dataType: KType = typeOf<Unit>()
override fun read(name: Name): Data<Nothing>? = null
}
public val EMPTY: GenericDataTree<Nothing, *> = EmptyDataTree
}
}
public typealias DataTree<T> = GenericDataTree<T, GenericDataTree<T, *>>
/**
* Return a single data in this tree. Throw error if it is not single.
*/
public fun <T> DataTree<T>.single(): NamedData<T> = asSequence().single()
/**
* An alias for easier access to tree values
*/
public operator fun <T> DataTree<T>.get(name: Name): Data<T>? = read(name)
public operator fun <T> DataTree<T>.get(name: String): Data<T>? = read(name.parseAsName())
/**
* Return a sequence of all data items in this tree.
* This method does not take updates into account.
*/
public fun <T> DataTree<T>.asSequence(
namePrefix: Name = Name.EMPTY,
): Sequence<NamedData<T>> = sequence {
data?.let { yield(it.named(namePrefix)) }
items.forEach { (token, tree) ->
yieldAll(tree.asSequence(namePrefix + token))
}
}
public val DataTree<*>.meta: Meta? get() = data?.meta
/**
* Provide subtree if it exists
*/
public tailrec fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: Name): TR? =
when (name.length) {
0 -> self
1 -> items[name.first()]
else -> items[name.first()]?.branch(name.cutFirst())
}
public fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: String): TR? =
branch(name.parseAsName())
public fun GenericDataTree<*, *>.isEmpty(): Boolean = data == null && items.isEmpty()
@PublishedApi
internal class FlatDataTree<T>(
override val dataType: KType,
private val dataSet: Map<Name, Data<T>>,
private val prefix: Name,
) : GenericDataTree<T, FlatDataTree<T>> {
override val self: FlatDataTree<T> get() = this
override val data: Data<T>? get() = dataSet[prefix]
override val items: Map<NameToken, FlatDataTree<T>>
get() = dataSet.keys
.filter { it.startsWith(prefix) && it.length > prefix.length }
.map { it.tokens[prefix.length] }
.associateWith { FlatDataTree(dataType, dataSet, prefix + it) }
override fun read(name: Name): Data<T>? = dataSet[prefix + name]
}
/**
* Represent this flat data map as a [DataTree] without copying it
*/
public inline fun <reified T> Map<Name, Data<T>>.asTree(): DataTree<T> = FlatDataTree(typeOf<T>(), this, Name.EMPTY)
internal fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> =
FlatDataTree(type, associate { it.name to it.data }, Name.EMPTY)
/**
* Collect a sequence of [NamedData] to a [DataTree]
*/
public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> =
FlatDataTree(typeOf<T>(), associate { it.name to it.data }, Name.EMPTY)
public interface GenericObservableDataTree<out T, out TR : GenericObservableDataTree<T, TR>> :
GenericDataTree<T, TR>, ObservableDataSource<T>, AutoCloseable {
/**
* A scope that is used to propagate updates. When this scope is closed, no new updates could arrive.
*/
public val updatesScope: CoroutineScope
/**
* Close this data tree updates channel
*/
override fun close() {
updatesScope.cancel()
}
}
public typealias ObservableDataTree<T> = GenericObservableDataTree<T, GenericObservableDataTree<T, *>>
/**
* Check if the [DataTree] is observable
*/
public fun <T> DataTree<T>.isObservable(): Boolean {
contract {
returns(true) implies (this@isObservable is GenericObservableDataTree<T, *>)
}
return this is GenericObservableDataTree<T, *>
}
/**
* Wait for this data tree to stop spawning updates (updatesScope is closed).
* If this [DataTree] is not observable, return immediately.
*/
public suspend fun <T> DataTree<T>.awaitClose() {
if (isObservable()) {
updatesScope.coroutineContext[Job]?.join()
}
}
public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> =
if (this is GenericObservableDataTree<T, *>) updates() else emptyFlow()
public fun interface DataSink<in T> {
public fun put(name: Name, data: Data<T>?)
}
@DFInternal
public class DataTreeBuilder<T>(private val type: KType) : DataSink<T> {
private val map = HashMap<Name, Data<T>>()
override fun put(name: Name, data: Data<T>?) {
if (data == null) {
map.remove(name)
} else {
map[name] = data
}
}
public fun build(): DataTree<T> = FlatDataTree(type, map, Name.EMPTY)
}
@DFInternal
public inline fun <T> DataTree(
dataType: KType,
generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(dataType).apply(generator).build()
/**
* Create and a data tree.
*/
@OptIn(DFInternal::class)
public inline fun <reified T> DataTree(
generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(typeOf<T>()).apply(generator).build()
/**
* A mutable version of [GenericDataTree]
*/
public interface MutableDataTree<T> : GenericObservableDataTree<T, MutableDataTree<T>>, DataSink<T> {
override var data: Data<T>?
override val items: Map<NameToken, MutableDataTree<T>>
public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
public operator fun set(token: NameToken, data: Data<T>?)
override fun put(name: Name, data: Data<T>?): Unit = set(name, data)
}
public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> set(name.first(), data)
else -> getOrCreateItem(name.first())[name.cutFirst()] = data
}
}
private class MutableDataTreeImpl<T>(
override val dataType: KType,
override val updatesScope: CoroutineScope,
) : MutableDataTree<T> {
private val updates = MutableSharedFlow<NamedData<T>>()
private val children = HashMap<NameToken, MutableDataTree<T>>()
override var data: Data<T>? = null
set(value) {
if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
field = value
if (value != null) {
updatesScope.launch {
updates.emit(value.named(Name.EMPTY))
}
}
}
override val items: Map<NameToken, MutableDataTree<T>> get() = children
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = children.getOrPut(token){
MutableDataTreeImpl(dataType, updatesScope)
}
override val self: MutableDataTree<T> get() = this
override fun set(token: NameToken, data: Data<T>?) {
if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
val subTree = getOrCreateItem(token)
subTree.updates().onEach {
updates.emit(it.named(token + it.name))
}.launchIn(updatesScope)
subTree.data = data
}
override fun updates(): Flow<NamedData<T>> = updates
}
/**
* Create a new [MutableDataTree]
*
* @param parentScope a [CoroutineScope] to control data propagation. By default uses [GlobalScope]
*/
@OptIn(DelicateCoroutinesApi::class)
public fun <T> MutableDataTree(
type: KType,
parentScope: CoroutineScope = GlobalScope,
): MutableDataTree<T> = MutableDataTreeImpl<T>(
type,
CoroutineScope(parentScope.coroutineContext + Job(parentScope.coroutineContext[Job]))
)
/**
* Create and initialize a observable mutable data tree.
*/
@OptIn(DelicateCoroutinesApi::class)
public inline fun <reified T> MutableDataTree(
parentScope: CoroutineScope = GlobalScope,
generator: MutableDataTree<T>.() -> Unit = {},
): MutableDataTree<T> = MutableDataTree<T>(typeOf<T>(), parentScope).apply { generator() }
//@DFInternal
//public fun <T> ObservableDataTree(
// type: KType,
// scope: CoroutineScope,
// generator: suspend MutableDataTree<T>.() -> Unit = {},
//): ObservableDataTree<T> = MutableDataTree<T>(type, scope.coroutineContext).apply(generator)
public inline fun <reified T> ObservableDataTree(
parentScope: CoroutineScope,
generator: MutableDataTree<T>.() -> Unit = {},
): ObservableDataTree<T> = MutableDataTree<T>(typeOf<T>(), parentScope).apply(generator)
/**
* Collect a [Sequence] into an observable tree with additional [updates]
*/
public fun <T> Sequence<NamedData<T>>.toObservableTree(
dataType: KType,
parentScope: CoroutineScope,
updates: Flow<NamedData<T>>,
): ObservableDataTree<T> = MutableDataTree<T>(dataType, parentScope).apply {
this.putAll(this@toObservableTree)
updates.onEach {
put(it.name, it.data)
}.launchIn(updatesScope)
}

View File

@ -1,119 +0,0 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.names.*
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public sealed class DataTreeItem<out T : Any> {
public abstract val meta: Meta
public class Node<out T : Any>(public val tree: DataTree<T>) : DataTreeItem<T>() {
override val meta: Meta get() = tree.meta
}
public class Leaf<out T : Any>(public val data: Data<T>) : DataTreeItem<T>() {
override val meta: Meta get() = data.meta
}
}
public val <T : Any> DataTreeItem<T>.type: KType
get() = when (this) {
is DataTreeItem.Node -> tree.dataType
is DataTreeItem.Leaf -> data.type
}
/**
* A tree-like [DataSet] grouped into the node. All data inside the node must inherit its type
*/
@DfType(DataTree.TYPE)
public interface DataTree<out T : Any> : DataSet<T> {
/**
* Top-level children items of this [DataTree]
*/
public val items: Map<NameToken, DataTreeItem<T>>
override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY
override fun iterator(): Iterator<NamedData<T>> = iterator {
items.forEach { (token, childItem: DataTreeItem<T>) ->
if (!token.body.startsWith("@")) {
when (childItem) {
is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName()))
is DataTreeItem.Node -> yieldAll(childItem.tree.asSequence().map { it.named(token + it.name) })
}
}
}
}
override fun get(name: Name): Data<T>? = when (name.length) {
0 -> null
1 -> items[name.firstOrNull()!!].data
else -> items[name.firstOrNull()!!].tree?.get(name.cutFirst())
}
public companion object {
public const val TYPE: String = "dataTree"
/**
* A name token used to designate tree node meta
*/
public val META_ITEM_NAME_TOKEN: NameToken = NameToken("@meta")
@DFInternal
public fun <T : Any> emptyWithType(type: KType, meta: Meta = Meta.EMPTY): DataTree<T> = object : DataTree<T> {
override val items: Map<NameToken, DataTreeItem<T>> get() = emptyMap()
override val dataType: KType get() = type
override val meta: Meta get() = meta
}
@OptIn(DFInternal::class)
public inline fun <reified T : Any> empty(meta: Meta = Meta.EMPTY): DataTree<T> =
emptyWithType<T>(typeOf<T>(), meta)
}
}
public fun <T : Any> DataTree<T>.listChildren(prefix: Name): List<Name> =
getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList()
/**
* Get a [DataTreeItem] with given [name] or null if the item does not exist
*/
public tailrec fun <T : Any> DataTree<T>.getItem(name: Name): DataTreeItem<T>? = when (name.length) {
0 -> DataTreeItem.Node(this)
1 -> items[name.firstOrNull()]
else -> items[name.firstOrNull()!!].tree?.getItem(name.cutFirst())
}
public val <T : Any> DataTreeItem<T>?.tree: DataTree<T>? get() = (this as? DataTreeItem.Node<T>)?.tree
public val <T : Any> DataTreeItem<T>?.data: Data<T>? get() = (this as? DataTreeItem.Leaf<T>)?.data
/**
* A [Sequence] of all children including nodes
*/
public fun <T : Any> DataTree<T>.traverseItems(): Sequence<Pair<Name, DataTreeItem<T>>> = sequence {
items.forEach { (head, item) ->
yield(head.asName() to item)
if (item is DataTreeItem.Node) {
val subSequence = item.tree.traverseItems()
.map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
}
/**
* Get a branch of this [DataTree] with a given [branchName].
* The difference from similar method for [DataSet] is that internal logic is more simple and the return value is a [DataTree]
*/
@OptIn(DFInternal::class)
public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> =
getItem(branchName)?.tree ?: DataTree.emptyWithType(dataType)
public fun <T : Any> DataTree<T>.branch(branchName: String): DataTree<T> = branch(branchName.parseAsName())

View File

@ -1,121 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public interface DataSourceBuilder<T : Any> : DataSetBuilder<T>, DataSource<T> {
override val updates: MutableSharedFlow<Name>
}
/**
* A mutable [DataTree] that propagates updates
*/
public class DataTreeBuilder<T : Any> internal constructor(
override val dataType: KType,
coroutineContext: CoroutineContext,
) : DataTree<T>, DataSourceBuilder<T> {
override val coroutineContext: CoroutineContext =
coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction()
private val treeItems = HashMap<NameToken, DataTreeItem<T>>()
override val items: Map<NameToken, DataTreeItem<T>>
get() = treeItems.filter { !it.key.body.startsWith("@") }
override val updates: MutableSharedFlow<Name> = MutableSharedFlow<Name>()
@ThreadSafe
private fun remove(token: NameToken) {
if (treeItems.remove(token) != null) {
launch {
updates.emit(token.asName())
}
}
}
override fun remove(name: Name) {
if (name.isEmpty()) error("Can't remove the root node")
(getItem(name.cutLast()).tree as? DataTreeBuilder)?.remove(name.lastOrNull()!!)
}
@ThreadSafe
private fun set(token: NameToken, data: Data<T>) {
treeItems[token] = DataTreeItem.Leaf(data)
}
@ThreadSafe
private fun set(token: NameToken, node: DataTree<T>) {
treeItems[token] = DataTreeItem.Node(node)
}
private fun getOrCreateNode(token: NameToken): DataTreeBuilder<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? DataTreeBuilder<T>
?: DataTreeBuilder<T>(dataType, coroutineContext).also { set(token, it) }
private fun getOrCreateNode(name: Name): DataTreeBuilder<T> = when (name.length) {
0 -> this
1 -> getOrCreateNode(name.firstOrNull()!!)
else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
}
override fun data(name: Name, data: Data<T>?) {
if (data == null) {
remove(name)
} else {
when (name.length) {
0 -> error("Can't add data with empty name")
1 -> set(name.firstOrNull()!!, data)
2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
}
}
launch {
updates.emit(name)
}
}
override fun meta(name: Name, meta: Meta) {
val item = getItem(name)
if (item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.")
data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
}
}
/**
* Create a dynamic [DataSource]. Initial data is placed synchronously.
*/
@DFInternal
@Suppress("FunctionName")
public fun <T : Any> DataSource(
type: KType,
parent: CoroutineScope,
block: DataSourceBuilder<T>.() -> Unit = {},
): DataTreeBuilder<T> = DataTreeBuilder<T>(type, parent.coroutineContext).apply(block)
@Suppress("OPT_IN_USAGE", "FunctionName")
public inline fun <reified T : Any> DataSource(
parent: CoroutineScope,
crossinline block: DataSourceBuilder<T>.() -> Unit = {},
): DataTreeBuilder<T> = DataSource(typeOf<T>(), parent) { block() }
public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
name: Name,
parent: CoroutineScope,
noinline block: DataSourceBuilder<T>.() -> Unit,
): Unit = node(name, DataSource(parent, block))
public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
name: String,
parent: CoroutineScope,
noinline block: DataSourceBuilder<T>.() -> Unit,
): Unit = node(Name.parse(name), DataSource(parent, block))

View File

@ -9,7 +9,7 @@ import kotlin.coroutines.EmptyCoroutineContext
* Lazy computation result with its dependencies to allowing to stat computing dependencies ahead of time
*/
public interface Goal<out T> {
public val dependencies: Collection<Goal<*>>
public val dependencies: Iterable<Goal<*>>
/**
* Returns current running coroutine if the goal is started. Null if the computation is not started.
@ -54,7 +54,7 @@ public open class StaticGoal<T>(public val value: T) : Goal<T> {
*/
public open class LazyGoal<T>(
private val coroutineContext: CoroutineContext = EmptyCoroutineContext,
override val dependencies: Collection<Goal<*>> = emptyList(),
override val dependencies: Iterable<Goal<*>> = emptyList(),
public val block: suspend () -> T,
) : Goal<T> {
@ -82,8 +82,8 @@ public open class LazyGoal<T>(
}
log?.emit { "Starting dependencies computation for ${this@LazyGoal}" }
val startedDependencies = this.dependencies.map { goal ->
goal.run { async(coroutineScope) }
val startedDependencies = dependencies.map { goal ->
goal.async(coroutineScope)
}
return deferred ?: coroutineScope.async(
coroutineContext

View File

@ -15,13 +15,12 @@
*/
package space.kscience.dataforge.data
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal
public interface GroupRule {
public fun <T : Any> gather(set: DataSet<T>): Map<String, DataSet<T>>
public fun <T : Any> gather(set: DataTree<T>): Map<String, DataTree<T>>
public companion object {
/**
@ -39,39 +38,17 @@ public interface GroupRule {
): GroupRule = object : GroupRule {
override fun <T : Any> gather(
set: DataSet<T>,
): Map<String, DataSet<T>> {
val map = HashMap<String, DataSet<T>>()
set: DataTree<T>,
): Map<String, DataTree<T>> {
val map = HashMap<String, DataTreeBuilder<T>>()
if (set is DataSource) {
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { DataTreeBuilder(set.dataType, set.coroutineContext) } as DataTreeBuilder<T>)
.data(data.name, data.data)
set.launch {
set.updates.collect { name ->
val dataUpdate = set[name]
val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue
map.getOrPut(updateTagValue) {
DataSource(set.dataType, this) {
data(name, dataUpdate)
}
}
}
}
}
} else {
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>)
.data(data.name, data.data)
}
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.put(data.name, data.data)
}
return map
return map.mapValues { it.value.build() }
}
}
}

View File

@ -4,7 +4,7 @@ import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name
public interface NamedData<out T : Any> : Named, Data<T> {
public interface NamedData<out T> : Named, Data<T> {
override val name: Name
public val data: Data<T>
}
@ -12,7 +12,7 @@ public interface NamedData<out T : Any> : Named, Data<T> {
public operator fun NamedData<*>.component1(): Name = name
public operator fun <T: Any> NamedData<T>.component2(): Data<T> = data
private class NamedDataImpl<out T : Any>(
private class NamedDataImpl<T>(
override val name: Name,
override val data: Data<T>,
) : Data<T> by data, NamedData<T> {
@ -28,7 +28,7 @@ private class NamedDataImpl<out T : Any>(
}
}
public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
public fun <T> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedDataImpl(name, this.data)
} else {
NamedDataImpl(name, this)

View File

@ -1,80 +0,0 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@PublishedApi
internal class StaticDataTree<T : Any>(
override val dataType: KType,
) : DataSetBuilder<T>, DataTree<T> {
private val _items: MutableMap<NameToken, DataTreeItem<T>> = HashMap()
override val items: Map<NameToken, DataTreeItem<T>>
get() = _items.filter { !it.key.body.startsWith("@") }
override fun remove(name: Name) {
when (name.length) {
0 -> error("Can't remove root tree node")
1 -> _items.remove(name.firstOrNull()!!)
else -> (_items[name.firstOrNull()!!].tree as? StaticDataTree<T>)?.remove(name.cutFirst())
}
}
private fun getOrCreateNode(name: Name): StaticDataTree<T> = when (name.length) {
0 -> this
1 -> {
val itemName = name.firstOrNull()!!
(_items[itemName].tree as? StaticDataTree<T>) ?: StaticDataTree<T>(dataType).also {
_items[itemName] = DataTreeItem.Node(it)
}
}
else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName())
}
private fun set(name: Name, item: DataTreeItem<T>?) {
if (name.isEmpty()) error("Can't set top level tree node")
if (item == null) {
remove(name)
} else {
getOrCreateNode(name.cutLast())._items[name.lastOrNull()!!] = item
}
}
override fun data(name: Name, data: Data<T>?) {
set(name, data?.let { DataTreeItem.Leaf(it) })
}
override fun node(name: Name, dataSet: DataSet<T>) {
if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet))
} else {
dataSet.forEach {
data(name + it.name, it.data)
}
}
}
override fun meta(name: Name, meta: Meta) {
val item = getItem(name)
if (item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.")
data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
}
}
public inline fun <T : Any> DataTree(
dataType: KType,
block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> = StaticDataTree<T>(dataType).apply { block() }
public inline fun <reified T : Any> DataTree(
noinline block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> = DataTree(typeOf<T>(), block)
@OptIn(DFExperimental::class)
public fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) {
populateFrom(this@seal)
}

View File

@ -0,0 +1,132 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus
public fun <T> DataSink<T>.put(value: NamedData<T>) {
put(value.name, value.data)
}
public fun <T> DataSink<T>.branch(dataTree: DataTree<T>) {
putAll(dataTree.asSequence())
}
public inline fun <T> DataSink<T>.branch(
prefix: Name,
block: DataSink<T>.() -> Unit,
) {
if (prefix.isEmpty()) {
apply(block)
} else {
val proxyDataSink = DataSink { nameWithoutPrefix, data ->
this.put(prefix + nameWithoutPrefix, data)
}
proxyDataSink.apply(block)
}
}
public inline fun <T> DataSink<T>.branch(
prefix: String,
block: DataSink<T>.() -> Unit,
): Unit = branch(prefix.asName(), block)
public fun <T> DataSink<T>.put(name: String, value: Data<T>) {
put(Name.parse(name), value)
}
public fun <T> DataSink<T>.branch(name: Name, set: DataTree<T>) {
branch(name) { putAll(set.asSequence()) }
}
public fun <T> DataSink<T>.branch(name: String, set: DataTree<T>) {
branch(Name.parse(name)) { putAll(set.asSequence()) }
}
/**
* Produce lazy [Data] and emit it into the [MutableDataTree]
*/
public inline fun <reified T> DataSink<T>.put(
name: String,
meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T,
) {
val data = Data(meta, block = producer)
put(name, data)
}
public inline fun <reified T> DataSink<T>.put(
name: Name,
meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T,
) {
val data = Data(meta, block = producer)
put(name, data)
}
/**
* Emit static data with the fixed value
*/
public inline fun <reified T> DataSink<T>.wrap(
name: String,
data: T,
meta: Meta = Meta.EMPTY,
): Unit = put(name, Data.static(data, meta))
public inline fun <reified T> DataSink<T>.wrap(
name: Name,
data: T,
meta: Meta = Meta.EMPTY,
): Unit = put(name, Data.static(data, meta))
public inline fun <reified T> DataSink<T>.wrap(
name: String,
data: T,
mutableMeta: MutableMeta.() -> Unit,
): Unit = put(Name.parse(name), Data.static(data, Meta(mutableMeta)))
public fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
sequence.forEach {
put(it.name, it.data)
}
}
public fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
this.putAll(tree.asSequence())
}
/**
* Update data with given node data and meta with node meta.
*/
@DFExperimental
public fun <T> MutableDataTree<T>.putAll(source: DataTree<T>) {
source.forEach {
put(it.name, it.data)
}
}
/**
* Copy given data set and mirror its changes to this [DataSink] in [this@setAndObserve]. Returns an update [Job]
*/
public fun <T : Any> DataSink<T>.watchBranch(
name: Name,
dataSet: ObservableDataTree<T>,
): Job {
branch(name, dataSet)
return dataSet.updates().onEach {
put(name + it.name, it.data)
}.launchIn(dataSet.updatesScope)
}

View File

@ -1,105 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
/**
* A stateless filtered [DataSet]
*/
public fun <T : Any> DataSet<T>.filter(
predicate: (Name, Meta) -> Boolean,
): DataSource<T> = object : DataSource<T> {
override val dataType: KType get() = this@filter.dataType
override val coroutineContext: CoroutineContext
get() = (this@filter as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@filter.meta
override fun iterator(): Iterator<NamedData<T>> = iterator {
for (d in this@filter) {
if (predicate(d.name, d.meta)) {
yield(d)
}
}
}
override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf {
predicate(name, it.meta)
}
override val updates: Flow<Name> = this@filter.updates.filter flowFilter@{ name ->
val theData = this@filter[name] ?: return@flowFilter false
predicate(name, theData.meta)
}
}
/**
* Generate a wrapper data set with a given name prefix appended to all names
*/
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) {
this
} else object : DataSource<T> {
override val dataType: KType get() = this@withNamePrefix.dataType
override val coroutineContext: CoroutineContext
get() = (this@withNamePrefix as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@withNamePrefix.meta
override fun iterator(): Iterator<NamedData<T>> = iterator {
for (d in this@withNamePrefix) {
yield(d.data.named(prefix + d.name))
}
}
override fun get(name: Name): Data<T>? =
name.removeFirstOrNull(name)?.let { this@withNamePrefix.get(it) }
override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
}
/**
* Get a subset of data starting with a given [branchName]
*/
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) {
this
} else object : DataSource<T> {
override val dataType: KType get() = this@branch.dataType
override val coroutineContext: CoroutineContext
get() = (this@branch as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@branch.meta
override fun iterator(): Iterator<NamedData<T>> = iterator {
for (d in this@branch) {
d.name.removeFirstOrNull(branchName)?.let { name ->
yield(d.data.named(name))
}
}
}
override fun get(name: Name): Data<T>? = this@branch.get(branchName + name)
override val updates: Flow<Name> get() = this@branch.updates.mapNotNull { it.removeFirstOrNull(branchName) }
}
public fun <T : Any> DataSet<T>.branch(branchName: String): DataSet<T> = this@branch.branch(branchName.parseAsName())
@DFExperimental
public suspend fun <T : Any> DataSet<T>.rootData(): Data<T>? = get(Name.EMPTY)

View File

@ -9,14 +9,14 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public data class ValueWithMeta<T>(val meta: Meta, val value: T)
public data class ValueWithMeta<T>(val value: T, val meta: Meta)
public suspend fun <T : Any> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(meta, await())
public suspend fun <T> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(await(), meta)
public data class NamedValueWithMeta<T>(val name: Name, val meta: Meta, val value: T)
public data class NamedValueWithMeta<T>(val name: Name, val value: T, val meta: Meta)
public suspend fun <T : Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
NamedValueWithMeta(name, meta, await())
public suspend fun <T> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
NamedValueWithMeta(name, await(), meta)
/**
@ -25,7 +25,7 @@ public suspend fun <T : Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T>
* @param meta for the resulting data. By default equals input data.
* @param block the transformation itself
*/
public inline fun <T : Any, reified R : Any> Data<T>.map(
public inline fun <T, reified R> Data<T>.transform(
meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (T) -> R,
@ -36,7 +36,7 @@ public inline fun <T : Any, reified R : Any> Data<T>.map(
/**
* Combine this data with the other data using [block]. See [Data::map] for other details
*/
public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
public inline fun <T1, T2, reified R> Data<T1>.combine(
other: Data<T2>,
meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -58,20 +58,7 @@ internal fun Iterable<Data<*>>.joinMeta(): Meta = Meta {
}
}
/**
* Lazily reduce a collection of [Data] to a single data.
*/
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (List<ValueWithMeta<T>>) -> R,
): Data<R> = Data(
meta,
coroutineContext,
this
) {
block(map { it.awaitWithMeta() })
}
@PublishedApi
internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
@ -82,7 +69,7 @@ internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
}
@DFInternal
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
public fun <K, T, R> Map<K, Data<T>>.reduceToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -102,7 +89,7 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
* @param T type of the input goal
* @param R type of the result goal
*/
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
public inline fun <K, T, reified R> Map<K, Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (Map<K, ValueWithMeta<T>>) -> R,
@ -117,7 +104,7 @@ public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
//Iterable operations
@DFInternal
public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
public inline fun <T, R> Iterable<Data<T>>.reduceToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -132,7 +119,7 @@ public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
}
@OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData(
public inline fun <T, reified R> Iterable<Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
@ -140,7 +127,7 @@ public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData(
transformation(it)
}
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
public inline fun <T, reified R> Iterable<Data<T>>.foldToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -155,7 +142,7 @@ public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
* Transform an [Iterable] of [NamedData] to a single [Data].
*/
@DFInternal
public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -170,7 +157,7 @@ public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
}
@OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedToData(
public inline fun <T, reified R> Iterable<NamedData<T>>.reduceNamedToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
@ -181,7 +168,7 @@ public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedT
/**
* Fold a [Iterable] of named data into a single [Data]
*/
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToData(
public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -195,53 +182,52 @@ public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToD
//DataSet operations
@DFInternal
public suspend fun <T : Any, R : Any> DataSet<T>.map(
public suspend fun <T, R> DataTree<T>.transform(
outputType: KType,
metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(outputType) {
forEach {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
val d = Data(outputType, newMeta, coroutineContext, listOf(it)) {
block(it.awaitWithMeta())
): DataTree<R> = DataTree<R>(outputType){
//quasi-synchronous processing of elements in the tree
asSequence().forEach { namedData: NamedData<T> ->
val newMeta = namedData.meta.toMutableMeta().apply(metaTransform).seal()
val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) {
block(namedData.awaitWithMeta())
}
data(it.name, d)
put(namedData.name, d)
}
}
@OptIn(DFInternal::class)
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
public suspend inline fun <T, reified R> DataTree<T>.transform(
noinline metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = map(typeOf<R>(), metaTransform, coroutineContext, block)
): DataTree<R> = this@transform.transform(typeOf<R>(), metaTransform, coroutineContext, block)
public inline fun <T : Any> DataSet<T>.forEach(block: (NamedData<T>) -> Unit) {
for (d in this) {
block(d)
}
public inline fun <T> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) {
asSequence().forEach(block)
}
// DataSet reduction
@PublishedApi
internal fun DataSet<*>.joinMeta(): Meta = Meta {
forEach { (key, data) ->
val token = NameToken("data", key.toString())
set(token, data.meta)
internal fun DataTree<*>.joinMeta(): Meta = Meta {
asSequence().forEach {
val token = NameToken("data", it.name.toString())
set(token, it.meta)
}
}
public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
public inline fun <T, reified R> DataTree<T>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R,
): Data<R> = asIterable().reduceNamedToData(meta, coroutineContext, transformation)
): Data<R> = asSequence().asIterable().reduceNamedToData(meta, coroutineContext, transformation)
public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
public inline fun <T, reified R> DataTree<T>.foldToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = asIterable().foldNamedToData(initial, meta, coroutineContext, block)
): Data<R> = asSequence().asIterable().foldNamedToData(initial, meta, coroutineContext, block)

View File

@ -1,12 +1,10 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
import kotlin.reflect.full.isSubtypeOf
import kotlin.reflect.typeOf
@ -16,7 +14,7 @@ import kotlin.reflect.typeOf
* Cast the node to given type if the cast is possible or return null
*/
@Suppress("UNCHECKED_CAST")
private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
private fun <R> Data<*>.castOrNull(type: KType): Data<R>? =
if (!this.type.isSubtypeOf(type)) {
null
} else {
@ -25,61 +23,65 @@ private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
}
}
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <R> Sequence<NamedData<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>>
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <R> Flow<NamedData<*>>.filterByDataType(type: KType): Flow<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>>
/**
* Select all data matching given type and filters. Does not modify paths
*
* @param predicate addition filtering condition based on item name and meta. By default, accepts all
* @param predicate additional filtering condition based on item name and meta. By default, accepts all
*/
@OptIn(DFExperimental::class)
public fun <R : Any> DataSet<*>.filterByType(
@DFInternal
public fun <R> DataTree<*>.filterByType(
type: KType,
predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
): DataSource<R> = object : DataSource<R> {
override val dataType = type
override val coroutineContext: CoroutineContext
get() = (this@filterByType as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@filterByType.meta
private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type)
&& predicate(name, datum.meta)
override fun iterator(): Iterator<NamedData<R>> = iterator {
for(d in this@filterByType){
if(checkDatum(d.name,d.data)){
@Suppress("UNCHECKED_CAST")
yield(d as NamedData<R>)
}
}
}
override fun get(name: Name): Data<R>? = this@filterByType[name]?.let { datum ->
if (checkDatum(name, datum)) datum.castOrNull(type) else null
}
override val updates: Flow<Name> = this@filterByType.updates.filter { name ->
get(name)?.let { datum ->
checkDatum(name, datum)
} ?: false
}
}
predicate: DataFilter = DataFilter.EMPTY,
): DataTree<R> = asSequence().filterByDataType<R>(type).filterData(predicate).toTree(type)
/**
* Select a single datum of the appropriate type
*/
public inline fun <reified R : Any> DataSet<*>.filterByType(
noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
): DataSet<R> = filterByType(typeOf<R>(), predicate)
@OptIn(DFInternal::class)
public inline fun <reified R : Any> DataTree<*>.filterByType(
predicate: DataFilter = DataFilter.EMPTY,
): DataTree<R> = filterByType(typeOf<R>(), predicate)
/**
* Select a single datum if it is present and of given [type]
*/
public fun <R : Any> DataSet<*>.getByType(type: KType, name: Name): NamedData<R>? =
public fun <R> DataTree<*>.getByType(type: KType, name: Name): NamedData<R>? =
get(name)?.castOrNull<R>(type)?.named(name)
public inline fun <reified R : Any> DataSet<*>.getByType(name: Name): NamedData<R>? =
public inline fun <reified R : Any> DataTree<*>.getByType(name: Name): NamedData<R>? =
this@getByType.getByType(typeOf<R>(), name)
public inline fun <reified R : Any> DataSet<*>.getByType(name: String): NamedData<R>? =
this@getByType.getByType(typeOf<R>(), Name.parse(name))
public inline fun <reified R : Any> DataTree<*>.getByType(name: String): NamedData<R>? =
this@getByType.getByType(typeOf<R>(), Name.parse(name))
/**
* Select all data matching given type and filters. Does not modify paths
*
* @param predicate additional filtering condition based on item name and meta. By default, accepts all
*/
@DFInternal
public fun <R> ObservableDataTree<*>.filterByType(
type: KType,
scope: CoroutineScope,
predicate: DataFilter = DataFilter.EMPTY,
): ObservableDataTree<R> = asSequence()
.filterByDataType<R>(type)
.filterData(predicate)
.toObservableTree(type, scope, updates().filterByDataType<R>(type).filterData(predicate))
@OptIn(DFInternal::class)
public inline fun <reified R> ObservableDataTree<*>.filterByType(
scope: CoroutineScope,
predicate: DataFilter = DataFilter.EMPTY,
): ObservableDataTree<R> = filterByType(typeOf<R>(),scope,predicate)

View File

@ -1,40 +1,27 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
/**
* Append data to node
*/
context(DataSetBuilder<T>) public infix fun <T : Any> String.put(data: Data<T>): Unit =
data(Name.parse(this), data)
context(DataSink<T>)
public infix fun <T : Any> String.put(data: Data<T>): Unit =
put(Name.parse(this), data)
/**
* Append node
*/
context(DataSetBuilder<T>) public infix fun <T : Any> String.put(dataSet: DataSet<T>): Unit =
node(Name.parse(this), dataSet)
context(DataSink<T>)
public infix fun <T : Any> String.put(dataSet: DataTree<T>): Unit =
branch(this, dataSet)
/**
* Build and append node
*/
context(DataSetBuilder<T>) public infix fun <T : Any> String.put(
block: DataSetBuilder<T>.() -> Unit,
): Unit = node(Name.parse(this), block)
context(DataSink<T>)
public infix fun <T : Any> String.put(
block: DataSink<T>.() -> Unit,
): Unit = branch(Name.parse(this), block)
/**
* Copy given data set and mirror its changes to this [DataTreeBuilder] in [this@setAndObserve]. Returns an update [Job]
*/
context(DataSetBuilder<T>) public fun <T : Any> CoroutineScope.setAndWatch(
name: Name,
dataSet: DataSet<T>,
): Job = launch {
node(name, dataSet)
dataSet.updates.collect { nameInBranch ->
data(name + nameInBranch, dataSet.get(nameInBranch))
}
}

View File

@ -1,7 +1,7 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.actions.Action
@ -10,13 +10,13 @@ import space.kscience.dataforge.actions.mapping
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals
@OptIn(DFExperimental::class, ExperimentalCoroutinesApi::class)
@OptIn(DFExperimental::class)
internal class ActionsTest {
@Test
fun testStaticMapAction() = runTest {
val data: DataTree<Int> = DataTree {
repeat(10) {
static(it.toString(), it)
wrap(it.toString(), it)
}
}
@ -28,23 +28,26 @@ internal class ActionsTest {
}
@Test
fun testDynamicMapAction() = runTest {
val data: DataSourceBuilder<Int> = DataSource(this)
fun testDynamicMapAction() = runBlocking {
val source: MutableDataTree<Int> = MutableDataTree()
val plusOne = Action.mapping<Int, Int> {
result { it + 1 }
}
val result = plusOne(data)
val result = plusOne(source)
repeat(10) {
data.static(it.toString(), it)
source.wrap(it.toString(), it)
}
delay(20)
delay(10)
source.close()
result.awaitClose()
assertEquals(2, result["1"]?.await())
data.close()
}
}

View File

@ -1,6 +1,8 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.asName
import kotlin.test.Test
@ -9,27 +11,26 @@ import kotlin.test.assertEquals
internal class DataTreeBuilderTest {
@Test
fun testTreeBuild() = runBlocking {
fun testTreeBuild() = runTest {
val node = DataTree<Any> {
"primary" put {
static("a", "a")
static("b", "b")
wrap("a", "a")
wrap("b", "b")
}
static("c.d", "c.d")
static("c.f", "c.f")
}
runBlocking {
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
assertEquals("c.d", node["c.d"]?.await())
assertEquals("c.f", node["c.f"]?.await())
wrap("c.d", "c.d")
wrap("c.f", "c.f")
}
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
assertEquals("c.d", node["c.d"]?.await())
assertEquals("c.f", node["c.f"]?.await())
}
@OptIn(DFExperimental::class)
@Test
fun testDataUpdate() = runBlocking {
val updateData: DataTree<Any> = DataTree {
fun testDataUpdate() = runTest {
val updateData = DataTree<Any> {
"update" put {
"a" put Data.static("a")
"b" put Data.static("b")
@ -38,54 +39,30 @@ internal class DataTreeBuilderTest {
val node = DataTree<Any> {
"primary" put {
static("a", "a")
static("b", "b")
wrap("a", "a")
wrap("b", "b")
}
static("root", "root")
populateFrom(updateData)
wrap("root", "root")
putAll(updateData)
}
runBlocking {
assertEquals("a", node["update.a"]?.await())
assertEquals("a", node["primary.a"]?.await())
}
assertEquals("a", node["update.a"]?.await())
assertEquals("a", node["primary.a"]?.await())
}
@Test
fun testDynamicUpdates() = runBlocking {
try {
lateinit var updateJob: Job
supervisorScope {
val subNode = DataSource<Int>(this) {
updateJob = launch {
repeat(10) {
delay(10)
static("value", it)
}
delay(10)
}
}
launch {
subNode.updatesWithData.collect {
println(it)
}
}
val rootNode = DataSource<Int>(this) {
setAndWatch("sub".asName(), subNode)
}
val subNode = MutableDataTree<Int>()
launch {
rootNode.updatesWithData.collect {
println(it)
}
}
updateJob.join()
assertEquals(9, rootNode["sub.value"]?.await())
cancel()
}
} catch (t: Throwable) {
if (t !is CancellationException) throw t
val rootNode = MutableDataTree<Int> {
watchBranch("sub".asName(), subNode)
}
repeat(10) {
subNode.wrap("value[$it]", it)
}
delay(20)
assertEquals(9, rootNode["sub.value[9]"]?.await())
}
}

View File

@ -4,7 +4,7 @@ plugins {
description = "IO module"
val ioVersion = "0.3.0"
val ioVersion = "0.3.1"
kscience {
jvm()

View File

@ -10,10 +10,7 @@ import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>()
}
public interface EnvelopeFormat : IOFormat<Envelope>
public fun EnvelopeFormat.read(input: Source): Envelope = readFrom(input)

View File

@ -17,11 +17,7 @@ import kotlin.reflect.typeOf
/**
* Reader of a custom object from input
*/
public interface IOReader<out T> {
/**
* The type of object being read
*/
public val type: KType
public fun interface IOReader<out T> {
public fun readFrom(source: Source): T
@ -32,7 +28,6 @@ public interface IOReader<out T> {
* no-op reader for binaries.
*/
public val binary: IOReader<Binary> = object : IOReader<Binary> {
override val type: KType = typeOf<Binary>()
override fun readFrom(source: Source): Binary = source.readByteArray().asBinary()
@ -42,8 +37,6 @@ public interface IOReader<out T> {
}
public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
override val type: KType = typeOf<T>()
override fun readFrom(source: Source): T = source.read()
}
@ -56,24 +49,24 @@ public fun interface IOWriter<in T> {
*/
public interface IOFormat<T> : IOReader<T>, IOWriter<T>
public fun <T : Any> Source.readWith(format: IOReader<T>): T = format.readFrom(this)
public fun <T> Source.readWith(format: IOReader<T>): T = format.readFrom(this)
/**
* Read given binary as an object using given format
*/
public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read {
readWith(format)
public fun <T> Binary.readWith(format: IOReader<T>): T = read {
this.readWith(format)
}
/**
* Write an object to the [Sink] with given [format]
*/
public fun <T : Any> Sink.writeWith(format: IOWriter<T>, obj: T): Unit =
public fun <T> Sink.writeWith(format: IOWriter<T>, obj: T): Unit =
format.writeTo(this, obj)
@DfType(IO_FORMAT_TYPE)
public interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
public interface IOFormatFactory<T> : Factory<IOFormat<T>>, Named {
/**
* Explicit type for dynamic type checks
*/
@ -86,7 +79,7 @@ public interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
}
}
public fun <T : Any> Binary(obj: T, format: IOWriter<T>): Binary = Binary { format.writeTo(this, obj) }
public fun <T> Binary(obj: T, format: IOWriter<T>): Binary = Binary { format.writeTo(this, obj) }
public object FloatIOFormat : IOFormat<Float>, IOFormatFactory<Float> {
override fun build(context: Context, meta: Meta): IOFormat<Float> = this

View File

@ -5,7 +5,6 @@ import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORM
import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
@ -21,11 +20,11 @@ public class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <T : Any> resolveIOFormat(type: KType, meta: Meta): IOFormat<T>? =
public fun <T> resolveIOFormat(type: KType, meta: Meta): IOFormat<T>? =
ioFormatFactories.singleOrNull { it.type == type }?.build(context, meta) as? IOFormat<T>
@OptIn(DFInternal::class)
public inline fun <reified T : Any> resolveIOFormat(meta: Meta = Meta.EMPTY): IOFormat<T>? =
public inline fun <reified T> resolveIOFormat(meta: Meta = Meta.EMPTY): IOFormat<T>? =
resolveIOFormat(typeOf<T>(), meta)

View File

@ -21,8 +21,6 @@ import kotlin.reflect.typeOf
*/
public interface MetaFormat : IOFormat<Meta> {
override val type: KType get() = typeOf<Meta>()
override fun writeTo(sink: Sink, obj: Meta) {
writeMeta(sink, obj, null)
}

View File

@ -6,8 +6,6 @@ import kotlinx.serialization.json.encodeToJsonElement
import kotlinx.serialization.serializer
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.misc.DFExperimental
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
@ -15,15 +13,10 @@ import kotlin.reflect.typeOf
*/
public interface MetaConverter<T>: MetaSpec<T> {
/**
* Runtime type of [T]
*/
public val type: KType
/**
* A descriptor for resulting meta
*/
override val descriptor: MetaDescriptor get() = MetaDescriptor.EMPTY
override val descriptor: MetaDescriptor? get() = null
/**
* Attempt conversion of [source] to an object or return null if conversion failed
@ -38,22 +31,16 @@ public interface MetaConverter<T>: MetaSpec<T> {
public companion object {
public val meta: MetaConverter<Meta> = object : MetaConverter<Meta> {
override val type: KType = typeOf<Meta>()
override fun readOrNull(source: Meta): Meta = source
override fun convert(obj: Meta): Meta = obj
}
public val value: MetaConverter<Value> = object : MetaConverter<Value> {
override val type: KType = typeOf<Value>()
override fun readOrNull(source: Meta): Value? = source.value
override fun convert(obj: Value): Meta = Meta(obj)
}
public val string: MetaConverter<String> = object : MetaConverter<String> {
override val type: KType = typeOf<String>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.STRING)
}
@ -64,8 +51,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val boolean: MetaConverter<Boolean> = object : MetaConverter<Boolean> {
override val type: KType = typeOf<Boolean>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.BOOLEAN)
}
@ -75,8 +60,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val number: MetaConverter<Number> = object : MetaConverter<Number> {
override val type: KType = typeOf<Number>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -86,8 +69,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val double: MetaConverter<Double> = object : MetaConverter<Double> {
override val type: KType = typeOf<Double>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -97,8 +78,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val float: MetaConverter<Float> = object : MetaConverter<Float> {
override val type: KType = typeOf<Float>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -108,8 +87,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val int: MetaConverter<Int> = object : MetaConverter<Int> {
override val type: KType = typeOf<Int>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -119,8 +96,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val long: MetaConverter<Long> = object : MetaConverter<Long> {
override val type: KType = typeOf<Long>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -130,8 +105,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public inline fun <reified E : Enum<E>> enum(): MetaConverter<E> = object : MetaConverter<E> {
override val type: KType = typeOf<E>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.STRING)
allowedValues(enumValues<E>())
@ -147,8 +120,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
writer: (T) -> Value = { Value.of(it) },
reader: (Value) -> T,
): MetaConverter<List<T>> = object : MetaConverter<List<T>> {
override val type: KType = typeOf<List<T>>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.LIST)
}
@ -165,7 +136,6 @@ public interface MetaConverter<T>: MetaSpec<T> {
public inline fun <reified T> serializable(
descriptor: MetaDescriptor? = null,
): MetaConverter<T> = object : MetaConverter<T> {
override val type: KType = typeOf<T>()
private val serializer: KSerializer<T> = serializer()
override fun readOrNull(source: Meta): T? {

View File

@ -166,9 +166,9 @@ public inline fun <T : Scheme> T.copy(spec: SchemeSpec<T>, block: T.() -> Unit =
/**
* A specification for simplified generation of wrappers
*/
public open class SchemeSpec<out T : Scheme>(
public open class SchemeSpec<T : Scheme>(
private val builder: () -> T,
) : MetaSpec<T> {
) : MetaConverter<T> {
override val descriptor: MetaDescriptor? get() = null
@ -187,6 +187,8 @@ public open class SchemeSpec<out T : Scheme>(
it.initialize(MutableMeta(), Meta.EMPTY, descriptor)
}
override fun convert(obj: T): Meta = obj.meta
/**
* A convenience method to use specifications in builders
*/

View File

@ -113,6 +113,13 @@ public class Name(public val tokens: List<NameToken>) {
}
}
/**
* Transform this [Name] to a string without escaping special characters in tokens.
*
* Parsing it back will produce a valid, but different name
*/
public fun Name.toStringUnescaped(): String = tokens.joinToString(separator = Name.NAME_SEPARATOR) { it.toStringUnescaped() }
public operator fun Name.get(i: Int): NameToken = tokens[i]
/**

View File

@ -1,46 +0,0 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataTree.Companion.META_ITEM_NAME_TOKEN
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.IOReader
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
public abstract class EnvelopeTask<T : Any>(
override val descriptor: MetaDescriptor?,
private val reader: IOReader<T>,
) : Task<T> {
public abstract suspend fun produceEnvelopes(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): Map<Name, Envelope>
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> =
Result(workspace, taskName, taskMeta, reader, produceEnvelopes(workspace, taskName, taskMeta))
private class Result<T : Any>(
override val workspace: Workspace,
override val taskName: Name,
override val taskMeta: Meta,
val reader: IOReader<T>,
envelopes: Map<Name, Envelope>,
) : TaskResult<T> {
private val dataMap = envelopes.mapValues {
workspace.wrapData(it.value.toData(reader), it.key, taskName, taskMeta)
}
override val meta: Meta get() = dataMap[META_ITEM_NAME_TOKEN.asName()]?.meta ?: Meta.EMPTY
override val dataType: KType get() = reader.type
override fun iterator(): Iterator<TaskData<T>> = dataMap.values.iterator()
override fun get(name: Name): TaskData<T>? = dataMap[name]
}
}

View File

@ -1,9 +1,9 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataSetBuilder
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.GoalExecutionRestriction
import space.kscience.dataforge.data.MutableDataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaRepr
import space.kscience.dataforge.meta.MetaSpec
@ -20,7 +20,7 @@ import kotlin.reflect.typeOf
* In general no computations should be made until the result is called.
*/
@DfType(TYPE)
public interface Task<out T : Any> : Described {
public interface Task<T> : Described {
/**
* A task identification string used to compare tasks and check task body for change
@ -45,7 +45,7 @@ public interface Task<out T : Any> : Described {
/**
* A [Task] with [MetaSpec] for wrapping and unwrapping task configuration
*/
public interface TaskWithSpec<out T : Any, C : Any> : Task<T> {
public interface TaskWithSpec<T, C : Any> : Task<T> {
public val spec: MetaSpec<C>
override val descriptor: MetaDescriptor? get() = spec.descriptor
@ -61,12 +61,12 @@ public interface TaskWithSpec<out T : Any, C : Any> : Task<T> {
// block: C.() -> Unit = {},
//): TaskResult<T> = execute(workspace, taskName, spec(block))
public class TaskResultBuilder<in T : Any>(
public class TaskResultBuilder<T>(
public val workspace: Workspace,
public val taskName: Name,
public val taskMeta: Meta,
private val dataDrop: DataSetBuilder<T>,
) : DataSetBuilder<T> by dataDrop
private val dataSink: DataSink<T>,
) : DataSink<T> by dataSink
/**
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
@ -88,12 +88,17 @@ public fun <T : Any> Task(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
): TaskResult<T> {
//TODO use safe builder and check for external data on add and detects cycles
val dataset = DataTree<T>(resultType) {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder() }
val dataset = MutableDataTree<T>(resultType, workspace.context).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply {
withContext(GoalExecutionRestriction() + workspace.goalLogger) {
builder()
}
}
}
workspace.wrapResult(dataset, taskName, taskMeta)
return workspace.wrapResult(dataset, taskName, taskMeta)
}
}
@ -111,6 +116,7 @@ public inline fun <reified T : Any> Task(
* @param specification a specification for task configuration
* @param builder for resulting data set
*/
@Suppress("FunctionName")
public fun <T : Any, C : MetaRepr> Task(
resultType: KType,
@ -126,7 +132,7 @@ public fun <T : Any, C : MetaRepr> Task(
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use safe builder and check for external data on add and detects cycles
val taskMeta = configuration.toMeta()
val dataset = DataTree<T>(resultType) {
val dataset = MutableDataTree<T>(resultType, this).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder(configuration) }
}
workspace.wrapResult(dataset, taskName, taskMeta)

View File

@ -1,50 +0,0 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.NamedData
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/**
* A [Workspace]-locked [NamedData], that serves as a computation model.
*/
public interface TaskData<out T : Any> : NamedData<T> {
/**
* The [Workspace] this data belongs to
*/
public val workspace: Workspace
/**
* The name of the stage that produced this data. [Name.EMPTY] if the workspace intrinsic data is used.
*/
public val taskName: Name
/**
* Stage configuration used to produce this data.
*/
public val taskMeta: Meta
/**
* Dependencies that allow to compute transitive dependencies as well.
*/
// override val dependencies: Collection<TaskData<*>>
}
private class TaskDataImpl<out T : Any>(
override val workspace: Workspace,
override val data: Data<T>,
override val name: Name,
override val taskName: Name,
override val taskMeta: Meta,
) : TaskData<T>, Data<T> by data {
// override val dependencies: Collection<TaskData<*>> = data.dependencies.map {
// it as? TaskData<*> ?: error("TaskData can't depend on external data")
// }
}
/**
* Adopt data into this workspace
*/
public fun <T : Any> Workspace.wrapData(data: Data<T>, name: Name, taskName: Name, taskMeta: Meta): TaskData<T> =
TaskDataImpl(this, data, name, taskName, taskMeta)

View File

@ -1,54 +1,41 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.ObservableDataTree
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.launch
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/**
* A result of a [Task]
* @param workspace the [Workspace] that produced the result
* @param taskName the name of the task that produced the result
* @param taskMeta The configuration of the task that produced the result
*/
public interface TaskResult<out T : Any> : DataSet<T> {
/**
* The [Workspace] this [DataSet] belongs to
*/
public val workspace: Workspace
/**
* The [Name] of the stage that produced this [DataSet]
*/
public val taskName: Name
/**
* The configuration of the stage that produced this [DataSet]
*/
public val taskMeta: Meta
override fun iterator(): Iterator<TaskData<T>>
override fun get(name: Name): TaskData<T>?
}
private class TaskResultImpl<out T : Any>(
override val workspace: Workspace,
override val taskName: Name,
override val taskMeta: Meta,
val dataSet: DataSet<T>,
) : TaskResult<T>, DataSet<T> by dataSet {
override fun iterator(): Iterator<TaskData<T>> = iterator {
dataSet.forEach {
yield(workspace.wrapData(it, it.name, taskName, taskMeta))
}
}
override fun get(name: Name): TaskData<T>? = dataSet[name]?.let {
workspace.wrapData(it, name, taskName, taskMeta)
}
}
public data class TaskResult<T>(
public val content: ObservableDataTree<T>,
public val workspace: Workspace,
public val taskName: Name,
public val taskMeta: Meta,
) : ObservableDataTree<T> by content
/**
* Wrap data into [TaskResult]
*/
public fun <T : Any> Workspace.wrapResult(dataSet: DataSet<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
TaskResultImpl(this, taskName, taskMeta, dataSet)
public fun <T> Workspace.wrapResult(data: ObservableDataTree<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
TaskResult(data, this, taskName, taskMeta)
/**
* Start computation for all data elements of this node.
* The resulting [Job] is completed only when all of them are completed.
*/
public fun TaskResult<*>.launch(scope: CoroutineScope): Job {
val jobs = asSequence().map {
it.data.launch(scope)
}.toList()
return scope.launch { jobs.joinAll() }
}

View File

@ -1,29 +1,32 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.provider.Provider
import kotlin.coroutines.CoroutineContext
public interface DataSelector<T: Any>{
public suspend fun select(workspace: Workspace, meta: Meta): DataSet<T>
public fun interface DataSelector<T> {
public suspend fun select(workspace: Workspace, meta: Meta): DataTree<T>
}
/**
* An environment for pull-mode computation
*/
@DfType(Workspace.TYPE)
public interface Workspace : ContextAware, Provider {
public interface Workspace : ContextAware, Provider, CoroutineScope {
override val coroutineContext: CoroutineContext get() = context.coroutineContext
/**
* The whole data node for current workspace
*/
public val data: TaskResult<*>
public val data: ObservableDataTree<*>
/**
* All targets associated with the workspace
@ -37,7 +40,7 @@ public interface Workspace : ContextAware, Provider {
override fun content(target: String): Map<Name, Any> {
return when (target) {
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)}
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key) }
Task.TYPE -> tasks
Data.TYPE -> data.asSequence().associateBy { it.name }
else -> emptyMap()
@ -49,7 +52,7 @@ public interface Workspace : ContextAware, Provider {
return task.execute(this, taskName, taskMeta)
}
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): TaskData<*>? =
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): Data<*>? =
produce(taskName, taskMeta)[name]
public companion object {

View File

@ -5,9 +5,9 @@ import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataSource
import space.kscience.dataforge.data.DataSourceBuilder
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.MutableDataTree
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.descriptors.MetaDescriptorBuilder
@ -17,13 +17,14 @@ import space.kscience.dataforge.names.asName
import kotlin.collections.set
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.typeOf
public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
public data class TaskReference<T>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
@Suppress("UNCHECKED_CAST")
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataTree<T> {
if (workspace.tasks[taskName] == task) {
return workspace.produce(taskName, meta) as TaskResult<T>
return workspace.produce(taskName, meta) as DataTree<T>
} else {
error("Task $taskName does not belong to the workspace")
}
@ -45,7 +46,7 @@ public inline fun <reified T : Any> TaskContainer.registerTask(
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
/**
* Create a new t
* Create and register a new task
*/
public inline fun <reified T : Any> TaskContainer.buildTask(
name: String,
@ -109,7 +110,7 @@ public class WorkspaceBuilder(
private val coroutineScope: CoroutineScope = parentContext,
) : TaskContainer {
private var context: Context? = null
private val data = DataSource<Any>(coroutineScope)
private val data = MutableDataTree<Any?>(typeOf<Any?>(), coroutineScope)
private val targets: HashMap<String, Meta> = HashMap()
private val tasks = HashMap<Name, Task<*>>()
private var cache: WorkspaceCache? = null
@ -124,7 +125,7 @@ public class WorkspaceBuilder(
/**
* Define intrinsic data for the workspace
*/
public fun data(builder: DataSourceBuilder<Any>.() -> Unit) {
public fun data(builder: DataSink<Any?>.() -> Unit) {
data.apply(builder)
}
@ -149,7 +150,7 @@ public class WorkspaceBuilder(
public fun build(): Workspace {
val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result ->
cache?.evaluate(result) ?: result
cache?.cache(result) ?: result
}
return WorkspaceImpl(context ?: parentContext, data, targets, tasks, postProcess)
}

View File

@ -1,5 +1,5 @@
package space.kscience.dataforge.workspace
public interface WorkspaceCache {
public suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T>
public suspend fun <T> cache(result: TaskResult<T>): TaskResult<T>
}

View File

@ -2,21 +2,19 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.ObservableDataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
internal class WorkspaceImpl internal constructor(
override val context: Context,
data: DataSet<*>,
override val data: ObservableDataTree<*>,
override val targets: Map<String, Meta>,
tasks: Map<Name, Task<*>>,
private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>,
) : Workspace {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>> by lazy { context.gather<Task<*>>(Task.TYPE) + tasks }
override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> {

View File

@ -4,13 +4,14 @@ import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFInternal
import kotlin.reflect.typeOf
/**
* Convert an [Envelope] to a data via given format. The actual parsing is done lazily.
*/
@OptIn(DFInternal::class)
public fun <T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(format.type, meta) {
public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(typeOf<T>(), meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}

View File

@ -2,9 +2,10 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.branch
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.map
import space.kscience.dataforge.data.transform
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
@ -24,22 +25,22 @@ public val TaskResultBuilder<*>.defaultDependencyMeta: Meta
* @param selector a workspace data selector. Could be either task selector or initial data selector.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
*/
public suspend fun <T : Any> TaskResultBuilder<*>.from(
public suspend fun <T> TaskResultBuilder<*>.from(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
): DataSet<T> = selector.select(workspace, dependencyMeta)
): DataTree<T> = selector.select(workspace, dependencyMeta)
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
plugin: P,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
): DataSet<T> {
): TaskResult<T> {
require(workspace.context.plugins.contains(plugin)) { "Plugin $plugin is not loaded into $workspace" }
val taskReference: TaskReference<T> = plugin.selectorBuilder()
val res = workspace.produce(plugin.name + taskReference.taskName, dependencyMeta)
//TODO add explicit check after https://youtrack.jetbrains.com/issue/KT-32956
@Suppress("UNCHECKED_CAST")
return res as TaskResult<T>
return res as TaskResult<T>
}
/**
@ -49,11 +50,11 @@ public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuild
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
* @param selectorBuilder a builder of task from the plugin.
*/
public suspend inline fun <reified T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
pluginFactory: PluginFactory<P>,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
): DataSet<T> {
): TaskResult<T> {
val plugin = workspace.context.plugins[pluginFactory]
?: error("Plugin ${pluginFactory.tag} not loaded into workspace context")
val taskReference: TaskReference<T> = plugin.selectorBuilder()
@ -64,9 +65,7 @@ public suspend inline fun <reified T : Any, reified P : WorkspacePlugin> TaskRes
}
public val TaskResultBuilder<*>.allData: DataSelector<*>
get() = object : DataSelector<Any> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<Any> = workspace.data
}
get() = DataSelector { workspace, _ -> workspace.data }
/**
* Perform a lazy mapping task using given [selector] and one-to-one [action].
@ -78,7 +77,7 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
* @param action process individual data asynchronously.
*/
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.transformEach(
public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
@ -90,31 +89,31 @@ public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.transf
dataMetaTransform(data.name)
}
val res = data.map(meta, workspace.context.coroutineContext) {
val res = data.transform(meta, workspace.context.coroutineContext) {
action(it, data.name, meta)
}
data(data.name, res)
put(data.name, res)
}
}
/**
* Set given [dataSet] as a task result.
*/
public fun <T : Any> TaskResultBuilder<T>.result(dataSet: DataSet<T>) {
node(Name.EMPTY, dataSet)
public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) {
branch(dataSet)
}
/**
* Use provided [action] to fill the result
*/
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.actionFrom(
public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom(
selector: DataSelector<T>,
action: Action<T,R>,
action: Action<T, R>,
dependencyMeta: Meta = defaultDependencyMeta,
) {
node(Name.EMPTY, action.execute(from(selector,dependencyMeta), dependencyMeta))
branch(action.execute(from(selector, dependencyMeta), dependencyMeta))
}

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.map
import kotlinx.io.*
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
@ -9,12 +10,10 @@ import kotlinx.serialization.serializer
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.request
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.withIndex
import java.nio.file.Path
import kotlin.io.path.deleteIfExists
@ -22,7 +21,7 @@ import kotlin.io.path.div
import kotlin.io.path.exists
import kotlin.reflect.KType
public class JsonIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
public class JsonIOFormat<T>(private val type: KType) : IOFormat<T> {
@Suppress("UNCHECKED_CAST")
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
@ -35,7 +34,7 @@ public class JsonIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
}
@OptIn(ExperimentalSerializationApi::class)
public class ProtobufIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
public class ProtobufIOFormat<T>(private val type: KType) : IOFormat<T> {
@Suppress("UNCHECKED_CAST")
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
@ -53,14 +52,14 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
// private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
@OptIn(DFExperimental::class, DFInternal::class)
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
override suspend fun <T> cache(result: TaskResult<T>): TaskResult<T> {
val io = result.workspace.context.request(IOPlugin)
val format: IOFormat<T> = io.resolveIOFormat(result.dataType, result.taskMeta)
?: ProtobufIOFormat(result.dataType)
?: error("Can't resolve IOFormat for ${result.dataType}")
fun evaluateDatum(data: TaskData<T>): TaskData<T> {
fun cacheOne(data: NamedData<T>): NamedData<T> {
val path = cacheDirectory /
result.taskName.withIndex(result.taskMeta.hashCode().toString(16)).toString() /
@ -92,15 +91,14 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
}
}
return data.workspace.wrapData(datum, data.name, data.taskName, data.taskMeta)
return datum.named(data.name)
}
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> =
result.iterator().asSequence().map { evaluateDatum(it) }.iterator()
override fun get(name: Name): TaskData<T>? = result[name]?.let { evaluateDatum(it) }
}
val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
}
}

View File

@ -1,39 +1,39 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
import kotlin.reflect.full.isSubtypeOf
private typealias TaskResultId = Pair<Name, Meta>
private data class TaskResultId(val name: Name, val meta: Meta)
public class InMemoryWorkspaceCache : WorkspaceCache {
// never do that at home!
private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()
private val cache = HashMap<TaskResultId, HashMap<Name, Data<*>>>()
@Suppress("UNCHECKED_CAST")
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> =
if (type.isSubtypeOf(taskType)) this as TaskData<T>
private fun <T> Data<*>.checkType(taskType: KType): Data<T> =
if (type.isSubtypeOf(taskType)) this as Data<T>
else error("Cached data type mismatch: expected $taskType but got $type")
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
for (d: TaskData<T> in result) {
cache.getOrPut(result.taskName to result.taskMeta) { HashMap() }.getOrPut(d.name) { d }
}
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> = (cache[result.taskName to result.taskMeta]
?.values?.map { it.checkType<T>(result.dataType) }
?: emptyList()).iterator()
override fun get(name: Name): TaskData<T>? {
val cached: TaskData<*> = cache[result.taskName to result.taskMeta]?.get(name) ?: return null
//TODO check types
return cached.checkType(result.dataType)
override suspend fun <T> cache(result: TaskResult<T>): TaskResult<T> {
fun cacheOne(data: NamedData<T>): NamedData<T> {
val cachedData = cache.getOrPut(TaskResultId(result.taskName, result.taskMeta)){
HashMap()
}.getOrPut(data.name){
data.data
}
return cachedData.checkType<T>(result.dataType).named(data.name)
}
val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
}
}

View File

@ -1,318 +0,0 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.warn
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.FileData.Companion.DEFAULT_IGNORE_EXTENSIONS
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import java.time.Instant
import kotlin.io.path.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
//public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
public typealias FileFormatResolver<T> = (path: Path, meta: Meta) -> IOReader<T>?
/**
* A data based on a filesystem [Path]
*/
public class FileData<T> internal constructor(private val data: Data<T>, public val path: Path) : Data<T> by data {
// public val path: String? get() = meta[META_FILE_PATH_KEY].string
// public val extension: String? get() = meta[META_FILE_EXTENSION_KEY].string
//
public val createdTime: Instant? get() = meta[FILE_CREATE_TIME_KEY].string?.let { Instant.parse(it) }
public val updatedTime: Instant? get() = meta[FILE_UPDATE_TIME_KEY].string?.let { Instant.parse(it) }
public companion object {
public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created"
public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated"
public const val DF_FILE_EXTENSION: String = "df"
public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
}
}
/**
* Read data with supported envelope format and binary format. If envelope format is null, then read binary directly from file.
* The operation is blocking since it must read meta header. The reading of envelope body is lazy
*/
@OptIn(DFInternal::class)
@DFExperimental
public fun <T : Any> IOPlugin.readDataFile(
path: Path,
formatResolver: FileFormatResolver<T>,
): FileData<T>? {
val envelope = readEnvelopeFile(path, true)
val format = formatResolver(path, envelope.meta) ?: return null
val updatedMeta = envelope.meta.copy {
FileData.FILE_PATH_KEY put path.toString()
FileData.FILE_EXTENSION_KEY put path.extension
val attributes = path.readAttributes<BasicFileAttributes>()
FileData.FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return FileData(
Data(format.type, updatedMeta) {
(envelope.data ?: Binary.EMPTY).readWith(format)
},
path
)
}
context(IOPlugin) @DFExperimental
public fun <T : Any> DataSetBuilder<T>.directory(
path: Path,
ignoreExtensions: Set<String>,
formatResolver: FileFormatResolver<T>,
) {
Files.list(path).forEach { childPath ->
val fileName = childPath.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta(readMetaFile(childPath))
} else if (!fileName.startsWith("@")) {
file(childPath, ignoreExtensions, formatResolver)
}
}
}
/**
* Read the directory as a data node. If [path] is a zip archive, read it as directory
*/
@DFExperimental
@DFInternal
public fun <T : Any> IOPlugin.readDataDirectory(
type: KType,
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
formatResolver: FileFormatResolver<T>,
): DataTree<T> {
//read zipped data node
if (path.fileName != null && path.fileName.toString().endsWith(".zip")) {
//Using explicit Zip file system to avoid bizarre compatibility bugs
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
val fs = fsProvider.newFileSystem(path, mapOf("create" to "true"))
return readDataDirectory(type, fs.rootDirectories.first(), ignoreExtensions, formatResolver)
}
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataTree(type) {
meta {
FileData.FILE_PATH_KEY put path.toString()
}
directory(path, ignoreExtensions, formatResolver)
}
}
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
noinline formatResolver: FileFormatResolver<T>,
): DataTree<T> = readDataDirectory(typeOf<T>(), path, ignoreExtensions, formatResolver)
/**
* Read a raw binary data tree from the directory. All files are read as-is (save for meta files).
*/
@DFExperimental
public fun IOPlugin.readRawDirectory(
path: Path,
ignoreExtensions: Set<String> = emptySet(),
): DataTree<Binary> = readDataDirectory(path, ignoreExtensions) { _, _ -> IOReader.binary }
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
@DFInternal
@DFExperimental
public fun <T : Any> IOPlugin.monitorDataDirectory(
type: KType,
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
formatResolver: FileFormatResolver<T>,
): DataSource<T> {
if (path.fileName.toString().endsWith(".zip")) error("Monitoring not supported for ZipFS")
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataSource(type, context) {
directory(path, ignoreExtensions, formatResolver)
launch(Dispatchers.IO) {
val watchService = path.fileSystem.newWatchService()
path.register(
watchService,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE
)
do {
val key = watchService.take()
if (key != null) {
for (event: WatchEvent<*> in key.pollEvents()) {
val eventPath = event.context() as Path
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
remove(eventPath.toName())
} else {
val fileName = eventPath.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta(readMetaFile(eventPath))
} else if (!fileName.startsWith("@")) {
file(eventPath, ignoreExtensions, formatResolver)
}
}
}
key.reset()
}
} while (isActive && key != null)
}
}
}
/**
* Start monitoring given directory ([path]) as a [DataSource].
*/
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.monitorDataDirectory(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
noinline formatResolver: FileFormatResolver<T>,
): DataSource<T> = monitorDataDirectory(typeOf<T>(), path, ignoreExtensions, formatResolver)
/**
* Read and monitor raw binary data tree from the directory. All files are read as-is (save for meta files).
*/
@DFExperimental
public fun IOPlugin.monitorRawDirectory(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
): DataSource<Binary> = monitorDataDirectory(path, ignoreExtensions) { _, _ -> IOReader.binary }
/**
* Write data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider
*/
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeDataDirectory(
path: Path,
tree: DataTree<T>,
format: IOWriter<T>,
envelopeFormat: EnvelopeFormat? = null,
) {
withContext(Dispatchers.IO) {
if (!Files.exists(path)) {
Files.createDirectories(path)
} else if (!Files.isDirectory(path)) {
error("Can't write a node into file")
}
tree.items.forEach { (token, item) ->
val childPath = path.resolve(token.toString())
when (item) {
is DataTreeItem.Node -> {
writeDataDirectory(childPath, item.tree, format, envelopeFormat)
}
is DataTreeItem.Leaf -> {
val envelope = item.data.toEnvelope(format)
if (envelopeFormat != null) {
writeEnvelopeFile(childPath, envelope, envelopeFormat)
} else {
writeEnvelopeDirectory(childPath, envelope)
}
}
}
}
val treeMeta = tree.meta
writeMetaFile(path, treeMeta)
}
}
/**
* Reads the specified resources and returns a [DataTree] containing the data.
*
* @param resources The names of the resources to read.
* @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
* @return A DataTree containing the data read from the resources.
*/
@DFExperimental
private fun IOPlugin.readResources(
vararg resources: String,
classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
): DataTree<Binary> {
// require(resource.isNotBlank()) {"Can't mount root resource tree as data root"}
return DataTree {
resources.forEach { resource ->
val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error(
"Resource with name $resource is not resolved"
)
node(resource, readRawDirectory(path))
}
}
}
/**
* Add file/directory-based data tree item
*
* @param ignoreExtensions a list of file extensions for which extension should be cut from the resulting item name
*/
context(IOPlugin)
@OptIn(DFInternal::class)
@DFExperimental
public fun <T : Any> DataSetBuilder<T>.file(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
formatResolver: FileFormatResolver<out T>,
) {
fun defaultPath() = if (path.extension in ignoreExtensions) path.nameWithoutExtension else path.name
try {
//If path is a single file or a special directory, read it as single datum
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
val data = readDataFile(path, formatResolver)
if (data == null) {
logger.warn { "File format is not resolved for $path. Skipping." }
return
}
val name: String = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: defaultPath()
data(name.asName(), data)
} else {
//otherwise, read as directory
val data: DataTree<T> = readDataDirectory(dataType, path, ignoreExtensions, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: defaultPath()
node(name.asName(), data)
}
} catch (ex: Exception) {
logger.error { "Failed to read file or directory at $path: ${ex.message}" }
}
}

View File

@ -0,0 +1,186 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.*
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.*
import kotlin.reflect.typeOf
public object FileData {
public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created"
public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated"
public const val DF_FILE_EXTENSION: String = "df"
public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
}
/**
* Read data with supported envelope format and binary format. If the envelope format is null, then read binary directly from file.
* The operation is blocking since it must read the meta header. The reading of envelope body is lazy
*/
@OptIn(DFExperimental::class)
public fun IOPlugin.readFileData(
path: Path,
): Data<Binary> {
val envelope = readEnvelopeFile(path, true)
val updatedMeta = envelope.meta.copy {
FileData.FILE_PATH_KEY put path.toString()
FileData.FILE_EXTENSION_KEY put path.extension
val attributes = path.readAttributes<BasicFileAttributes>()
FileData.FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return StaticData(
typeOf<Binary>(),
envelope.data ?: Binary.EMPTY,
updatedMeta
)
}
public fun DataSink<Binary>.file(io: IOPlugin, name: Name, path: Path) {
if (!path.isRegularFile()) error("Only regular files could be handled by this function")
put(name, io.readFileData(path))
}
public fun DataSink<Binary>.directory(
io: IOPlugin,
name: Name,
path: Path,
) {
if (!path.isDirectory()) error("Only directories could be handled by this function")
//process root data
var dataBinary: Binary? = null
var meta: Meta? = null
Files.list(path).forEach { childPath ->
val fileName = childPath.fileName.toString()
if (fileName == IOPlugin.DATA_FILE_NAME) {
dataBinary = childPath.asBinary()
} else if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta = io.readMetaFileOrNull(childPath)
} else if (!fileName.startsWith("@")) {
val token = if (childPath.isRegularFile() && childPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS) {
NameToken(childPath.nameWithoutExtension)
} else {
NameToken(childPath.name)
}
files(io, name + token, childPath)
}
}
//set data if it is relevant
if (dataBinary != null || meta != null) {
put(
name,
StaticData(
typeOf<Binary>(),
dataBinary ?: Binary.EMPTY,
meta ?: Meta.EMPTY
)
)
}
}
public fun DataSink<Binary>.files(
io: IOPlugin,
name: Name,
path: Path,
) {
if (path.isRegularFile() && path.extension == "zip") {
//Using explicit Zip file system to avoid bizarre compatibility bugs
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
val fs = fsProvider.newFileSystem(path, emptyMap<String, Any>())
files(io, name, fs.rootDirectories.first())
}
if (path.isRegularFile()) {
file(io, name, path)
} else {
directory(io, name, path)
}
}
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
@DFInternal
@DFExperimental
public fun DataSink<Binary>.monitorFiles(
io: IOPlugin,
name: Name,
path: Path,
scope: CoroutineScope = io.context,
): Job {
files(io, name, path)
return scope.launch(Dispatchers.IO) {
val watchService = path.fileSystem.newWatchService()
path.register(
watchService,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE
)
do {
val key = watchService.take()
if (key != null) {
for (event: WatchEvent<*> in key.pollEvents()) {
val eventPath = event.context() as Path
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
put(eventPath.toName(), null)
} else {
val fileName = eventPath.fileName.toString()
if (!fileName.startsWith("@")) {
files(io, name, eventPath)
}
}
}
key.reset()
}
} while (isActive && key != null)
}
}
/**
* @param resources The names of the resources to read.
* @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
*/
@DFExperimental
public fun DataSink<Binary>.resources(
io: IOPlugin,
vararg resources: String,
classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
) {
resources.forEach { resource ->
val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error(
"Resource with name $resource is not resolved"
)
files(io, resource.asName(), path)
}
}

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.filterByType
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
@ -16,14 +16,13 @@ import space.kscience.dataforge.names.matches
*/
@OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> =
workspace.data.filterByType { name, _ ->
namePattern == null || name.matches(namePattern)
}
DataSelector<T> { workspace, _ ->
workspace.data.filterByType { name, _, _ ->
namePattern == null || name.matches(namePattern)
}
}
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
task: Name,
taskMeta: Meta = Meta.EMPTY,
): DataSet<T> = workspace.produce(task, taskMeta).filterByType()
): DataTree<T> = workspace.produce(task, taskMeta).filterByType()

View File

@ -0,0 +1,72 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.Path
import kotlin.io.path.createDirectories
import kotlin.io.path.exists
import kotlin.io.path.extension
/**
* Write the data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider
*
* @param nameToPath a [Name] to [Path] converter used to create
*/
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeDataDirectory(
path: Path,
dataSet: DataTree<T>,
format: IOWriter<T>,
envelopeFormat: EnvelopeFormat? = null,
): Unit = withContext(Dispatchers.IO) {
if (!Files.exists(path)) {
Files.createDirectories(path)
} else if (!Files.isDirectory(path)) {
error("Can't write a node into file")
}
dataSet.forEach { (name, data) ->
val childPath = path.resolve(name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
childPath.parent.createDirectories()
val envelope = data.toEnvelope(format)
if (envelopeFormat != null) {
writeEnvelopeFile(childPath, envelope, envelopeFormat)
} else {
writeEnvelopeDirectory(childPath, envelope)
}
}
dataSet.meta?.let { writeMetaFile(path, it) }
}
/**
* Write this [DataTree] as a zip archive
*/
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeZip(
path: Path,
dataSet: DataTree<T>,
format: IOWriter<T>,
envelopeFormat: EnvelopeFormat? = null,
): Unit = withContext(Dispatchers.IO) {
if (path.exists()) error("Can't override existing zip data file $path")
val actualFile = if (path.extension == "zip") {
path
} else {
path.resolveSibling(path.fileName.toString() + ".zip")
}
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
//val fs = FileSystems.newFileSystem(actualFile, mapOf("create" to true))
val fs = fsProvider.newFileSystem(actualFile, mapOf("create" to true))
fs.use {
writeDataDirectory(fs.rootDirectories.first(), dataSet, format, envelopeFormat)
}
}

View File

@ -1,73 +0,0 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataTreeItem
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
private suspend fun <T : Any> ZipOutputStream.writeNode(
name: String,
treeItem: DataTreeItem<T>,
dataFormat: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) {
when (treeItem) {
is DataTreeItem.Leaf -> {
//TODO add directory-based envelope writer
val envelope = treeItem.data.toEnvelope(dataFormat)
val entry = ZipEntry(name)
putNextEntry(entry)
//TODO remove additional copy
val bytes = ByteArray {
writeWith(envelopeFormat, envelope)
}
write(bytes)
}
is DataTreeItem.Node -> {
val entry = ZipEntry("$name/")
putNextEntry(entry)
closeEntry()
treeItem.tree.items.forEach { (token, item) ->
val childName = "$name/$token"
writeNode(childName, item, dataFormat, envelopeFormat)
}
}
}
}
/**
* Write this [DataTree] as a zip archive
*/
@DFExperimental
public suspend fun <T : Any> DataTree<T>.writeZip(
path: Path,
format: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) {
val actualFile = if (path.toString().endsWith(".zip")) {
path
} else {
path.resolveSibling(path.fileName.toString() + ".zip")
}
val fos = Files.newOutputStream(
actualFile,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
)
val zos = ZipOutputStream(fos)
zos.use {
it.writeNode("", DataTreeItem.Node(this@writeZip), format, envelopeFormat)
}
}

View File

@ -1,18 +1,16 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static
import space.kscience.dataforge.data.wrap
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
@OptIn(DFExperimental::class)
internal class CachingWorkspaceTest {
@Test
@ -24,7 +22,7 @@ internal class CachingWorkspaceTest {
data {
//statically initialize data
repeat(5) {
static("myData[$it]", it)
wrap("myData[$it]", it)
}
}
@ -37,11 +35,10 @@ internal class CachingWorkspaceTest {
}
}
@Suppress("UNUSED_VARIABLE")
val doSecond by task<Any> {
transformEach(
doFirst,
dependencyMeta = if(taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
) { _, name, _ ->
secondCounter++
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
@ -53,13 +50,15 @@ internal class CachingWorkspaceTest {
val secondA = workspace.produce("doSecond")
val secondB = workspace.produce("doSecond", Meta { "flag" put true })
val secondC = workspace.produce("doSecond")
//use coroutineScope to wait for the result
coroutineScope {
first.startAll(this)
secondA.startAll(this)
secondB.startAll(this)
first.launch(this)
secondA.launch(this)
secondB.launch(this)
//repeat to check caching
secondC.startAll(this)
secondC.launch(this)
}
assertEquals(10, firstCounter)
assertEquals(10, secondCounter)
}

View File

@ -20,13 +20,13 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val result: Data<Int> = selectedData.foldToData(0) { result, data ->
result + data.value
}
data("result", result)
put("result", result)
}
val singleData by task<Int> {
workspace.data.filterByType<Int>()["myData[12]"]?.let {
data("result", it)
put("result", it)
}
}
@ -47,7 +47,7 @@ class DataPropagationTest {
}
data {
repeat(100) {
static("myData[$it]", it)
wrap("myData[$it]", it)
}
}
}
@ -55,12 +55,12 @@ class DataPropagationTest {
@Test
fun testAllData() = runTest {
val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.asSequence().single().await())
assertEquals(4950, node.content.asSequence().single().await())
}
@Test
fun testSingleData() = runTest {
val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.asSequence().single().await())
assertEquals(12, node.content.asSequence().single().await())
}
}

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.io.Sink
import kotlinx.io.Source
@ -9,38 +8,34 @@ import kotlinx.io.writeString
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.io
import space.kscience.dataforge.io.readEnvelopeFile
import space.kscience.dataforge.io.*
import space.kscience.dataforge.io.yaml.YamlPlugin
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import java.nio.file.Files
import kotlin.io.path.deleteExisting
import kotlin.io.path.fileSize
import kotlin.io.path.toPath
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.Test
import kotlin.test.assertEquals
class FileDataTest {
val dataNode = DataTree<String> {
node("dir") {
static("a", "Some string") {
branch("dir") {
wrap("a", "Some string") {
"content" put "Some string"
}
}
static("b", "root data")
meta {
"content" put "This is root meta node"
}
wrap("b", "root data")
// meta {
// "content" put "This is root meta node"
// }
}
object StringIOFormat : IOFormat<String> {
override val type: KType get() = typeOf<String>()
override fun writeTo(sink: Sink, obj: String) {
sink.writeString(obj)
@ -51,29 +46,33 @@ class FileDataTest {
@Test
@DFExperimental
fun testDataWriteRead() = with(Global.io) {
fun testDataWriteRead() = runTest {
val io = Global.io
val dir = Files.createTempDirectory("df_data_node")
runBlocking {
writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
io.writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val data = DataTree {
files(io, Name.EMPTY, dir)
}
val reconstructed = data.transform { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
@Test
@DFExperimental
fun testZipWriteRead() = runTest {
with(Global.io) {
val zip = Files.createTempFile("df_data_node", ".zip")
dataNode.writeZip(zip, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
val io = Global.io
val zip = Files.createTempFile("df_data_node", ".zip")
zip.deleteExisting()
io.writeZip(zip, dataNode, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = DataTree { files(io, Name.EMPTY, zip) }
.transform { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
@OptIn(DFExperimental::class)

View File

@ -3,8 +3,7 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static
import space.kscience.dataforge.data.wrap
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
@ -17,18 +16,17 @@ class FileWorkspaceCacheTest {
data {
//statically initialize data
repeat(5) {
static("myData[$it]", it)
wrap("myData[$it]", it)
}
}
fileCache(Files.createTempDirectory("dataforge-temporary-cache"))
@Suppress("UNUSED_VARIABLE")
val echo by task<String> {
transformEach(dataByType<String>()) { arg, _, _ -> arg }
}
}
workspace.produce("echo").startAll(this)
workspace.produce("echo").launch(this)
}
}

View File

@ -27,8 +27,8 @@ public fun <P : Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory
override val tag: PluginTag = this@toFactory.tag
}
public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet<Any> = runBlocking {
produce(task, block)
public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataTree<*> = runBlocking {
produce(task, block).content
}
@OptIn(DFExperimental::class)
@ -62,14 +62,14 @@ internal class SimpleWorkspaceTest {
data {
//statically initialize data
repeat(100) {
static("myData[$it]", it)
wrap("myData[$it]", it)
}
}
val filterOne by task<Int> {
val name by taskMeta.string { error("Name field not defined") }
from(testPluginFactory) { test }.getByType<Int>(name)?.let { source ->
data(source.name, source.map { it })
from(testPluginFactory) { test }[name]?.let { source: Data<Int> ->
put(name, source)
}
}
@ -97,7 +97,7 @@ internal class SimpleWorkspaceTest {
val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r ->
l + r
}
data(data.name, newData)
put(data.name, newData)
}
}
@ -106,23 +106,23 @@ internal class SimpleWorkspaceTest {
val res = from(square).foldToData(0) { l, r ->
l + r.value
}
data("sum", res)
put("sum", res)
}
val averageByGroup by task<Int> {
val evenSum = workspace.data.filterByType<Int> { name, _ ->
val evenSum = workspace.data.filterByType<Int> { name, _, _ ->
name.toString().toInt() % 2 == 0
}.foldToData(0) { l, r ->
l + r.value
}
data("even", evenSum)
val oddSum = workspace.data.filterByType<Int> { name, _ ->
put("even", evenSum)
val oddSum = workspace.data.filterByType<Int> { name, _, _ ->
name.toString().toInt() % 2 == 1
}.foldToData(0) { l, r ->
l + r.value
}
data("odd", oddSum)
put("odd", oddSum)
}
val delta by task<Int> {
@ -132,7 +132,7 @@ internal class SimpleWorkspaceTest {
val res = even.combine(odd) { l, r ->
l - r
}
data("res", res)
put("res", res)
}
val customPipe by task<Int> {
@ -140,7 +140,7 @@ internal class SimpleWorkspaceTest {
val meta = data.meta.toMutableMeta().apply {
"newValue" put 22
}
data(data.name + "new", data.map { (data.meta["value"].int ?: 0) + it })
put(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it })
}
}
@ -159,7 +159,7 @@ internal class SimpleWorkspaceTest {
@Timeout(1)
fun testMetaPropagation() = runTest {
val node = workspace.produce("sum") { "testFlag" put true }
val res = node.asSequence().single().await()
val res = node.single().await()
}
@Test
@ -170,20 +170,25 @@ internal class SimpleWorkspaceTest {
}
@Test
fun testFullSquare() {
runBlocking {
val node = workspace.produce("fullSquare")
println(node.toMeta())
fun testFullSquare() = runTest {
val result = workspace.produce("fullSquare")
result.forEach {
println(
"""
Name: ${it.name}
Meta: ${it.meta}
Data: ${it.data.await()}
""".trimIndent()
)
}
}
@Test
fun testFilter() {
runBlocking {
val node = workspace.produce("filterOne") {
"name" put "myData[12]"
}
assertEquals(12, node.single().await())
fun testFilter() = runTest {
val node = workspace.produce("filterOne") {
"name" put "myData[12]"
}
assertEquals(12, node.single().await())
}
}