Rework data. Split static state and dynamic state

This commit is contained in:
Alexander Nozik 2024-12-15 10:56:35 +03:00
parent 088ed64f4a
commit 425f9801a5
22 changed files with 318 additions and 258 deletions
CHANGELOG.md
build.gradle.kts
dataforge-data/src
commonMain/kotlin/space/kscience/dataforge
commonTest/kotlin/space/kscience/dataforge/data
jvmMain/kotlin/space/kscience/dataforge/data
jvmTest/kotlin/space/kscience/dataforge/data
dataforge-meta
build.gradle.kts
src/commonMain/kotlin/space/kscience/dataforge/meta
dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace
docs/templates
gradle.properties

@ -8,6 +8,7 @@
### Changed
- Simplify inheritance logic in `MutableTypedMeta`
- API of DataSink.
### Deprecated
- MetaProvider `spec` is replaced by `readable`. `listOfSpec` is replaced with `listOfReadable`

@ -9,7 +9,7 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.9.1"
version = "0.10.0"
}
subprojects {

@ -1,12 +1,7 @@
package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataUpdate
import space.kscience.dataforge.data.launchUpdate
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
@ -31,10 +26,10 @@ public abstract class AbstractAction<T, R>(
/**
* Generate initial content of the output
*/
protected abstract fun DataSink<R>.generate(
protected abstract fun DataBuilderScope<R>.generate(
source: DataTree<T>,
meta: Meta,
)
): Map<Name, Data<R>>
/**
* Update part of the data set using provided data
@ -49,7 +44,7 @@ public abstract class AbstractAction<T, R>(
updatedData: DataUpdate<T>,
) {
//by default regenerate the whole data set
generate(source, meta)
putAll(generate(source, meta))
}
@OptIn(UnsafeKType::class)
@ -57,13 +52,21 @@ public abstract class AbstractAction<T, R>(
source: DataTree<T>,
meta: Meta,
updatesScope: CoroutineScope
): DataTree<R> = DataTree(outputType) {
generate(source, meta)
): DataTree<R> = DataTree(
dataType = outputType,
scope = updatesScope,
initialData = DataBuilderScope<R>().generate(source, meta)
) {
//propagate updates
launchUpdate(updatesScope) {
source.updates.onEach { update ->
update(source, meta, update)
}.collect()
val updateSink = DataSink<R> { name, data ->
put(name, data)
}
with(updateSink) {
source.updates.collect { du: DataUpdate<T> ->
update(source, meta, du)
}
}
}
}
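Taken together, this hunk splits an action into a static and a dynamic part: `generate` now runs in a `DataBuilderScope` and returns a plain `Map<Name, Data<R>>` snapshot, while update propagation goes through a suspending `DataSink`. A minimal sketch of a custom action under the new contract (the passthrough logic is illustrative, not library code):

```kotlin
import space.kscience.dataforge.actions.AbstractAction
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType

// Hypothetical action that forwards data unchanged, showing the new split:
// generate() builds the static snapshot, update() handles one change at a time.
internal class PassthroughAction<T>(type: KType) : AbstractAction<T, T>(type) {

    override fun DataBuilderScope<T>.generate(
        source: DataTree<T>,
        meta: Meta,
    ): Map<Name, Data<T>> = buildMap {
        source.forEach { put(it.name, it.data) }
    }

    override suspend fun DataSink<T>.update(
        source: DataTree<T>,
        meta: Meta,
        updatedData: DataUpdate<T>,
    ) {
        put(updatedData.name, updatedData.data)
    }
}
```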

@ -54,22 +54,21 @@ public class MapAction<T, R>(
private val block: MapActionBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) {
private fun DataSink<R>.mapOne(name: Name, data: Data<T>?, meta: Meta) {
private fun mapOne(name: Name, data: Data<T>?, meta: Meta): Pair<Name, Data<R>?> {
//fast return for null data
if (data == null) {
put(name, null)
return
return name to null
}
// Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(name, data.meta, meta)
//applying transformation from builder
val builder = MapActionBuilder<T, R>(
name,
data.meta.toMutableMeta(), // using data meta
meta,
data.type,
outputType
name = name,
meta = data.meta.toMutableMeta(), // using data meta
actionMeta = meta,
dataType = data.type,
outputType = outputType
).apply(block)
//getting new name
@ -82,21 +81,26 @@ public class MapAction<T, R>(
builder.result(env, data.await())
}
//setting the data node
put(newName, newData)
return newName to newData
}
override fun DataSink<R>.generate(source: DataTree<T>, meta: Meta) {
source.forEach { mapOne(it.name, it.data, meta) }
override fun DataBuilderScope<R>.generate(source: DataTree<T>, meta: Meta): Map<Name, Data<R>> = buildMap {
source.forEach {
val (name, data) = mapOne(it.name, it.data, meta)
if (data != null) {
check(name !in keys) { "Data with key $name already exists in the result" }
put(name, data)
}
}
}
override suspend fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
) {
mapOne(updatedData.name, updatedData.data, meta)
) {
val (name, data) = mapOne(updatedData.name, updatedData.data, meta)
put(name, data)
}
}
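On the call side the mapping action looks the same as before; only its internals moved to the snapshot-plus-update model. Usage as in the reworked tests further down this diff:

```kotlin
val plusOne = Action.mapping<Int, Int> {
    result { it + 1 }
}
// The returned tree holds the mapped snapshot and keeps following source updates.
val result: DataTree<Int> = plusOne(source)
```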

@ -3,6 +3,8 @@ package space.kscience.dataforge.actions
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
@ -13,7 +15,7 @@ import kotlin.reflect.typeOf
public class JoinGroup<T, R>(
public var name: String,
internal val set: DataTree<T>,
internal val data: DataTree<T>,
@PublishedApi internal var outputType: KType,
) {
@ -41,12 +43,17 @@ public class ReduceGroupBuilder<T, R>(
private val groupRules: MutableList<(DataTree<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/**
* introduce grouping by meta value
* Group by a meta value
*/
public fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) {
@OptIn(UnsafeKType::class)
public fun byMetaValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node ->
GroupRule.byMetaValue(tag, defaultTag).gather(node).map {
JoinGroup<T, R>(it.key, it.value, outputType).apply(action)
val groups = mutableMapOf<String, MutableMap<Name, Data<T>>>()
node.forEach { data ->
groups.getOrPut(data.meta[tag]?.string ?: defaultTag) { mutableMapOf() }.put(data.name, data)
}
groups.map { (key, dataMap) ->
JoinGroup<T, R>(key, dataMap.asTree(node.dataType), outputType).apply(action)
}
}
}
@ -84,9 +91,9 @@ internal class ReduceAction<T, R>(
) : AbstractAction<T, R>(outputType) {
//TODO optimize reduction. Currently, the whole action recalculates on push
override fun DataSink<R>.generate(source: DataTree<T>, meta: Meta) {
override fun DataBuilderScope<R>.generate(source: DataTree<T>, meta: Meta): Map<Name, Data<R>> = buildMap {
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(source).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.asSequence().fold(HashMap()) { acc, value ->
val dataFlow: Map<Name, Data<T>> = group.data.asSequence().fold(HashMap()) { acc, value ->
acc.apply {
acc[value.name] = value.data
}

@ -7,7 +7,6 @@ import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName
import kotlin.collections.set
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@ -48,7 +47,7 @@ internal class SplitAction<T, R>(
private val action: SplitBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) {
private fun DataSink<R>.splitOne(name: Name, data: Data<T>?, meta: Meta) {
private fun splitOne(name: Name, data: Data<T>?, meta: Meta): Map<Name, Data<R>?> = buildMap {
val laminate = Laminate(data?.meta, meta)
val split = SplitBuilder<T, R>(name, data?.meta ?: Meta.EMPTY).apply(action)
@ -76,16 +75,26 @@ internal class SplitAction<T, R>(
}
}
override fun DataSink<R>.generate(source: DataTree<T>, meta: Meta) {
source.forEach { splitOne(it.name, it.data, meta) }
override fun DataBuilderScope<R>.generate(
source: DataTree<T>,
meta: Meta
): Map<Name, Data<R>> = buildMap {
source.forEach {
splitOne(it.name, it.data, meta).forEach { (name, data) ->
check(name !in keys) { "Data with key $name already exists in the result" }
if (data != null) {
put(name, data)
}
}
}
}
override suspend fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
) {
splitOne(updatedData.name, updatedData.data, meta)
) {
putAll(splitOne(updatedData.name, updatedData.data, meta))
}
}

@ -15,40 +15,41 @@
*/
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.plus
import kotlin.reflect.KType
public interface GroupRule {
public fun <T> gather(set: DataTree<T>): Map<String, DataTree<T>>
/**
* Interface that defines a rename rule for [Data]
*/
@DFExperimental
public fun interface DataRenamer {
public fun rename(name: Name, meta: Meta, type: KType): Name
public companion object {
/**
* Create grouping rule that creates groups for different values of value
* field with name [key]
*
* @param key
* @param defaultTagValue
* @return
* Prepend name token `key\[tagValue\]` to data name
*/
@OptIn(UnsafeKType::class)
public fun byMetaValue(
public fun groupByMetaValue(
key: String,
defaultTagValue: String,
): GroupRule = object : GroupRule {
): DataRenamer = object : DataRenamer {
override fun <T> gather(
set: DataTree<T>,
): Map<String, DataTree<T>> {
val map = HashMap<String, MutableDataTree<T>>()
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { MutableDataTree(set.dataType) }.put(data.name, data.data)
}
return map
override fun rename(
name: Name,
meta: Meta,
type: KType
): Name {
val tagValue: String = meta[key]?.string ?: defaultTagValue
return NameToken(key,tagValue).plus(name)
}
}
}
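`GroupRule`, which materialized a separate subtree per tag value, is replaced by the lighter `DataRenamer`, which only rewrites names. A sketch of what the new `@DFExperimental` renamer produces (the meta values are illustrative):

```kotlin
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.parseAsName
import kotlin.reflect.typeOf

val renamer = DataRenamer.groupByMetaValue("type", defaultTagValue = "@default")

val meta = Meta { "type" put "calibration" }
// The tag value is read from meta and prepended as a name token,
// so "run1" becomes "type[calibration].run1".
val renamed = renamer.rename("run1".parseAsName(), meta, typeOf<Int>())
```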

@ -1,49 +1,28 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public interface DataSink<in T> {
/**
* Put data without notification
*/
public fun put(name: Name, data: Data<T>?)
/**
* Put data and propagate changes downstream
*/
public suspend fun update(name: Name, data: Data<T>?)
public interface DataBuilderScope<in T>{
public companion object: DataBuilderScope<Nothing>
}
/**
* Launch continuous update using
*/
public fun <T> DataSink<T>.launchUpdate(
scope: CoroutineScope,
updater: suspend DataSink<T>.() -> Unit,
): Job = scope.launch {
object : DataSink<T> {
override fun put(name: Name, data: Data<T>?) {
launch {
this@launchUpdate.update(name, data)
}
}
@Suppress("UNCHECKED_CAST")
public fun <T> DataBuilderScope(): DataBuilderScope<T> = DataBuilderScope as DataBuilderScope<T>
override suspend fun update(name: Name, data: Data<T>?) {
this@launchUpdate.update(name, data)
}
}.updater()
public fun interface DataSink<in T>: DataBuilderScope<T> {
/**
* Put data and notify listeners if needed
*/
public suspend fun put(name: Name, data: Data<T>?)
}
/**
* A mutable version of [DataTree]
*/
@ -54,16 +33,14 @@ public interface MutableDataTree<T> : DataTree<T>, DataSink<T> {
public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
public operator fun set(token: NameToken, data: Data<T>?)
public suspend fun put(token: NameToken, data: Data<T>?)
override fun put(name: Name, data: Data<T>?): Unit = set(name, data)
}
public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> set(name.first(), data)
else -> getOrCreateItem(name.first())[name.cutFirst()] = data
override suspend fun put(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> put(name.first(), data)
else -> getOrCreateItem(name.first()).put(name.cutFirst(), data)
}
}
}
@ -81,8 +58,8 @@ private class MutableDataTreeRoot<T>(
override val dataType: KType,
) : MutableDataTree<T> {
override val updates = MutableSharedFlow<DataUpdate<T>>(100, onBufferOverflow = BufferOverflow.DROP_LATEST)
override val items = HashMap<NameToken, MutableDataTree<T>>()
override val updates = MutableSharedFlow<DataUpdate<T>>(extraBufferCapacity = 100)
inner class MutableDataTreeBranch(val branchName: Name) : MutableDataTree<T> {
@ -101,44 +78,21 @@ private class MutableDataTreeRoot<T>(
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> =
items.getOrPut(token) { MutableDataTreeBranch(branchName + token) }
override fun set(token: NameToken, data: Data<T>?) {
val subTree = getOrCreateItem(token)
subTree.data = data
override suspend fun put(token: NameToken, data: Data<T>?) {
this.data = data
this@MutableDataTreeRoot.updates.emit(DataUpdate(data?.type ?: dataType, branchName + token, data))
}
override suspend fun update(name: Name, data: Data<T>?) {
if (name.isEmpty()) {
this.data = data
this@MutableDataTreeRoot.updates.emit(DataUpdate(data?.type ?: dataType, branchName + name, data))
} else {
getOrCreateItem(name.first()).update(name.cutFirst(), data)
}
}
}
override var data: Data<T>? = null
override val items = HashMap<NameToken, MutableDataTree<T>>()
override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = items.getOrPut(token) {
MutableDataTreeBranch(token.asName())
}
override fun set(token: NameToken, data: Data<T>?) {
val subTree = getOrCreateItem(token)
subTree.data = data
}
override suspend fun update(name: Name, data: Data<T>?) {
if (name.isEmpty()) {
this.data = data
updates.emit(DataUpdate(data?.type ?: dataType, name, data))
} else {
getOrCreateItem(name.first()).update(name.cutFirst(), data)
}
override suspend fun put(token: NameToken, data: Data<T>?) {
this.data = data
updates.emit(DataUpdate(data?.type ?: dataType, token.asName(), data))
}
}
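`DataSink` is now a `fun interface` with a single suspending `put`, so ad-hoc sinks can be built from a lambda; the action and builder code in this commit relies on exactly that:

```kotlin
// A minimal illustrative sink that logs every put it receives.
val loggingSink = DataSink<Int> { name, data ->
    println("put $name -> $data")
}
```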

@ -2,6 +2,7 @@ package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.*
import kotlin.contracts.contract
@ -30,11 +31,18 @@ public interface DataSource<out T> {
public interface ObservableDataSource<out T> : DataSource<T> {
/**
* Flow updates made to the data
* A flow of updates made to the data. Updates are considered critical. The producer will suspend until all updates are consumed.
*/
public val updates: Flow<DataUpdate<T>>
}
public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> {
return read(name) ?: updates.first { it.name == name && it.data != null }.data!!
}
public suspend fun <T> ObservableDataSource<T>.awaitData(name: String): Data<T> =
awaitData(name.parseAsName())
/**
* A tree like structure for data holding
*/
@ -54,8 +62,7 @@ public interface DataTree<out T> : ObservableDataSource<T> {
override val updates: Flow<DataUpdate<T>>
public companion object {
private object EmptyDataTree :
DataTree<Nothing> {
private object EmptyDataTree : DataTree<Nothing> {
override val data: Data<Nothing>? = null
override val items: Map<NameToken, EmptyDataTree> = emptyMap()
override val dataType: KType = typeOf<Unit>()
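The new `awaitData` helper suspends until a non-null value appears under the given name, replacing the old pattern of collecting a fixed number of updates (compare the reworked tests below). A usage sketch, assuming `tree` is any `ObservableDataSource<Int>` in a suspending context:

```kotlin
// Suspends until "sub.value" is published, then awaits its computation.
val value: Int = tree.awaitData("sub.value").await()
```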

@ -8,7 +8,7 @@ import space.kscience.dataforge.meta.copy
private class MetaMaskData<T>(val origin: Data<T>, override val meta: Meta) : Data<T> by origin
/**
* A data with overriden meta. It reflects original data computed state.
* A data with overridden meta. It reflects original data computed state.
*/
public fun <T> Data<T>.withMeta(newMeta: Meta): Data<T> = if (this is MetaMaskData) {
MetaMaskData(origin, newMeta)

@ -20,6 +20,11 @@ public fun <T> DataUpdate(type: KType, name: Name, data: Data<T>?): DataUpdate<T
override val type: KType = type
override val name: Name = name
override val data: Data<T>? = data
override fun toString(): String {
return "DataUpdate(type=$type, name=$name, data=$data)"
}
}
/**

@ -1,15 +1,14 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.names.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus
public fun <T> DataSink<T>.put(value: NamedData<T>) {
public suspend fun <T> DataSink<T>.put(value: NamedData<T>) {
put(value.name, value.data)
}
@ -20,16 +19,7 @@ public inline fun <T> DataSink<T>.putAll(
if (prefix.isEmpty()) {
apply(block)
} else {
val proxyDataSink = object :DataSink<T>{
override fun put(name: Name, data: Data<T>?) {
this@putAll.put(prefix + name, data)
}
override suspend fun update(name: Name, data: Data<T>?) {
this@putAll.update(prefix + name, data)
}
}
val proxyDataSink = DataSink<T> { name, data -> this@putAll.put(prefix + name, data) }
proxyDataSink.apply(block)
}
@ -42,23 +32,23 @@ public inline fun <T> DataSink<T>.putAll(
): Unit = putAll(prefix.asName(), block)
public fun <T> DataSink<T>.put(name: String, value: Data<T>) {
public suspend fun <T> DataSink<T>.put(name: String, value: Data<T>) {
put(Name.parse(name), value)
}
public fun <T> DataSink<T>.putAll(name: Name, tree: DataTree<T>) {
public suspend fun <T> DataSink<T>.putAll(name: Name, tree: DataTree<T>) {
putAll(name) { putAll(tree.asSequence()) }
}
public fun <T> DataSink<T>.putAll(name: String, tree: DataTree<T>) {
public suspend fun <T> DataSink<T>.putAll(name: String, tree: DataTree<T>) {
putAll(Name.parse(name)) { putAll(tree.asSequence()) }
}
/**
* Produce lazy [Data] and emit it into the [MutableDataTree]
*/
public inline fun <reified T> DataSink<T>.putValue(
public suspend inline fun <reified T> DataSink<T>.putValue(
name: String,
meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T,
@ -67,7 +57,7 @@ public inline fun <reified T> DataSink<T>.putValue(
put(name, data)
}
public inline fun <reified T> DataSink<T>.putValue(
public suspend inline fun <reified T> DataSink<T>.putValue(
name: Name,
meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T,
@ -79,56 +69,49 @@ public inline fun <reified T> DataSink<T>.putValue(
/**
* Emit static data with the fixed value
*/
public inline fun <reified T> DataSink<T>.putValue(
public suspend inline fun <reified T> DataSink<T>.putValue(
name: Name,
value: T,
meta: Meta = Meta.EMPTY,
): Unit = put(name, Data.wrapValue(value, meta))
public inline fun <reified T> DataSink<T>.putValue(
public suspend inline fun <reified T> DataSink<T>.putValue(
name: String,
value: T,
meta: Meta = Meta.EMPTY,
): Unit = put(name, Data.wrapValue(value, meta))
public inline fun <reified T> DataSink<T>.putValue(
public suspend inline fun <reified T> DataSink<T>.putValue(
name: String,
value: T,
metaBuilder: MutableMeta.() -> Unit,
): Unit = put(Name.parse(name), Data.wrapValue(value, Meta(metaBuilder)))
public suspend inline fun <reified T> DataSink<T>.updateValue(
name: Name,
value: T,
meta: Meta = Meta.EMPTY,
): Unit = update(name, Data.wrapValue(value, meta))
public suspend inline fun <reified T> DataSink<T>.updateValue(
name: String,
value: T,
meta: Meta = Meta.EMPTY,
): Unit = update(name.parseAsName(), Data.wrapValue(value, meta))
public fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
public suspend fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
sequence.forEach {
put(it.name, it.data)
}
}
public fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
public suspend fun <T> DataSink<T>.putAll(map: Map<Name, Data<T>?>) {
map.forEach { (name, data) ->
put(name, data)
}
}
public suspend fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
putAll(tree.asSequence())
}
/**
* Copy given data set and mirror its changes to this [DataSink] in [this@setAndObserve]. Returns an update [Job]
* Copy the given data set and mirror its changes to this [DataSink]. Suspends indefinitely.
*/
public fun <T : Any> DataSink<T>.putAllAndWatch(
scope: CoroutineScope,
branchName: Name = Name.EMPTY,
public suspend fun <T : Any> DataSink<T>.putAllAndWatch(
source: DataTree<T>,
): Job {
branchName: Name = Name.EMPTY,
) {
putAll(branchName, source)
return source.updates.onEach {
update(branchName + it.name, it.data)
}.launchIn(scope)
source.updates.collect {
put(branchName + it.name, it.data)
}
}
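Since `putAllAndWatch` now suspends indefinitely instead of returning a `Job`, a caller that needs a cancellation handle wraps it in `launch`, exactly as the updated `DataTreeBuilderTest` below does (`scope`, `rootNode`, and `subNode` are assumed context):

```kotlin
// Mirror subNode into rootNode under the "sub" branch until cancelled.
val watchJob = scope.launch {
    rootNode.putAllAndWatch(subNode, "sub".asName())
}
// ...
watchJob.cancel()
```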

@ -1,5 +1,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
@ -36,7 +37,6 @@ public fun <T, R> Data<T>.transform(
}
/**
* Lazily transform this data to another data. By convention [block] should not use external data (be pure).
* @param coroutineContext additional [CoroutineContext] elements used for data computation.
@ -77,7 +77,6 @@ internal fun Iterable<Data<*>>.joinMeta(): Meta = Meta {
}
@PublishedApi
internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
forEach { (key, data) ->
@ -201,34 +200,46 @@ public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
@UnsafeKType
public suspend fun <T, R> DataTree<T>.transform(
public fun <T, R> DataTree<T>.map(
outputType: KType,
scope: CoroutineScope,
metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(outputType){
//quasi-synchronous processing of elements in the tree
asSequence().forEach { namedData: NamedData<T> ->
compute: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(
outputType,
scope,
initialData = asSequence().associate { namedData: NamedData<T> ->
val newMeta = namedData.meta.toMutableMeta().apply(metaTransform).seal()
val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) {
block(namedData.awaitWithMeta())
val newData = Data(outputType, newMeta, scope.coroutineContext, listOf(namedData)) {
compute(namedData.awaitWithMeta())
}
namedData.name to newData
}
) {
updates.collect { update ->
val data: Data<T>? = update.data
if (data == null) put(update.name, null) else {
val newMeta = data.meta.toMutableMeta().apply(metaTransform).seal()
val d = Data(outputType, newMeta, scope.coroutineContext, listOf(data)) {
compute(NamedValueWithMeta(update.name, data.await(), data.meta))
}
put(update.name, d)
}
put(namedData.name, d)
}
}
@OptIn(UnsafeKType::class)
public suspend inline fun <T, reified R> DataTree<T>.transform(
public inline fun <T, reified R> DataTree<T>.map(
scope: CoroutineScope,
noinline metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = this@transform.transform(typeOf<R>(), metaTransform, coroutineContext, block)
): DataTree<R> = map(typeOf<R>(), scope, metaTransform, block)
public inline fun <T> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) {
asSequence().forEach(block)
}
// DataSet reduction
// DataSet snapshot reduction
@PublishedApi
internal fun DataTree<*>.joinMeta(): Meta = Meta {
@ -238,6 +249,10 @@ internal fun DataTree<*>.joinMeta(): Meta = Meta {
}
}
/**
* Reduce the current snapshot of the [DataTree] to a single [Data].
* Even if the tree changes in the future, only the current data set is taken.
*/
public inline fun <T, reified R> DataTree<T>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
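The renamed `map` (formerly `transform`) is no longer suspending: the mapped snapshot is computed eagerly as `initialData`, and the derived tree is then kept in sync by collecting `updates` in the given scope. A usage sketch, where `scope` is whatever `CoroutineScope` should own the derived tree:

```kotlin
val doubled: DataTree<Int> = source.map(scope) { (_, value) ->
    value * 2
}
```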

@ -1,8 +1,11 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.misc.UnsafeKType
@ -14,7 +17,7 @@ import kotlin.reflect.typeOf
private class FlatDataTree<T>(
override val dataType: KType,
private val dataSet: Map<Name, Data<T>>,
private val sourceUpdates: Flow<DataUpdate<T>>,
private val sourceUpdates: SharedFlow<DataUpdate<T>>,
private val prefix: Name,
) : DataTree<T> {
override val data: Data<T>? get() = dataSet[prefix]
@ -33,7 +36,7 @@ private class FlatDataTree<T>(
}
/**
* A builder for static [DataTree].
* A builder for [DataTree].
*/
private class DataTreeBuilder<T>(
private val type: KType,
@ -46,20 +49,13 @@ private class DataTreeBuilder<T>(
private val updatesFlow = MutableSharedFlow<DataUpdate<T>>()
override fun put(name: Name, data: Data<T>?) {
if (data == null) {
map.remove(name)
} else {
map[name] = data
}
}
override suspend fun update(name: Name, data: Data<T>?) {
override suspend fun put(name: Name, data: Data<T>?) {
mutex.withLock {
if (data == null) {
map.remove(name)
} else {
map.put(name, data)
map[name] = data
}
}
updatesFlow.emit(DataUpdate(data?.type ?: type, name, data))
@ -74,16 +70,24 @@ private class DataTreeBuilder<T>(
@UnsafeKType
public fun <T> DataTree(
dataType: KType,
generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(dataType).apply(generator).build()
scope: CoroutineScope,
initialData: Map<Name, Data<T>> = emptyMap(),
updater: suspend DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(dataType, initialData).apply {
scope.launch{
updater()
}
}.build()
/**
* Create a data tree.
*/
@OptIn(UnsafeKType::class)
public inline fun <reified T> DataTree(
noinline generator: DataSink<T>.() -> Unit,
): DataTree<T> = DataTree(typeOf<T>(), generator)
scope: CoroutineScope,
initialData: Map<Name, Data<T>> = emptyMap(),
noinline updater: suspend DataSink<T>.() -> Unit,
): DataTree<T> = DataTree(typeOf<T>(), scope, initialData, updater)
/**
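A sketch of the reworked factory: static content goes into `initialData`, while the suspending `updater` block is launched in `scope` and feeds dynamic changes through its `DataSink` receiver (`myScope` and the names are illustrative):

```kotlin
val tree: DataTree<Int> = DataTree(
    scope = myScope,
    initialData = mapOf("static".parseAsName() to Data.wrapValue(1, Meta.EMPTY)),
) {
    // Runs asynchronously in myScope; each put is propagated as an update.
    putValue("dynamic", 2)
}
```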

@ -1,8 +1,7 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import space.kscience.dataforge.names.asName
import kotlin.test.Test
@ -57,17 +56,18 @@ internal class DataTreeBuilderTest {
val subNode = MutableDataTree<Int>()
val rootNode = MutableDataTree<Int>() {
job = putAllAndWatch(this@runTest, "sub".asName(), subNode)
job = launch { putAllAndWatch(subNode,"sub".asName())}
}
repeat(10) {
subNode.updateValue("value[$it]", it)
}
rootNode.updates.take(10).collect()
assertEquals(9, rootNode["sub.value[9]"]?.await())
assertEquals(8, rootNode["sub.value[8]"]?.await())
assertEquals(9, subNode.awaitData("value[9]").await())
assertEquals(8, subNode.awaitData("value[8]").await())
assertEquals(9, rootNode.awaitData("sub.value[9]").await())
assertEquals(8, rootNode.awaitData("sub.value[8]").await())
println("finished")
job?.cancel()
}
}

@ -1,3 +1,5 @@
@file:Suppress("CONTEXT_RECEIVERS_DEPRECATED")
package space.kscience.dataforge.data
import space.kscience.dataforge.names.Name
@ -7,14 +9,14 @@ import space.kscience.dataforge.names.Name
* Append data to node
*/
context(DataSink<T>)
public infix fun <T : Any> String.put(data: Data<T>): Unit =
public suspend infix fun <T : Any> String.put(data: Data<T>): Unit =
put(Name.parse(this), data)
/**
* Append node
*/
context(DataSink<T>)
public infix fun <T : Any> String.putAll(dataSet: DataTree<T>): Unit =
public suspend infix fun <T : Any> String.putAll(dataSet: DataTree<T>): Unit =
putAll(this, dataSet)
/**

@ -1,11 +1,8 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.invoke
import space.kscience.dataforge.actions.mapping
@ -18,22 +15,20 @@ import kotlin.time.Duration.Companion.milliseconds
internal class ActionsTest {
@Test
fun testStaticMapAction() = runTest(timeout = 500.milliseconds) {
withContext(Dispatchers.Default) {
val plusOne = Action.mapping<Int, Int> {
result { it + 1 }
}
val data: DataTree<Int> = DataTree {
repeat(10) {
putValue(it.toString(), it)
}
}
val result = plusOne(data)
advanceUntilIdle()
assertEquals(2, result["1"]?.await())
val plusOne = Action.mapping<Int, Int> {
result { it + 1 }
}
val data: DataTree<Int> = DataTree {
repeat(10) {
putValue(it.toString(), it)
}
}
val result = plusOne(data)
advanceUntilIdle()
assertEquals(2, result["1"]?.await())
}
@Test
@ -44,19 +39,13 @@ internal class ActionsTest {
val source: MutableDataTree<Int> = MutableDataTree()
val result = plusOne(source)
val result: DataTree<Int> = plusOne(source)
repeat(10) {
source.updateValue(it.toString(), it)
}
delay(10)
// result.updates.take(10).onEach { println(it.name) }.collect()
assertEquals(2, result["1"]?.await())
assertEquals(2, result.awaitData("1").await())
}
}

@ -16,4 +16,10 @@ description = "Meta definition and basic operations on meta"
readme{
maturity = space.kscience.gradle.Maturity.DEVELOPMENT
feature("metadata"){
"""
""".trimIndent()
}
}

@ -11,9 +11,18 @@ public fun Value.isNull(): Boolean = this == Null
public fun Value.isList(): Boolean = this.type == ValueType.LIST
public val Value.boolean: Boolean
get() = this == True
|| this.list.firstOrNull() == True
|| (type == ValueType.STRING && string.toBoolean())
get() = when (type) {
ValueType.NUMBER -> int > 0
ValueType.STRING -> string.toBoolean()
ValueType.BOOLEAN -> this === True
ValueType.LIST -> list.singleOrNull()?.boolean == true
ValueType.NULL -> false
}
// this == True
// || this.list.firstOrNull() == True
// || (type == ValueType.STRING && string.toBoolean())
// || (type == ValueType.)
public val Value.int: Int get() = number.toInt()
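The conversion is now total over `ValueType` rather than a chain of special cases. Illustrative expectations under the new rules (a sketch, assuming the standard `asValue` extensions):

```kotlin
check(1.asValue().boolean)       // NUMBER: positive numbers read as true
check(!0.asValue().boolean)      // NUMBER: zero and negatives read as false
check("true".asValue().boolean)  // STRING: delegated to String.toBoolean()
check(!Null.boolean)             // NULL is always false
```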

@ -54,7 +54,7 @@ class FileDataTest {
val data = DataTree {
files(io, Name.EMPTY, dir)
}
val reconstructed = data.transform { (_, value) -> value.toByteArray().decodeToString() }
val reconstructed = data.map { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
@ -69,7 +69,7 @@ class FileDataTest {
io.writeZip(zip, dataNode, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = DataTree { files(io, Name.EMPTY, zip) }
.transform { (_, value) -> value.toByteArray().decodeToString() }
.map { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())

@ -3,4 +3,65 @@
![Gradle build](https://github.com/mipt-npm/dataforge-core/workflows/Gradle%20build/badge.svg)
## Publications
* [A general overview](https://doi.org/10.1051/epjconf/201817705003)
* [An application in "Troitsk nu-mass" experiment](https://doi.org/10.1088/1742-6596/1525/1/012024)
## Video
* [A presentation on the application of (an old version of) DataForge to the Troitsk nu-mass analysis](https://youtu.be/OpWzLXUZnLI?si=3qn7EMruOHMJX3Bc)
## Questions and Answers
In this section, we try to cover DataForge's main ideas in the form of questions and answers.
### General
**Q**: I have a lot of data to analyze. The analysis process is complicated, requires many stages, and the data flow is not always obvious. To top it off, the data size is huge, so I don't want to perform operations I don't need (calculate something I won't use, or calculate something twice). And yes, I need it to be performed in parallel, and probably on a remote computer. By the way, I am sick and tired of scripts that modify other scripts that control scripts. Could you help me?
**A**: Yes, that is precisely the problem DataForge was made to solve. It allows one to perform automated data manipulations with automatic optimization and parallelization. The important thing is that data processing recipes are written declaratively, so it is quite easy to perform computations on a remote station. DataForge also guarantees reproducibility of analysis results.
**Q**: How does it work?
**A**: At the core of DataForge lies the idea of a metadata processor. It relies on the fact that in order to analyze something you need the data itself plus some additional information about what that data represents and what the user wants as a result. This additional information is called metadata and can be organized in a regular structure (a tree of values, not unlike XML or JSON). The important thing is that this distinction leaves no place for user instructions (or scripts). Indeed, the idea behind DataForge logic is that one does not need imperative commands. The framework configures itself according to the input metadata and decides what operations should be performed in the most efficient way.
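For a taste of what such a metadata tree looks like in code, here is a minimal sketch using the `Meta` builder from `dataforge-meta` (keys and values are purely illustrative):

```kotlin
import space.kscience.dataforge.meta.Meta

val taskMeta = Meta {
    "data" put {
        "path" put "data/run-2024"
        "format" put "csv"
    }
    "analysis" put {
        "binning" put 64
    }
}
```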
**Q**: But where does it get the algorithms to use?
**A**: Of course, algorithms must be written somewhere. No magic here. The logic is written in specialized modules. Some modules are provided out of the box in the system core; some need to be developed for a specific problem.
**Q**: So I still need to write the code? What is the difference then?
**A**: Yes, someone still needs to write the code. But not necessarily you. Simple operations can be performed using the provided core logic. Also, your group can have one programmer writing the logic and everyone else using it without any real programming expertise. The framework is organized in such a way that when one writes additional logic, they do not need to think about complicated things like parallel computing, resource handling, logging, caching, etc. Most of that is done by DataForge.
### Platform
**Q**: Which platform does DataForge use? Which operating system is it working on?
**A**: DataForge is mostly written in Kotlin Multiplatform and can be used on JVM, JS, and native targets. Some modules and functions are supported only on the JVM.
**Q**: Can I use my C++/Fortran/Python code in DataForge?
**A**: Yes, as long as the code can be called from Java. Most common languages have a bridge for Java access. There are no problems at all with compiled C/Fortran libraries. Python code can be called via one of the existing Python-Java interfaces. It is also planned to implement remote method invocation for common languages, so your Python, or, say, Julia, code could run in its native environment. The metadata processor paradigm makes it much easier to do so.
### Features
**Q**: What other features does DataForge provide?
**A**: Alongside metadata processing (and a lot of tools for metadata manipulation and layering), DataForge has two additional important concepts:
* **Modularisation**. Contrary to many other frameworks, DataForge is intrinsically modular. The mandatory part is a rather tiny core module. Everything else can be customized.
* **Context encapsulation**. Every DataForge task is executed in some context. The context isolates the environment for the task, works as a dependency-injection base, and specifies the task's interaction with the external world.
### Misc
**Q**: So everything looks great, can I replace my ROOT / other data analysis framework with DataForge?
**A**: One must note that DataForge is made for analysis, not for visualisation. The visualisation and user-interaction capabilities of DataForge are rather limited compared to frameworks like ROOT, JAS3 or DataMelt. The idea is to provide a reliable API and core functionality. In fact, JAS3 and DataMelt could be used as frontends for DataForge mechanics.
**Q**: How does DataForge compare to cluster computation frameworks like Apache Spark?
**A**: Again, it is not the purpose of DataForge to replace cluster software. DataForge has some internal parallelism mechanics and implementations, but they are most certainly inferior to specially developed programs. Still, DataForge is not fixed on a single implementation: your favourite parallel processing tool can still be used as a back-end, with the full benefit of DataForge's configuration tools and integrations and no performance overhead.
**Q**: Is it possible to use DataForge in notebook mode?
**A**: [Kotlin jupyter](https://github.com/Kotlin/kotlin-jupyter) allows using any JVM program in notebook mode. A dedicated module for DataForge is a work in progress.
${modules}

@ -6,4 +6,4 @@ org.gradle.jvmargs=-Xmx4096m
kotlin.mpp.stability.nowarn=true
kotlin.native.ignoreDisabledTargets=true
toolsVersion=0.15.4-kotlin-2.0.0
toolsVersion=0.16.0-kotlin-2.1.0