move to Kotlin 1.6.20 and KTor 2.0

This commit is contained in:
Alexander Nozik 2022-04-17 21:59:09 +03:00
parent 3c6bc15716
commit e5000171f1
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
22 changed files with 234 additions and 173 deletions

View File

@ -4,11 +4,12 @@
### Added
- Add `specOrNull` delegate to meta and Scheme
- Suspended read methods to the `Binary`
- Static `Meta` to all `DataSet`s
- Synchronously accessed `meta` to all `DataSet`s
### Changed
- `Factory` is now `fun interface` and uses `build` instead of `invoke`. `invoke moved to an extension.
- KTor 2.0
- DataTree `items` call is blocking.
### Deprecated

View File

@ -4,7 +4,7 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.5.3-dev-4"
version = "0.6.0-dev-1"
repositories{
mavenCentral()
}

View File

@ -1,7 +1,6 @@
package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.*
@ -79,13 +78,13 @@ internal class MapAction<in T : Any, out R : Any>(
val flow = dataSet.flowData().map(::mapOne)
return ActiveDataTree(outputType) {
populate(flow)
populateWith(flow)
scope?.launch {
dataSet.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
populate(dataSet.flowChildren(name).map(::mapOne))
populateWith(dataSet.flowChildren(name).map(::mapOne))
}
}
}

View File

@ -2,7 +2,10 @@ package space.kscience.dataforge.actions
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Laminate
@ -72,13 +75,13 @@ internal class SplitAction<T : Any, R : Any>(
}
return ActiveDataTree<R>(outputType) {
populate(dataSet.flowData().flatMapConcat(transform = ::splitOne))
populateWith(dataSet.flowData().flatMapConcat(transform = ::splitOne))
scope?.launch {
dataSet.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
populate(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne))
populateWith(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne))
}
}
}

View File

@ -1,9 +1,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.meta.*
@ -12,7 +9,7 @@ import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A mutable [DataTree.Companion.active]. It
* A mutable [DataTree].
*/
public class ActiveDataTree<T : Any>(
override val dataType: KType,
@ -20,33 +17,28 @@ public class ActiveDataTree<T : Any>(
private val mutex = Mutex()
private val treeItems = HashMap<NameToken, DataTreeItem<T>>()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = mutex.withLock {
treeItems.filter { !it.key.body.startsWith("@") }
}
override val items: Map<NameToken, DataTreeItem<T>>
get() = treeItems.filter { !it.key.body.startsWith("@") }
private val _updates = MutableSharedFlow<Name>()
override val updates: Flow<Name>
get() = _updates
private suspend fun remove(token: NameToken) {
mutex.withLock {
private suspend fun remove(token: NameToken) = mutex.withLock {
if (treeItems.remove(token) != null) {
_updates.emit(token.asName())
}
}
}
override suspend fun remove(name: Name) {
if (name.isEmpty()) error("Can't remove the root node")
(getItem(name.cutLast()).tree as? ActiveDataTree)?.remove(name.lastOrNull()!!)
}
private suspend fun set(token: NameToken, data: Data<T>) {
mutex.withLock {
private suspend fun set(token: NameToken, data: Data<T>) = mutex.withLock {
treeItems[token] = DataTreeItem.Leaf(data)
}
}
private suspend fun getOrCreateNode(token: NameToken): ActiveDataTree<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? ActiveDataTree<T>
@ -56,15 +48,13 @@ public class ActiveDataTree<T : Any>(
}
}
private suspend fun getOrCreateNode(name: Name): ActiveDataTree<T> {
return when (name.length) {
private suspend fun getOrCreateNode(name: Name): ActiveDataTree<T> = when (name.length) {
0 -> this
1 -> getOrCreateNode(name.firstOrNull()!!)
else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
}
}
override suspend fun emit(name: Name, data: Data<T>?) {
override suspend fun data(name: Name, data: Data<T>?) {
if (data == null) {
remove(name)
} else {
@ -77,14 +67,10 @@ public class ActiveDataTree<T : Any>(
_updates.emit(name)
}
/**
* Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job]
*/
public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet<T>): Job = launch {
emit(name, dataSet)
dataSet.updates.collect { nameInBranch ->
emit(name + nameInBranch, dataSet.getData(nameInBranch))
}
override suspend fun meta(name: Name, meta: Meta) {
val item = getItem(name)
if(item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.")
data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
}
}
@ -106,13 +92,12 @@ public suspend inline fun <reified T : Any> ActiveDataTree(
crossinline block: suspend ActiveDataTree<T>.() -> Unit,
): ActiveDataTree<T> = ActiveDataTree<T>(typeOf<T>()).apply { block() }
public suspend inline fun <reified T : Any> ActiveDataTree<T>.emit(
name: Name,
noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = emit(name, ActiveDataTree(typeOf<T>(), block))
): Unit = node(name, ActiveDataTree(typeOf<T>(), block))
public suspend inline fun <reified T : Any> ActiveDataTree<T>.emit(
name: String,
noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = emit(Name.parse(name), ActiveDataTree(typeOf<T>(), block))
): Unit = node(Name.parse(name), ActiveDataTree(typeOf<T>(), block))

View File

@ -3,7 +3,6 @@ package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
@ -37,14 +36,14 @@ public abstract class CachingAction<in T : Any, out R : Any>(
scope: CoroutineScope?,
): DataSet<R> = ActiveDataTree<R>(outputType) {
coroutineScope {
populate(transform(dataSet, meta))
populateWith(transform(dataSet, meta))
}
scope?.let {
dataSet.updates.collect {
//clear old nodes
remove(it)
//collect new items
populate(scope.transform(dataSet, meta, it))
populateWith(scope.transform(dataSet, meta, it))
//FIXME if the target is data, updates are fired twice
}
}

View File

@ -15,6 +15,11 @@ public interface DataSet<out T : Any> {
*/
public val dataType: KType
/**
* Meta-data associated with this node. If no meta is provided, returns [Meta.EMPTY].
*/
public val meta: Meta
/**
* Traverse this provider or its child. The order is not guaranteed.
*/
@ -25,6 +30,7 @@ public interface DataSet<out T : Any> {
*/
public suspend fun getData(name: Name): Data<T>?
/**
* Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf.
*/
@ -40,6 +46,7 @@ public interface DataSet<out T : Any> {
*/
public val EMPTY: DataSet<Nothing> = object : DataSet<Nothing> {
override val dataType: KType = TYPE_OF_NOTHING
override val meta: Meta get() = Meta.EMPTY
//private val nothing: Nothing get() = error("this is nothing")
@ -65,9 +72,10 @@ public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is ActiveDa
/**
* Flow all data nodes with names starting with [branchName]
*/
public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T>> = this@flowChildren.flowData().filter {
public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T>> =
this@flowChildren.flowData().filter {
it.name.startsWith(branchName)
}
}
/**
* Start computation for all goals in data node and return a job for the whole node

View File

@ -2,11 +2,11 @@ package space.kscience.dataforge.data
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus
import kotlin.reflect.KType
@ -18,12 +18,12 @@ public interface DataSetBuilder<in T : Any> {
*/
public suspend fun remove(name: Name)
public suspend fun emit(name: Name, data: Data<T>?)
public suspend fun data(name: Name, data: Data<T>?)
/**
* Set a current state of given [dataSet] into a branch [name]. Does not propagate updates
*/
public suspend fun emit(name: Name, dataSet: DataSet<T>) {
public suspend fun node(name: Name, dataSet: DataSet<T>) {
//remove previous items
if (name != Name.EMPTY) {
remove(name)
@ -31,27 +31,29 @@ public interface DataSetBuilder<in T : Any> {
//Set new items
dataSet.flowData().collect {
emit(name + it.name, it.data)
data(name + it.name, it.data)
}
}
/**
* Append data to node
* Set meta for the given node
*/
public suspend infix fun String.put(data: Data<T>): Unit = emit(Name.parse(this), data)
public suspend fun meta(name: Name, meta: Meta)
/**
* Append node
*/
public suspend infix fun String.put(dataSet: DataSet<T>): Unit = emit(Name.parse(this), dataSet)
/**
* Build and append node
*/
public suspend infix fun String.put(block: suspend DataSetBuilder<T>.() -> Unit): Unit = emit(Name.parse(this), block)
}
private class SubSetBuilder<in T : Any>(
/**
* Define meta in this [DataSet]
*/
public suspend fun <T : Any> DataSetBuilder<T>.meta(value: Meta): Unit = meta(Name.EMPTY, value)
/**
* Define meta in this [DataSet]
*/
public suspend fun <T : Any> DataSetBuilder<T>.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta))
@PublishedApi
internal class SubSetBuilder<in T : Any>(
private val parent: DataSetBuilder<T>,
private val branch: Name,
) : DataSetBuilder<T> {
@ -61,33 +63,42 @@ private class SubSetBuilder<in T : Any>(
parent.remove(branch + name)
}
override suspend fun emit(name: Name, data: Data<T>?) {
parent.emit(branch + name, data)
override suspend fun data(name: Name, data: Data<T>?) {
parent.data(branch + name, data)
}
override suspend fun emit(name: Name, dataSet: DataSet<T>) {
parent.emit(branch + name, dataSet)
override suspend fun node(name: Name, dataSet: DataSet<T>) {
parent.node(branch + name, dataSet)
}
override suspend fun meta(name: Name, meta: Meta) {
parent.meta(branch + name, meta)
}
}
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: Name, block: suspend DataSetBuilder<T>.() -> Unit) {
SubSetBuilder(this, name).apply { block() }
public suspend inline fun <T : Any> DataSetBuilder<T>.node(
name: Name,
crossinline block: suspend DataSetBuilder<T>.() -> Unit,
) {
if (name.isEmpty()) block() else SubSetBuilder(this, name).block()
}
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: String, data: Data<T>) {
emit(Name.parse(name), data)
public suspend fun <T : Any> DataSetBuilder<T>.data(name: String, value: Data<T>) {
data(Name.parse(name), value)
}
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: String, set: DataSet<T>) {
this.emit(Name.parse(name), set)
public suspend fun <T : Any> DataSetBuilder<T>.node(name: String, set: DataSet<T>) {
node(Name.parse(name), set)
}
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: String, block: suspend DataSetBuilder<T>.() -> Unit): Unit =
this@emit.emit(Name.parse(name), block)
public suspend inline fun <T : Any> DataSetBuilder<T>.node(
name: String,
crossinline block: suspend DataSetBuilder<T>.() -> Unit,
): Unit = node(Name.parse(name), block)
public suspend fun <T : Any> DataSetBuilder<T>.emit(data: NamedData<T>) {
emit(data.name, data.data)
public suspend fun <T : Any> DataSetBuilder<T>.set(value: NamedData<T>) {
data(value.name, value.data)
}
/**
@ -99,7 +110,7 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce(
noinline producer: suspend () -> T,
) {
val data = Data(meta, block = producer)
emit(name, data)
data(name, data)
}
public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce(
@ -108,7 +119,7 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce(
noinline producer: suspend () -> T,
) {
val data = Data(meta, block = producer)
emit(name, data)
data(name, data)
}
/**
@ -117,36 +128,34 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.produce(
public suspend inline fun <reified T : Any> DataSetBuilder<T>.static(
name: String,
data: T,
meta: Meta = Meta.EMPTY
): Unit =
emit(name, Data.static(data, meta))
meta: Meta = Meta.EMPTY,
): Unit = data(name, Data.static(data, meta))
public suspend inline fun <reified T : Any> DataSetBuilder<T>.static(
name: Name,
data: T,
meta: Meta = Meta.EMPTY
): Unit =
emit(name, Data.static(data, meta))
meta: Meta = Meta.EMPTY,
): Unit = data(name, Data.static(data, meta))
public suspend inline fun <reified T : Any> DataSetBuilder<T>.static(
name: String,
data: T,
mutableMeta: MutableMeta.() -> Unit,
): Unit = emit(Name.parse(name), Data.static(data, Meta(mutableMeta)))
): Unit = data(Name.parse(name), Data.static(data, Meta(mutableMeta)))
/**
* Update data with given node data and meta with node meta.
*/
@DFExperimental
public suspend fun <T : Any> DataSetBuilder<T>.populate(tree: DataSet<T>): Unit = coroutineScope {
public suspend fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit = coroutineScope {
tree.flowData().collect {
//TODO check if the place is occupied
emit(it.name, it.data)
data(it.name, it.data)
}
}
public suspend fun <T : Any> DataSetBuilder<T>.populate(flow: Flow<NamedData<T>>) {
public suspend fun <T : Any> DataSetBuilder<T>.populateWith(flow: Flow<NamedData<T>>) {
flow.collect {
emit(it.name, it.data)
data(it.name, it.data)
}
}

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.names.*
import kotlin.collections.component1
@ -11,8 +12,16 @@ import kotlin.collections.component2
import kotlin.reflect.KType
public sealed class DataTreeItem<out T : Any> {
public class Node<out T : Any>(public val tree: DataTree<T>) : DataTreeItem<T>()
public class Leaf<out T : Any>(public val data: Data<T>) : DataTreeItem<T>()
public abstract val meta: Meta
public class Node<out T : Any>(public val tree: DataTree<T>) : DataTreeItem<T>() {
override val meta: Meta get() = tree.meta
}
public class Leaf<out T : Any>(public val data: Data<T>) : DataTreeItem<T>() {
override val meta: Meta get() = data.meta
}
}
public val <T : Any> DataTreeItem<T>.type: KType
@ -28,13 +37,15 @@ public val <T : Any> DataTreeItem<T>.type: KType
public interface DataTree<out T : Any> : DataSet<T> {
/**
* Children items of this [DataTree] provided asynchronously
* Top-level children items of this [DataTree]
*/
public suspend fun items(): Map<NameToken, DataTreeItem<T>>
public val items: Map<NameToken, DataTreeItem<T>>
override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY
override fun flowData(): Flow<NamedData<T>> = flow {
items().forEach { (token, childItem: DataTreeItem<T>) ->
if(!token.body.startsWith("@")) {
items.forEach { (token, childItem: DataTreeItem<T>) ->
if (!token.body.startsWith("@")) {
when (childItem) {
is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName()))
is DataTreeItem.Node -> emitAll(childItem.tree.flowData().map { it.named(token + it.name) })
@ -44,28 +55,33 @@ public interface DataTree<out T : Any> : DataSet<T> {
}
override suspend fun listTop(prefix: Name): List<Name> =
getItem(prefix).tree?.items()?.keys?.map { prefix + it } ?: emptyList()
getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList()
override suspend fun getData(name: Name): Data<T>? = when (name.length) {
0 -> null
1 -> items()[name.firstOrNull()!!].data
else -> items()[name.firstOrNull()!!].tree?.getData(name.cutFirst())
1 -> items[name.firstOrNull()!!].data
else -> items[name.firstOrNull()!!].tree?.getData(name.cutFirst())
}
public companion object {
public const val TYPE: String = "dataTree"
/**
* A name token used to designate tree node meta
*/
public val META_ITEM_NAME_TOKEN: NameToken = NameToken("@meta")
}
}
public suspend fun <T: Any> DataSet<T>.getData(name: String): Data<T>? = getData(Name.parse(name))
public suspend fun <T : Any> DataSet<T>.getData(name: String): Data<T>? = getData(Name.parse(name))
/**
* Get a [DataTreeItem] with given [name] or null if the item does not exist
*/
public tailrec suspend fun <T : Any> DataTree<T>.getItem(name: Name): DataTreeItem<T>? = when (name.length) {
public tailrec fun <T : Any> DataTree<T>.getItem(name: Name): DataTreeItem<T>? = when (name.length) {
0 -> DataTreeItem.Node(this)
1 -> items()[name.firstOrNull()]
else -> items()[name.firstOrNull()!!].tree?.getItem(name.cutFirst())
1 -> items[name.firstOrNull()]
else -> items[name.firstOrNull()!!].tree?.getItem(name.cutFirst())
}
public val <T : Any> DataTreeItem<T>?.tree: DataTree<T>? get() = (this as? DataTreeItem.Node<T>)?.tree
@ -75,7 +91,7 @@ public val <T : Any> DataTreeItem<T>?.data: Data<T>? get() = (this as? DataTreeI
* Flow of all children including nodes
*/
public fun <T : Any> DataTree<T>.itemFlow(): Flow<Pair<Name, DataTreeItem<T>>> = flow {
items().forEach { (head, item) ->
items.forEach { (head, item) ->
emit(head.asName() to item)
if (item is DataTreeItem.Node) {
val subSequence = item.tree.itemFlow()
@ -92,5 +108,9 @@ public fun <T : Any> DataTree<T>.itemFlow(): Flow<Pair<Name, DataTreeItem<T>>> =
public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> = object : DataTree<T> {
override val dataType: KType get() = this@branch.dataType
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = getItem(branchName).tree?.items() ?: emptyMap()
override val meta: Meta
get() = getItem(branchName)?.meta ?: Meta.EMPTY
override val items: Map<NameToken, DataTreeItem<T>>
get() = getItem(branchName).tree?.items ?: emptyMap()
}

View File

@ -16,7 +16,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
@ -46,7 +45,7 @@ public interface GroupRule {
set.flowData().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.emit(data.name, data.data)
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(data.name, data.data)
}
scope.launch {
@ -55,7 +54,7 @@ public interface GroupRule {
@Suppress("NULLABLE_EXTENSION_OPERATOR_WITH_SAFE_CALL_RECEIVER")
val tagValue = data?.meta?.get(key)?.string ?: defaultTagValue
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.emit(name, data)
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(name, data)
}
}

View File

@ -1,7 +1,7 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.collect
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
@ -12,15 +12,16 @@ internal class StaticDataTree<T : Any>(
override val dataType: KType,
) : DataSetBuilder<T>, DataTree<T> {
private val items: MutableMap<NameToken, DataTreeItem<T>> = HashMap()
private val _items: MutableMap<NameToken, DataTreeItem<T>> = HashMap()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = items.filter { !it.key.body.startsWith("@") }
override val items: Map<NameToken, DataTreeItem<T>>
get() = _items.filter { !it.key.body.startsWith("@") }
override suspend fun remove(name: Name) {
when (name.length) {
0 -> error("Can't remove root tree node")
1 -> items.remove(name.firstOrNull()!!)
else -> (items[name.firstOrNull()!!].tree as? StaticDataTree<T>)?.remove(name.cutFirst())
1 -> _items.remove(name.firstOrNull()!!)
else -> (_items[name.firstOrNull()!!].tree as? StaticDataTree<T>)?.remove(name.cutFirst())
}
}
@ -28,8 +29,8 @@ internal class StaticDataTree<T : Any>(
0 -> this
1 -> {
val itemName = name.firstOrNull()!!
(items[itemName].tree as? StaticDataTree<T>) ?: StaticDataTree<T>(dataType).also {
items[itemName] = DataTreeItem.Node(it)
(_items[itemName].tree as? StaticDataTree<T>) ?: StaticDataTree<T>(dataType).also {
_items[itemName] = DataTreeItem.Node(it)
}
}
else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName())
@ -40,25 +41,31 @@ internal class StaticDataTree<T : Any>(
if (item == null) {
remove(name)
} else {
getOrCreateNode(name.cutLast()).items[name.lastOrNull()!!] = item
getOrCreateNode(name.cutLast())._items[name.lastOrNull()!!] = item
}
}
override suspend fun emit(name: Name, data: Data<T>?) {
override suspend fun data(name: Name, data: Data<T>?) {
set(name, data?.let { DataTreeItem.Leaf(it) })
}
override suspend fun emit(name: Name, dataSet: DataSet<T>) {
override suspend fun node(name: Name, dataSet: DataSet<T>) {
if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet))
} else {
coroutineScope {
dataSet.flowData().collect {
emit(name + it.name, it.data)
data(name + it.name, it.data)
}
}
}
}
override suspend fun meta(name: Name, meta: Meta) {
val item = getItem(name)
if(item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.")
data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
}
}
@Suppress("FunctionName")
@ -73,6 +80,6 @@ public suspend inline fun <reified T : Any> DataTree(
): DataTree<T> = DataTree(typeOf<T>(), block)
@OptIn(DFExperimental::class)
public suspend fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType){
populate(this@seal)
public suspend fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType) {
populateFrom(this@seal)
}

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.isEmpty
@ -18,8 +19,11 @@ import kotlin.reflect.KType
public fun <T : Any> DataSet<T>.filter(
predicate: suspend (Name, Data<T>) -> Boolean,
): ActiveDataSet<T> = object : ActiveDataSet<T> {
override val dataType: KType get() = this@filter.dataType
override val meta: Meta get() = this@filter.meta
override fun flowData(): Flow<NamedData<T>> =
this@filter.flowData().filter { predicate(it.name, it.data) }
@ -38,8 +42,12 @@ public fun <T : Any> DataSet<T>.filter(
*/
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) this
else object : ActiveDataSet<T> {
override val dataType: KType get() = this@withNamePrefix.dataType
override val meta: Meta get() = this@withNamePrefix.meta
override fun flowData(): Flow<NamedData<T>> = this@withNamePrefix.flowData().map { it.data.named(prefix + it.name) }
override suspend fun getData(name: Name): Data<T>? =
@ -56,6 +64,8 @@ public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branc
} else object : ActiveDataSet<T> {
override val dataType: KType get() = this@branch.dataType
override val meta: Meta get() = this@branch.meta
override fun flowData(): Flow<NamedData<T>> = this@branch.flowData().mapNotNull {
it.name.removeHeadOrNull(branchName)?.let { name ->
it.data.named(name)

View File

@ -1,20 +0,0 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
/**
* Get a metadata node for this set if it is present
*/
public suspend fun DataSet<*>.getMeta(): Meta? = getData(DataSet.META_KEY)?.meta
/**
* Add meta-data node to a [DataSet]
*/
public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = emit(DataSet.META_KEY, Data.empty(meta))
/**
* Add meta-data node to a [DataSet]
*/
public suspend fun DataSetBuilder<*>.meta(mutableMeta: MutableMeta.() -> Unit): Unit = meta(Meta(mutableMeta))

View File

@ -1,6 +1,9 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.seal
@ -143,7 +146,7 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
metaTransform: MutableMeta.() -> Unit = {},
block: suspend (T) -> R,
): DataTree<R> = DataTree<R>(outputType) {
populate(
populateWith(
flowData().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
Data(outputType, newMeta, coroutineContext, listOf(it)) {

View File

@ -0,0 +1,40 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
/**
* Append data to node
*/
context(DataSetBuilder<T>) public suspend infix fun <T : Any> String.put(data: Data<T>): Unit =
data(Name.parse(this), data)
/**
* Append node
*/
context(DataSetBuilder<T>) public suspend infix fun <T : Any> String.put(dataSet: DataSet<T>): Unit =
node(Name.parse(this), dataSet)
/**
* Build and append node
*/
context(DataSetBuilder<T>) public suspend infix fun <T : Any> String.put(
block: suspend DataSetBuilder<T>.() -> Unit,
): Unit = node(Name.parse(this), block)
/**
* Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job]
*/
context(DataSetBuilder<T>) public fun <T : Any> CoroutineScope.setAndWatch(
name: Name,
dataSet: DataSet<T>,
): Job = launch {
node(name, dataSet)
dataSet.updates.collect { nameInBranch ->
data(name + nameInBranch, dataSet.getData(nameInBranch))
}
}

View File

@ -4,11 +4,12 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.map
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals
@Suppress("EXPERIMENTAL_API_USAGE")
class ActionsTest {
val data: DataTree<Int> = runBlocking {
@OptIn(DFExperimental::class)
internal class ActionsTest {
private val data: DataTree<Int> = runBlocking {
DataTree {
repeat(10) {
static(it.toString(), it)
@ -32,6 +33,7 @@ class ActionsTest {
val plusOne = Action.map<Int, Int> {
result { it + 1 }
}
val datum = runBlocking {
val result = plusOne.execute(data, scope = this)
result.getData("1")?.await()

View File

@ -1,7 +1,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.asName
import kotlin.test.Test
@ -43,7 +42,7 @@ internal class DataTreeBuilderTest {
static("b", "b")
}
static("root", "root")
populate(updateData)
populateFrom(updateData)
}
runBlocking {
@ -72,7 +71,7 @@ internal class DataTreeBuilderTest {
}
}
val rootNode = ActiveDataTree<Int> {
setAndObserve("sub".asName(), subNode)
setAndWatch("sub".asName(), subNode)
}
launch {

View File

@ -38,7 +38,7 @@ public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFr
action(it, data.name, meta)
}
emit(data.name, res)
data(data.name, res)
}
}

View File

@ -80,15 +80,15 @@ public suspend fun <T : Any> DataSetBuilder<T>.file(
val data = readDataFile(path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
emit(name, data)
data(name, data)
}
} else {
//otherwise, read as directory
plugin.run {
val data = readDataDirectory(path, formatResolver)
val name = data.getMeta()?.get(Envelope.ENVELOPE_NAME_KEY).string
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
emit(name, data)
node(name, data)
}
}
}
@ -143,7 +143,7 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
} else if (!Files.isDirectory(path)) {
error("Can't write a node into file")
}
tree.items().forEach { (token, item) ->
tree.items.forEach { (token, item) ->
val childPath = path.resolve(token.toString())
when (item) {
is DataTreeItem.Node -> {
@ -159,11 +159,9 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
}
}
}
val treeMeta = tree.getMeta()
if (treeMeta != null) {
val treeMeta = tree.meta
writeMetaFile(path, treeMeta, metaFormat ?: JsonMetaFormat)
}
}
}
@ -192,7 +190,7 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
val entry = ZipEntry("$name/")
putNextEntry(entry)
closeEntry()
treeItem.tree.items().forEach { (token, item) ->
treeItem.tree.items.forEach { (token, item) ->
val childName = "$name/$token"
writeNode(childName, item, dataFormat, envelopeFormat)
}

View File

@ -19,13 +19,13 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val result: Data<Int> = selectedData.flowData().foldToData(0) { result, data ->
result + data.await()
}
emit("result", result)
data("result", result)
}
val singleData by task<Int> {
workspace.data.select<Int>().getData("myData[12]")?.let {
emit("result", it)
data("result", it)
}
}

View File

@ -22,7 +22,7 @@ import kotlin.test.assertEquals
class FileDataTest {
val dataNode = runBlocking {
DataTree<String> {
emit("dir") {
node("dir") {
static("a", "Some string") {
"content" put "Some string"
}

View File

@ -39,7 +39,7 @@ class SimpleWorkspaceTest {
override val tag: PluginTag = PluginTag("test")
val test by task<Any> {
populate(
populateFrom(
workspace.data.map {
it.also {
logger.info { "Test: $it" }
@ -66,7 +66,7 @@ class SimpleWorkspaceTest {
val filterOne by task<Int> {
workspace.data.selectOne<Int>("myData[12]")?.let { source ->
emit(source.name, source.map { it })
data(source.name, source.map { it })
}
}
@ -106,7 +106,7 @@ class SimpleWorkspaceTest {
val newData: Data<Int> = data.combine(linearData.getData(data.name)!!) { l, r ->
l + r
}
emit(data.name, newData)
data(data.name, newData)
}
}
@ -115,7 +115,7 @@ class SimpleWorkspaceTest {
val res = from(square).foldToData(0) { l, r ->
l + r.await()
}
emit("sum", res)
data("sum", res)
}
val averageByGroup by task<Int> {
@ -125,13 +125,13 @@ class SimpleWorkspaceTest {
l + r.await()
}
emit("even", evenSum)
data("even", evenSum)
val oddSum = workspace.data.filter { name, _ ->
name.toString().toInt() % 2 == 1
}.select<Int>().foldToData(0) { l, r ->
l + r.await()
}
emit("odd", oddSum)
data("odd", oddSum)
}
val delta by task<Int> {
@ -141,7 +141,7 @@ class SimpleWorkspaceTest {
val res = even.combine(odd) { l, r ->
l - r
}
emit("res", res)
data("res", res)
}
val customPipe by task<Int> {
@ -149,8 +149,7 @@ class SimpleWorkspaceTest {
val meta = data.meta.toMutableMeta().apply {
"newValue" put 22
}
emit(data.name + "new", data.map { (data.meta["value"].int ?: 0) + it })
data(data.name + "new", data.map { (data.meta["value"].int ?: 0) + it })
}
}