DataSet flow to sequence
This commit is contained in:
parent
eaa9d40d60
commit
77857289f0
@ -11,6 +11,7 @@
|
|||||||
- KTor 2.0
|
- KTor 2.0
|
||||||
- DataTree `items` call is blocking.
|
- DataTree `items` call is blocking.
|
||||||
- DataSet `getData` is no longer suspended and renamed to `get`
|
- DataSet `getData` is no longer suspended and renamed to `get`
|
||||||
|
- DataSet operates with sequences of data instead of flows
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
|
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package space.kscience.dataforge.actions
|
package space.kscience.dataforge.actions
|
||||||
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import space.kscience.dataforge.data.*
|
import space.kscience.dataforge.data.*
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
@ -50,7 +49,7 @@ internal class MapAction<in T : Any, out R : Any>(
|
|||||||
meta: Meta,
|
meta: Meta,
|
||||||
scope: CoroutineScope?,
|
scope: CoroutineScope?,
|
||||||
): DataSet<R> {
|
): DataSet<R> {
|
||||||
suspend fun mapOne(data: NamedData<T>): NamedData<R> {
|
fun mapOne(data: NamedData<T>): NamedData<R> {
|
||||||
// Creating a new environment for action using **old** name, old meta and task meta
|
// Creating a new environment for action using **old** name, old meta and task meta
|
||||||
val env = ActionEnv(data.name, data.meta, meta)
|
val env = ActionEnv(data.name, data.meta, meta)
|
||||||
|
|
||||||
@ -75,16 +74,16 @@ internal class MapAction<in T : Any, out R : Any>(
|
|||||||
return newData.named(newName)
|
return newData.named(newName)
|
||||||
}
|
}
|
||||||
|
|
||||||
val flow = dataSet.flowData().map(::mapOne)
|
val sequence = dataSet.dataSequence().map(::mapOne)
|
||||||
|
|
||||||
return ActiveDataTree(outputType) {
|
return ActiveDataTree(outputType) {
|
||||||
populateWith(flow)
|
populateWith(sequence)
|
||||||
scope?.launch {
|
scope?.launch {
|
||||||
dataSet.updates.collect { name ->
|
dataSet.updates.collect { name ->
|
||||||
//clear old nodes
|
//clear old nodes
|
||||||
remove(name)
|
remove(name)
|
||||||
//collect new items
|
//collect new items
|
||||||
populateWith(dataSet.flowChildren(name).map(::mapOne))
|
populateWith(dataSet.children(name).map(::mapOne))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package space.kscience.dataforge.actions
|
|||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.flow
|
import kotlinx.coroutines.flow.flow
|
||||||
import kotlinx.coroutines.flow.fold
|
|
||||||
import space.kscience.dataforge.data.*
|
import space.kscience.dataforge.data.*
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.MutableMeta
|
import space.kscience.dataforge.meta.MutableMeta
|
||||||
@ -84,7 +83,7 @@ internal class ReduceAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
override fun CoroutineScope.transform(set: DataSet<T>, meta: Meta, key: Name): Flow<NamedData<R>> = flow {
|
override fun CoroutineScope.transform(set: DataSet<T>, meta: Meta, key: Name): Flow<NamedData<R>> = flow {
|
||||||
ReduceGroupBuilder<T, R>(inputType, this@transform, meta).apply(action).buildGroups(set).forEach { group ->
|
ReduceGroupBuilder<T, R>(inputType, this@transform, meta).apply(action).buildGroups(set).forEach { group ->
|
||||||
val dataFlow: Map<Name, Data<T>> = group.set.flowData().fold(HashMap()) { acc, value ->
|
val dataFlow: Map<Name, Data<T>> = group.set.dataSequence().fold(HashMap()) { acc, value ->
|
||||||
acc.apply {
|
acc.apply {
|
||||||
acc[value.name] = value.data
|
acc[value.name] = value.data
|
||||||
}
|
}
|
||||||
|
@ -2,10 +2,6 @@ package space.kscience.dataforge.actions
|
|||||||
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.FlowPreview
|
import kotlinx.coroutines.FlowPreview
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.asFlow
|
|
||||||
import kotlinx.coroutines.flow.flatMapConcat
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import space.kscience.dataforge.data.*
|
import space.kscience.dataforge.data.*
|
||||||
import space.kscience.dataforge.meta.Laminate
|
import space.kscience.dataforge.meta.Laminate
|
||||||
@ -58,14 +54,14 @@ internal class SplitAction<T : Any, R : Any>(
|
|||||||
scope: CoroutineScope?,
|
scope: CoroutineScope?,
|
||||||
): DataSet<R> {
|
): DataSet<R> {
|
||||||
|
|
||||||
suspend fun splitOne(data: NamedData<T>): Flow<NamedData<R>> {
|
fun splitOne(data: NamedData<T>): Sequence<NamedData<R>> {
|
||||||
val laminate = Laminate(data.meta, meta)
|
val laminate = Laminate(data.meta, meta)
|
||||||
|
|
||||||
val split = SplitBuilder<T, R>(data.name, data.meta).apply(action)
|
val split = SplitBuilder<T, R>(data.name, data.meta).apply(action)
|
||||||
|
|
||||||
|
|
||||||
// apply individual fragment rules to result
|
// apply individual fragment rules to result
|
||||||
return split.fragments.entries.asFlow().map { (fragmentName, rule) ->
|
return split.fragments.entries.asSequence().map { (fragmentName, rule) ->
|
||||||
val env = SplitBuilder.FragmentRule<T, R>(fragmentName, laminate.toMutableMeta()).apply(rule)
|
val env = SplitBuilder.FragmentRule<T, R>(fragmentName, laminate.toMutableMeta()).apply(rule)
|
||||||
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
|
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
|
||||||
@OptIn(DFInternal::class) Data(outputType, meta = env.meta, dependencies = listOf(data)) {
|
@OptIn(DFInternal::class) Data(outputType, meta = env.meta, dependencies = listOf(data)) {
|
||||||
@ -75,13 +71,13 @@ internal class SplitAction<T : Any, R : Any>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
return ActiveDataTree<R>(outputType) {
|
return ActiveDataTree<R>(outputType) {
|
||||||
populateWith(dataSet.flowData().flatMapConcat(transform = ::splitOne))
|
populateWith(dataSet.dataSequence().flatMap (transform = ::splitOne))
|
||||||
scope?.launch {
|
scope?.launch {
|
||||||
dataSet.updates.collect { name ->
|
dataSet.updates.collect { name ->
|
||||||
//clear old nodes
|
//clear old nodes
|
||||||
remove(name)
|
remove(name)
|
||||||
//collect new items
|
//collect new items
|
||||||
populateWith(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne))
|
populateWith(dataSet.children(name).flatMap(transform = ::splitOne))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
package space.kscience.dataforge.data
|
package space.kscience.dataforge.data
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.emptyFlow
|
||||||
|
import kotlinx.coroutines.flow.mapNotNull
|
||||||
import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING
|
import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.set
|
import space.kscience.dataforge.meta.set
|
||||||
@ -23,7 +25,7 @@ public interface DataSet<out T : Any> {
|
|||||||
/**
|
/**
|
||||||
* Traverse this provider or its child. The order is not guaranteed.
|
* Traverse this provider or its child. The order is not guaranteed.
|
||||||
*/
|
*/
|
||||||
public fun flowData(): Flow<NamedData<T>>
|
public fun dataSequence(): Sequence<NamedData<T>>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get data with given name.
|
* Get data with given name.
|
||||||
@ -34,8 +36,8 @@ public interface DataSet<out T : Any> {
|
|||||||
/**
|
/**
|
||||||
* Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf.
|
* Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf.
|
||||||
*/
|
*/
|
||||||
public suspend fun listTop(prefix: Name = Name.EMPTY): List<Name> =
|
public fun listTop(prefix: Name = Name.EMPTY): List<Name> =
|
||||||
flowData().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()
|
dataSequence().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()
|
||||||
// By default, traverses the whole tree. Could be optimized in descendants
|
// By default, traverses the whole tree. Could be optimized in descendants
|
||||||
|
|
||||||
public companion object {
|
public companion object {
|
||||||
@ -50,13 +52,15 @@ public interface DataSet<out T : Any> {
|
|||||||
|
|
||||||
//private val nothing: Nothing get() = error("this is nothing")
|
//private val nothing: Nothing get() = error("this is nothing")
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<Nothing>> = emptyFlow()
|
override fun dataSequence(): Sequence<NamedData<Nothing>> = emptySequence()
|
||||||
|
|
||||||
override fun get(name: Name): Data<Nothing>? = null
|
override fun get(name: Name): Data<Nothing>? = null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public operator fun <T: Any> DataSet<T>.get(name:String): Data<T>? = get(name.parseAsName())
|
||||||
|
|
||||||
public interface ActiveDataSet<T : Any> : DataSet<T> {
|
public interface ActiveDataSet<T : Any> : DataSet<T> {
|
||||||
/**
|
/**
|
||||||
* A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
|
* A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
|
||||||
@ -72,8 +76,8 @@ public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is ActiveDa
|
|||||||
/**
|
/**
|
||||||
* Flow all data nodes with names starting with [branchName]
|
* Flow all data nodes with names starting with [branchName]
|
||||||
*/
|
*/
|
||||||
public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T>> =
|
public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> =
|
||||||
this@flowChildren.flowData().filter {
|
this@children.dataSequence().filter {
|
||||||
it.name.startsWith(branchName)
|
it.name.startsWith(branchName)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +85,7 @@ public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T
|
|||||||
* Start computation for all goals in data node and return a job for the whole node
|
* Start computation for all goals in data node and return a job for the whole node
|
||||||
*/
|
*/
|
||||||
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
|
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
|
||||||
flowData().map {
|
dataSequence().map {
|
||||||
it.launch(this@launch)
|
it.launch(this@launch)
|
||||||
}.toList().joinAll()
|
}.toList().joinAll()
|
||||||
}
|
}
|
||||||
@ -89,7 +93,7 @@ public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job =
|
|||||||
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }
|
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }
|
||||||
|
|
||||||
public suspend fun DataSet<*>.toMeta(): Meta = Meta {
|
public suspend fun DataSet<*>.toMeta(): Meta = Meta {
|
||||||
flowData().collect {
|
dataSequence().forEach {
|
||||||
if (it.name.endsWith(DataSet.META_KEY)) {
|
if (it.name.endsWith(DataSet.META_KEY)) {
|
||||||
set(it.name, it.meta)
|
set(it.name, it.meta)
|
||||||
} else {
|
} else {
|
||||||
|
@ -30,7 +30,7 @@ public interface DataSetBuilder<in T : Any> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Set new items
|
//Set new items
|
||||||
dataSet.flowData().collect {
|
dataSet.dataSequence().forEach {
|
||||||
data(name + it.name, it.data)
|
data(name + it.name, it.data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -148,7 +148,7 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.static(
|
|||||||
*/
|
*/
|
||||||
@DFExperimental
|
@DFExperimental
|
||||||
public suspend fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit = coroutineScope {
|
public suspend fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit = coroutineScope {
|
||||||
tree.flowData().collect {
|
tree.dataSequence().forEach {
|
||||||
//TODO check if the place is occupied
|
//TODO check if the place is occupied
|
||||||
data(it.name, it.data)
|
data(it.name, it.data)
|
||||||
}
|
}
|
||||||
@ -159,3 +159,9 @@ public suspend fun <T : Any> DataSetBuilder<T>.populateWith(flow: Flow<NamedData
|
|||||||
data(it.name, it.data)
|
data(it.name, it.data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public suspend fun <T : Any> DataSetBuilder<T>.populateWith(sequence: Sequence<NamedData<T>>) {
|
||||||
|
sequence.forEach {
|
||||||
|
data(it.name, it.data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -43,18 +43,18 @@ public interface DataTree<out T : Any> : DataSet<T> {
|
|||||||
|
|
||||||
override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY
|
override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<T>> = flow {
|
override fun dataSequence(): Sequence<NamedData<T>> = sequence {
|
||||||
items.forEach { (token, childItem: DataTreeItem<T>) ->
|
items.forEach { (token, childItem: DataTreeItem<T>) ->
|
||||||
if (!token.body.startsWith("@")) {
|
if (!token.body.startsWith("@")) {
|
||||||
when (childItem) {
|
when (childItem) {
|
||||||
is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName()))
|
is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName()))
|
||||||
is DataTreeItem.Node -> emitAll(childItem.tree.flowData().map { it.named(token + it.name) })
|
is DataTreeItem.Node -> yieldAll(childItem.tree.dataSequence().map { it.named(token + it.name) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun listTop(prefix: Name): List<Name> =
|
override fun listTop(prefix: Name): List<Name> =
|
||||||
getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList()
|
getItem(prefix).tree?.items?.keys?.map { prefix + it } ?: emptyList()
|
||||||
|
|
||||||
override fun get(name: Name): Data<T>? = when (name.length) {
|
override fun get(name: Name): Data<T>? = when (name.length) {
|
||||||
|
@ -43,7 +43,7 @@ public interface GroupRule {
|
|||||||
): Map<String, DataSet<T>> {
|
): Map<String, DataSet<T>> {
|
||||||
val map = HashMap<String, ActiveDataTree<T>>()
|
val map = HashMap<String, ActiveDataTree<T>>()
|
||||||
|
|
||||||
set.flowData().collect { data ->
|
set.dataSequence().forEach { data ->
|
||||||
val tagValue = data.meta[key]?.string ?: defaultTagValue
|
val tagValue = data.meta[key]?.string ?: defaultTagValue
|
||||||
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(data.name, data.data)
|
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.data(data.name, data.data)
|
||||||
}
|
}
|
||||||
|
@ -54,7 +54,7 @@ internal class StaticDataTree<T : Any>(
|
|||||||
set(name, DataTreeItem.Node(dataSet))
|
set(name, DataTreeItem.Node(dataSet))
|
||||||
} else {
|
} else {
|
||||||
coroutineScope {
|
coroutineScope {
|
||||||
dataSet.flowData().collect {
|
dataSet.dataSequence().forEach {
|
||||||
data(name + it.name, it.data)
|
data(name + it.name, it.data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,8 +24,8 @@ public fun <T : Any> DataSet<T>.filter(
|
|||||||
|
|
||||||
override val meta: Meta get() = this@filter.meta
|
override val meta: Meta get() = this@filter.meta
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<T>> =
|
override fun dataSequence(): Sequence<NamedData<T>> =
|
||||||
this@filter.flowData().filter { predicate(it.name, it.data) }
|
this@filter.dataSequence().filter { predicate(it.name, it.data) }
|
||||||
|
|
||||||
override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf {
|
override fun get(name: Name): Data<T>? = this@filter.get(name)?.takeIf {
|
||||||
predicate(name, it)
|
predicate(name, it)
|
||||||
@ -48,7 +48,8 @@ else object : ActiveDataSet<T> {
|
|||||||
override val meta: Meta get() = this@withNamePrefix.meta
|
override val meta: Meta get() = this@withNamePrefix.meta
|
||||||
|
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<T>> = this@withNamePrefix.flowData().map { it.data.named(prefix + it.name) }
|
override fun dataSequence(): Sequence<NamedData<T>> =
|
||||||
|
this@withNamePrefix.dataSequence().map { it.data.named(prefix + it.name) }
|
||||||
|
|
||||||
override fun get(name: Name): Data<T>? =
|
override fun get(name: Name): Data<T>? =
|
||||||
name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) }
|
name.removeHeadOrNull(name)?.let { this@withNamePrefix.get(it) }
|
||||||
@ -66,7 +67,7 @@ public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branc
|
|||||||
|
|
||||||
override val meta: Meta get() = this@branch.meta
|
override val meta: Meta get() = this@branch.meta
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<T>> = this@branch.flowData().mapNotNull {
|
override fun dataSequence(): Sequence<NamedData<T>> = this@branch.dataSequence().mapNotNull {
|
||||||
it.name.removeHeadOrNull(branchName)?.let { name ->
|
it.name.removeHeadOrNull(branchName)?.let { name ->
|
||||||
it.data.named(name)
|
it.data.named(name)
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,7 @@
|
|||||||
package space.kscience.dataforge.data
|
package space.kscience.dataforge.data
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.fold
|
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.coroutines.flow.toList
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.MutableMeta
|
import space.kscience.dataforge.meta.MutableMeta
|
||||||
import space.kscience.dataforge.meta.seal
|
import space.kscience.dataforge.meta.seal
|
||||||
@ -100,11 +98,11 @@ public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
|
|||||||
* Transform a [Flow] of [NamedData] to a single [Data].
|
* Transform a [Flow] of [NamedData] to a single [Data].
|
||||||
*/
|
*/
|
||||||
@DFInternal
|
@DFInternal
|
||||||
public suspend fun <T : Any, R : Any> Flow<NamedData<T>>.reduceToData(
|
public inline fun <T : Any, R : Any> Sequence<NamedData<T>>.reduceToData(
|
||||||
outputType: KType,
|
outputType: KType,
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
meta: Meta = Meta.EMPTY,
|
meta: Meta = Meta.EMPTY,
|
||||||
transformation: suspend (Flow<NamedData<T>>) -> R,
|
crossinline transformation: suspend (Sequence<NamedData<T>>) -> R,
|
||||||
): Data<R> = Data(
|
): Data<R> = Data(
|
||||||
outputType,
|
outputType,
|
||||||
meta,
|
meta,
|
||||||
@ -115,10 +113,10 @@ public suspend fun <T : Any, R : Any> Flow<NamedData<T>>.reduceToData(
|
|||||||
}
|
}
|
||||||
|
|
||||||
@OptIn(DFInternal::class)
|
@OptIn(DFInternal::class)
|
||||||
public suspend inline fun <T : Any, reified R : Any> Flow<NamedData<T>>.reduceToData(
|
public inline fun <T : Any, reified R : Any> Sequence<NamedData<T>>.reduceToData(
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
meta: Meta = Meta.EMPTY,
|
meta: Meta = Meta.EMPTY,
|
||||||
noinline transformation: suspend (Flow<NamedData<T>>) -> R,
|
crossinline transformation: suspend (Sequence<NamedData<T>>) -> R,
|
||||||
): Data<R> = reduceToData(typeOf<R>(), coroutineContext, meta) {
|
): Data<R> = reduceToData(typeOf<R>(), coroutineContext, meta) {
|
||||||
transformation(it)
|
transformation(it)
|
||||||
}
|
}
|
||||||
@ -126,15 +124,15 @@ public suspend inline fun <T : Any, reified R : Any> Flow<NamedData<T>>.reduceTo
|
|||||||
/**
|
/**
|
||||||
* Fold a flow of named data into a single [Data]
|
* Fold a flow of named data into a single [Data]
|
||||||
*/
|
*/
|
||||||
public suspend inline fun <T : Any, reified R : Any> Flow<NamedData<T>>.foldToData(
|
public inline fun <T : Any, reified R : Any> Sequence<NamedData<T>>.foldToData(
|
||||||
initial: R,
|
initial: R,
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
meta: Meta = Meta.EMPTY,
|
meta: Meta = Meta.EMPTY,
|
||||||
noinline block: suspend (result: R, data: NamedData<T>) -> R,
|
crossinline block: suspend (result: R, data: NamedData<T>) -> R,
|
||||||
): Data<R> = reduceToData(
|
): Data<R> = reduceToData(
|
||||||
coroutineContext, meta
|
coroutineContext, meta
|
||||||
) {
|
) {
|
||||||
it.fold(initial, block)
|
it.fold(initial) { acc, t -> block(acc, t) }
|
||||||
}
|
}
|
||||||
|
|
||||||
//DataSet operations
|
//DataSet operations
|
||||||
@ -147,7 +145,7 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
|
|||||||
block: suspend (T) -> R,
|
block: suspend (T) -> R,
|
||||||
): DataTree<R> = DataTree<R>(outputType) {
|
): DataTree<R> = DataTree<R>(outputType) {
|
||||||
populateWith(
|
populateWith(
|
||||||
flowData().map {
|
dataSequence().map {
|
||||||
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
|
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
|
||||||
Data(outputType, newMeta, coroutineContext, listOf(it)) {
|
Data(outputType, newMeta, coroutineContext, listOf(it)) {
|
||||||
block(it.await())
|
block(it.await())
|
||||||
@ -165,20 +163,20 @@ public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
|
|||||||
|
|
||||||
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) {
|
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) {
|
||||||
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
|
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
|
||||||
flowData().collect {
|
dataSequence().forEach {
|
||||||
block(it)
|
block(it)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
|
public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
meta: Meta = Meta.EMPTY,
|
meta: Meta = Meta.EMPTY,
|
||||||
noinline transformation: suspend (Flow<NamedData<T>>) -> R,
|
crossinline transformation: suspend (Sequence<NamedData<T>>) -> R,
|
||||||
): Data<R> = flowData().reduceToData(coroutineContext, meta, transformation)
|
): Data<R> = dataSequence().reduceToData(coroutineContext, meta, transformation)
|
||||||
|
|
||||||
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
|
public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
|
||||||
initial: R,
|
initial: R,
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
meta: Meta = Meta.EMPTY,
|
meta: Meta = Meta.EMPTY,
|
||||||
noinline block: suspend (result: R, data: NamedData<T>) -> R,
|
crossinline block: suspend (result: R, data: NamedData<T>) -> R,
|
||||||
): Data<R> = flowData().foldToData(initial, coroutineContext, meta, block)
|
): Data<R> = dataSequence().foldToData(initial, coroutineContext, meta, block)
|
@ -2,7 +2,6 @@ package space.kscience.dataforge.data
|
|||||||
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.filter
|
import kotlinx.coroutines.flow.filter
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
@ -45,19 +44,19 @@ public fun <R : Any> DataSet<*>.select(
|
|||||||
&& (namePattern == null || name.matches(namePattern))
|
&& (namePattern == null || name.matches(namePattern))
|
||||||
&& filter(name, datum.meta)
|
&& filter(name, datum.meta)
|
||||||
|
|
||||||
override fun flowData(): Flow<NamedData<R>> = this@select.flowData().filter {
|
override fun dataSequence(): Sequence<NamedData<R>> = this@select.dataSequence().filter {
|
||||||
checkDatum(it.name, it.data)
|
checkDatum(it.name, it.data)
|
||||||
}.map {
|
}.map {
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
it as NamedData<R>
|
it as NamedData<R>
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun get(name: Name): Data<R>? = this@select.get(name)?.let { datum ->
|
override fun get(name: Name): Data<R>? = this@select[name]?.let { datum ->
|
||||||
if (checkDatum(name, datum)) datum.castOrNull(type) else null
|
if (checkDatum(name, datum)) datum.castOrNull(type) else null
|
||||||
}
|
}
|
||||||
|
|
||||||
override val updates: Flow<Name> = this@select.updates.filter {
|
override val updates: Flow<Name> = this@select.updates.filter {
|
||||||
val datum = this@select.get(it) ?: return@filter false
|
val datum = this@select[it] ?: return@filter false
|
||||||
checkDatum(it, datum)
|
checkDatum(it, datum)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import space.kscience.dataforge.data.DataSet
|
import space.kscience.dataforge.data.DataSet
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
@ -25,7 +23,7 @@ public interface TaskResult<out T : Any> : DataSet<T> {
|
|||||||
*/
|
*/
|
||||||
public val taskMeta: Meta
|
public val taskMeta: Meta
|
||||||
|
|
||||||
override fun flowData(): Flow<TaskData<T>>
|
override fun dataSequence(): Sequence<TaskData<T>>
|
||||||
override fun get(name: Name): TaskData<T>?
|
override fun get(name: Name): TaskData<T>?
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,7 +34,7 @@ private class TaskResultImpl<out T : Any>(
|
|||||||
override val taskMeta: Meta,
|
override val taskMeta: Meta,
|
||||||
) : TaskResult<T>, DataSet<T> by dataSet {
|
) : TaskResult<T>, DataSet<T> by dataSet {
|
||||||
|
|
||||||
override fun flowData(): Flow<TaskData<T>> = dataSet.flowData().map {
|
override fun dataSequence(): Sequence<TaskData<T>> = dataSet.dataSequence().map {
|
||||||
workspace.wrapData(it, it.name, taskName, taskMeta)
|
workspace.wrapData(it, it.name, taskName, taskMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.single
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.PluginFactory
|
import space.kscience.dataforge.context.PluginFactory
|
||||||
@ -16,7 +15,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
|
|||||||
|
|
||||||
val allData by task<Int> {
|
val allData by task<Int> {
|
||||||
val selectedData = workspace.data.select<Int>()
|
val selectedData = workspace.data.select<Int>()
|
||||||
val result: Data<Int> = selectedData.flowData().foldToData(0) { result, data ->
|
val result: Data<Int> = selectedData.dataSequence().foldToData(0) { result, data ->
|
||||||
result + data.await()
|
result + data.await()
|
||||||
}
|
}
|
||||||
data("result", result)
|
data("result", result)
|
||||||
@ -58,7 +57,7 @@ class DataPropagationTest {
|
|||||||
fun testAllData() {
|
fun testAllData() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
val node = testWorkspace.produce("Test.allData")
|
val node = testWorkspace.produce("Test.allData")
|
||||||
assertEquals(4950, node.flowData().single().await())
|
assertEquals(4950, node.dataSequence().single().await())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,7 +65,7 @@ class DataPropagationTest {
|
|||||||
fun testSingleData() {
|
fun testSingleData() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
val node = testWorkspace.produce("Test.singleData")
|
val node = testWorkspace.produce("Test.singleData")
|
||||||
assertEquals(12, node.flowData().single().await())
|
assertEquals(12, node.dataSequence().single().await())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -2,8 +2,6 @@
|
|||||||
|
|
||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.first
|
|
||||||
import kotlinx.coroutines.flow.single
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import org.junit.jupiter.api.Timeout
|
import org.junit.jupiter.api.Timeout
|
||||||
import space.kscience.dataforge.context.*
|
import space.kscience.dataforge.context.*
|
||||||
@ -161,7 +159,7 @@ class SimpleWorkspaceTest {
|
|||||||
fun testWorkspace() {
|
fun testWorkspace() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
val node = workspace.runBlocking("sum")
|
val node = workspace.runBlocking("sum")
|
||||||
val res = node.flowData().single()
|
val res = node.dataSequence().single()
|
||||||
assertEquals(328350, res.await())
|
assertEquals(328350, res.await())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -171,7 +169,7 @@ class SimpleWorkspaceTest {
|
|||||||
fun testMetaPropagation() {
|
fun testMetaPropagation() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
val node = workspace.produce("sum") { "testFlag" put true }
|
val node = workspace.produce("sum") { "testFlag" put true }
|
||||||
val res = node.flowData().single().await()
|
val res = node.dataSequence().single().await()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +192,7 @@ class SimpleWorkspaceTest {
|
|||||||
fun testFilter() {
|
fun testFilter() {
|
||||||
runBlocking {
|
runBlocking {
|
||||||
val node = workspace.produce("filterOne")
|
val node = workspace.produce("filterOne")
|
||||||
assertEquals(12, node.flowData().first().await())
|
assertEquals(12, node.dataSequence().first().await())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user