Rework data. Split static state and dynamic state

Alexander Nozik 2024-12-31 13:58:21 +03:00
parent 425f9801a5
commit 6634ece349
29 changed files with 486 additions and 429 deletions
dataforge-data/src
commonMain/kotlin/space/kscience/dataforge
commonTest/kotlin/space/kscience/dataforge/data
jvmMain/kotlin/space/kscience/dataforge/data
jvmTest/kotlin/space/kscience/dataforge/data
dataforge-workspace/src
commonMain/kotlin/space/kscience/dataforge/workspace
jvmMain/kotlin/space/kscience/dataforge/workspace
jvmTest/kotlin/space/kscience/dataforge/workspace

@ -35,16 +35,16 @@ public abstract class AbstractAction<T, R>(
* Update part of the data set using provided data
*
* @param source the source data tree, in case several data items are needed for the update
* @param meta the metadata used for the whole data tree
* @param actionMeta the metadata used for the whole data tree
* @param updatedData an updated item
*/
protected open suspend fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
actionMeta: Meta,
updateName: Name,
) {
//by default regenerate the whole data set
putAll(generate(source, meta))
putAll(generate(source, actionMeta))
}
@OptIn(UnsafeKType::class)
@ -64,8 +64,8 @@ public abstract class AbstractAction<T, R>(
}
with(updateSink) {
source.updates.collect { du: DataUpdate<T> ->
update(source, meta, du)
source.updates.collect {
update(source, meta, it)
}
}
}
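For orientation, a minimal sketch of the reworked contract (the class below is illustrative, not part of the commit): the update hook now receives the changed element's Name plus the action meta, and re-reads the source tree instead of consuming a DataUpdate payload.

import space.kscience.dataforge.actions.AbstractAction
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType

// Hypothetical action that simply forwards data, showing the reworked signatures.
internal class PassThroughAction<T>(type: KType) : AbstractAction<T, T>(type) {

    override fun DataBuilderScope<T>.generate(source: DataTree<T>, meta: Meta): Map<Name, Data<T>> = buildMap {
        source.forEach { put(it.name, it) }
    }

    override suspend fun DataSink<T>.update(source: DataTree<T>, actionMeta: Meta, updateName: Name) {
        // The update flow now carries only the name; re-read the current value (null means removal).
        put(updateName, source.read(updateName))
    }
}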

@ -85,8 +85,8 @@ public class MapAction<T, R>(
}
override fun DataBuilderScope<R>.generate(source: DataTree<T>, meta: Meta): Map<Name, Data<R>> = buildMap {
source.forEach {
val (name, data) = mapOne(it.name, it.data, meta)
source.forEach { data ->
val (name, data) = mapOne(data.name, data, meta)
if (data != null) {
check(name !in keys) { "Data with key $name already exist in the result" }
put(name, data)
@ -96,10 +96,10 @@ public class MapAction<T, R>(
override suspend fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
actionMeta: Meta,
updateName: Name,
) {
val (name, data) = mapOne(updatedData.name, updatedData.data, meta)
val (name, data) = mapOne(updateName, source.read(updateName), actionMeta)
put(name, data)
}
}

@ -95,7 +95,7 @@ internal class ReduceAction<T, R>(
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(source).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.data.asSequence().fold(HashMap()) { acc, value ->
acc.apply {
acc[value.name] = value.data
acc[value.name] = value
}
}

@ -80,7 +80,7 @@ internal class SplitAction<T, R>(
meta: Meta
): Map<Name, Data<R>> = buildMap {
source.forEach {
splitOne(it.name, it.data, meta).forEach { (name, data) ->
splitOne(it.name, it, meta).forEach { (name, data) ->
check(name !in keys) { "Data with key $name already exist in the result" }
if (data != null) {
put(name, data)
@ -91,10 +91,10 @@ internal class SplitAction<T, R>(
override suspend fun DataSink<R>.update(
source: DataTree<T>,
meta: Meta,
updatedData: DataUpdate<T>,
actionMeta: Meta,
updateName: Name,
) {
putAll(splitOne(updatedData.name, updatedData.data, meta))
putAll(splitOne(updateName, source.read(updateName), actionMeta))
}
}

@ -18,23 +18,25 @@ public fun interface DataFilter {
}
public fun DataFilter.accepts(update: DataUpdate<*>): Boolean = accepts(update.name, update.data?.meta, update.type)
//public fun DataFilter.accepts(update: DataUpdate<*>): Boolean = accepts(update.name, update.data?.meta, update.type)
public fun <T, DU : DataUpdate<T>> Sequence<DU>.filterData(predicate: DataFilter): Sequence<DU> = filter { data ->
predicate.accepts(data)
}
public fun <T, DU : DataUpdate<T>> Flow<DU>.filterData(predicate: DataFilter): Flow<DU> = filter { data ->
predicate.accepts(data)
}
//public fun <T, DU : DataUpdate<T>> Sequence<DU>.filterData(predicate: DataFilter): Sequence<DU> = filter { data ->
// predicate.accepts(data)
//}
//
//public fun <T, DU : DataUpdate<T>> Flow<DU>.filterData(predicate: DataFilter): Flow<DU> = filter { data ->
// predicate.accepts(data)
//}
public fun <T> DataSource<T>.filterData(
predicate: DataFilter,
dataFilter: DataFilter,
): DataSource<T> = object : DataSource<T> {
override val dataType: KType get() = this@filterData.dataType
override fun read(name: Name): Data<T>? =
this@filterData.read(name)?.takeIf { predicate.accepts(name, it.meta, it.type) }
this@filterData.read(name)?.takeIf {
dataFilter.accepts(name, it.meta, it.type)
}
}
/**
@ -43,8 +45,12 @@ public fun <T> DataSource<T>.filterData(
public fun <T> ObservableDataSource<T>.filterData(
predicate: DataFilter,
): ObservableDataSource<T> = object : ObservableDataSource<T> {
override val updates: Flow<DataUpdate<T>>
get() = this@filterData.updates.filter { predicate.accepts(it) }
override val updates: Flow<Name>
get() = this@filterData.updates.filter {
val data = read(it)
predicate.accepts(it, data?.meta, data?.type ?: dataType)
}
override val dataType: KType get() = this@filterData.dataType
@ -70,8 +76,11 @@ internal class FilteredDataTree<T>(
?.filter { !it.value.isEmpty() }
?: emptyMap()
override val updates: Flow<DataUpdate<T>>
get() = source.updates.filter { filter.accepts(it) }
override val updates: Flow<Name>
get() = source.updates.filter {
val data = read(it)
filter.accepts(it, data?.meta, data?.type ?: dataType)
}
}
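A consumer of the new name-based update flow re-reads each element before acting on it. An illustrative sketch, assuming the accepts(name, meta, type) signature of the DataFilter fun interface used above:

import space.kscience.dataforge.data.*

suspend fun watchAccepted(source: ObservableDataSource<Int>) {
    // Keep only elements whose meta does not define a "skip" node (illustrative filter).
    val filtered = source.filterData(DataFilter { _, meta, _ -> meta?.get("skip") == null })
    filtered.updates.collect { name ->
        // The flow carries names only; the value (or null for a removal) is re-read on demand.
        println("$name -> ${filtered.read(name)?.await()}")
    }
}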

@ -8,14 +8,17 @@ import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public interface DataBuilderScope<in T>{
public companion object: DataBuilderScope<Nothing>
public interface DataBuilderScope<in T> {
public companion object : DataBuilderScope<Nothing>
}
@Suppress("UNCHECKED_CAST")
public fun <T> DataBuilderScope(): DataBuilderScope<T> = DataBuilderScope as DataBuilderScope<T>
public fun interface DataSink<in T>: DataBuilderScope<T> {
/**
* Asynchronous data sink
*/
public fun interface DataSink<in T> : DataBuilderScope<T> {
/**
* Put data and notify listeners if needed
*/
@ -59,7 +62,7 @@ private class MutableDataTreeRoot<T>(
) : MutableDataTree<T> {
override val items = HashMap<NameToken, MutableDataTree<T>>()
override val updates = MutableSharedFlow<DataUpdate<T>>(extraBufferCapacity = 100)
override val updates = MutableSharedFlow<Name>(extraBufferCapacity = 100)
inner class MutableDataTreeBranch(val branchName: Name) : MutableDataTree<T> {
@ -67,10 +70,8 @@ private class MutableDataTreeRoot<T>(
override val items = HashMap<NameToken, MutableDataTree<T>>()
override val updates: Flow<DataUpdate<T>> = this@MutableDataTreeRoot.updates.mapNotNull { update ->
update.name.removeFirstOrNull(branchName)?.let {
DataUpdate(update.data?.type ?: dataType, it, update.data)
}
override val updates: Flow<Name> = this@MutableDataTreeRoot.updates.mapNotNull { update ->
update.removeFirstOrNull(branchName)
}
override val dataType: KType get() = this@MutableDataTreeRoot.dataType
@ -80,7 +81,7 @@ private class MutableDataTreeRoot<T>(
override suspend fun put(token: NameToken, data: Data<T>?) {
this.data = data
this@MutableDataTreeRoot.updates.emit(DataUpdate(data?.type ?: dataType, branchName + token, data))
this@MutableDataTreeRoot.updates.emit(branchName + token)
}
}
@ -92,7 +93,7 @@ private class MutableDataTreeRoot<T>(
override suspend fun put(token: NameToken, data: Data<T>?) {
this.data = data
updates.emit(DataUpdate(data?.type ?: dataType, token.asName(), data))
updates.emit(token.asName())
}
}

@ -1,8 +1,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.*
import kotlin.contracts.contract
@ -31,13 +29,17 @@ public interface DataSource<out T> {
public interface ObservableDataSource<out T> : DataSource<T> {
/**
* Flow updates made to the data. Updates are considered critical. The producer will suspend unless all updates are consumed.
* Names of updated elements.
* Updates to the same name may be conflated.
*
* Updates are considered critical.
* The producer will suspend unless all updates are consumed.
*/
public val updates: Flow<DataUpdate<T>>
public val updates: Flow<Name>
}
public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> {
return read(name) ?: updates.first { it.name == name && it.data != null }.data!!
return read(name) ?: updates.filter { it == name }.map { read(name) }.filterNotNull().first()
}
public suspend fun <T> ObservableDataSource<T>.awaitData(name: String): Data<T> =
@ -59,7 +61,7 @@ public interface DataTree<out T> : ObservableDataSource<T> {
/**
* Flow updates made to the data
*/
override val updates: Flow<DataUpdate<T>>
override val updates: Flow<Name>
public companion object {
private object EmptyDataTree : DataTree<Nothing> {
@ -68,7 +70,7 @@ public interface DataTree<out T> : ObservableDataSource<T> {
override val dataType: KType = typeOf<Unit>()
override fun read(name: Name): Data<Nothing>? = null
override val updates: Flow<DataUpdate<Nothing>> get() = emptyFlow()
override val updates: Flow<Name> get() = emptyFlow()
}
public val EMPTY: DataTree<Nothing> = EmptyDataTree
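A usage sketch for the reworked awaitData above (names are illustrative): the call suspends on the name flow and re-reads the tree until the element materializes.

import kotlinx.coroutines.*
import space.kscience.dataforge.data.*

suspend fun demo(tree: MutableDataTree<String>) = coroutineScope {
    launch {
        delay(100)
        tree.putValue("greeting", "hello")
    }
    // Suspends until "greeting" appears, then awaits the underlying goal.
    println(tree.awaitData("greeting").await())
}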

@ -32,7 +32,7 @@ public interface Goal<out T> {
public companion object
}
public fun Goal<*>.launch(coroutineScope: CoroutineScope): Job = async(coroutineScope)
public fun Goal<*>.launchIn(coroutineScope: CoroutineScope): Job = async(coroutineScope)
public suspend fun <T> Goal<T>.await(): T = coroutineScope { async(this).await() }
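The rename from launch to launchIn follows kotlinx.coroutines naming conventions. An illustrative one-liner:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import space.kscience.dataforge.data.*

fun startAll(scope: CoroutineScope, goals: List<Goal<*>>): List<Job> =
    goals.map { it.launchIn(scope) } // start each goal eagerly without awaiting its result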

@ -3,43 +3,16 @@ package space.kscience.dataforge.data
import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
/**
* An interface implementing a data update event.
*
* If [data] is null, then corresponding element should be removed.
*/
public interface DataUpdate<out T> : Named {
public val type: KType
override val name: Name
public val data: Data<T>?
}
public fun <T> DataUpdate(type: KType, name: Name, data: Data<T>?): DataUpdate<T> = object : DataUpdate<T> {
override val type: KType = type
override val name: Name = name
override val data: Data<T>? = data
override fun toString(): String {
return "DataUpdate(type=$type, name=$name, data=$data)"
}
}
/**
* A data coupled to a name.
*/
public interface NamedData<out T> : DataUpdate<T>, Data<T> {
override val data: Data<T>
}
public interface NamedData<out T> : Data<T>, Named
public operator fun NamedData<*>.component1(): Name = name
public operator fun <T> NamedData<T>.component2(): Data<T> = data
private class NamedDataImpl<T>(
override val name: Name,
override val data: Data<T>,
val data: Data<T>,
) : Data<T> by data, NamedData<T> {
override fun toString(): String = buildString {
append("NamedData(name=\"$name\"")
@ -54,7 +27,7 @@ private class NamedDataImpl<T>(
}
public fun <T> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedDataImpl(name, this.data)
NamedDataImpl(name, this)
} else {
NamedDataImpl(name, this)
}

@ -0,0 +1,63 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName
import space.kscience.dataforge.names.plus
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public fun interface StaticDataBuilder<T> : DataBuilderScope<T> {
public fun put(name: Name, data: Data<T>)
}
private class DataMapBuilder<T> : StaticDataBuilder<T> {
val map = mutableMapOf<Name, Data<T>>()
override fun put(name: Name, data: Data<T>) {
if (map.containsKey(name)) {
error("Duplicate key '$name'")
} else {
map.put(name, data)
}
}
}
public fun <T> StaticDataBuilder<T>.put(name: String, data: Data<T>) {
put(name.parseAsName(), data)
}
public inline fun <T, reified T1 : T> StaticDataBuilder<T>.putValue(
name: String,
value: T1,
metaBuilder: MutableMeta.() -> Unit = {}
) {
put(name, Data(value, Meta(metaBuilder)))
}
public fun <T> StaticDataBuilder<T>.putAll(prefix: Name, block: StaticDataBuilder<T>.() -> Unit) {
val map = DataMapBuilder<T>().apply(block).map
map.forEach { (name, data) ->
put(prefix + name, data)
}
}
public fun <T> StaticDataBuilder<T>.putAll(prefix: String, block: StaticDataBuilder<T>.() -> Unit) =
putAll(prefix.parseAsName(), block)
public fun <T> StaticDataBuilder<T>.putAll(prefix: String, tree: DataTree<T>) {
tree.forEach { data ->
put(prefix + data.name, data)
}
}
@UnsafeKType
public fun <T> DataTree.Companion.static(type: KType, block: StaticDataBuilder<T>.() -> Unit): DataTree<T> =
DataMapBuilder<T>().apply(block).map.asTree(type)
@OptIn(UnsafeKType::class)
public inline fun <reified T> DataTree.Companion.static(noinline block: StaticDataBuilder<T>.() -> Unit): DataTree<T> =
static(typeOf<T>(), block)
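A usage sketch for the new static builder (names and values are illustrative):

import space.kscience.dataforge.data.*

val tree: DataTree<String> = DataTree.static {
    putValue("a", "a")
    putAll("nested") {
        putValue("b", "b") {
            "origin" put "generated" // meta for this element
        }
    }
}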

@ -9,7 +9,7 @@ import space.kscience.dataforge.names.plus
public suspend fun <T> DataSink<T>.put(value: NamedData<T>) {
put(value.name, value.data)
put(value.name, value)
}
public inline fun <T> DataSink<T>.putAll(
@ -89,7 +89,7 @@ public suspend inline fun <reified T> DataSink<T>.putValue(
public suspend fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
sequence.forEach {
put(it.name, it.data)
put(it)
}
}
@ -99,19 +99,27 @@ public suspend fun <T> DataSink<T>.putAll(map: Map<Name, Data<T>?>) {
}
}
public suspend fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
putAll(tree.asSequence())
}
//public suspend fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
// putAll(tree.asSequence())
//}
/**
* Copy given data set and mirror its changes to this [DataSink]. Suspends indefinitely.
* Mirror updates from the [source] into this sink. Suspends indefinitely.
*/
public suspend fun <T : Any> DataSink<T>.putAllAndWatch(
source: DataTree<T>,
branchName: Name = Name.EMPTY,
public suspend fun <T : Any> DataSink<T>.watch(
source: ObservableDataSource<T>,
prefix: Name = Name.EMPTY,
) {
putAll(branchName, source)
// putAll(branchName, source)
source.updates.collect {
put(branchName + it.name, it.data)
put(prefix + it, source.read(it))
}
}
public suspend fun <T : Any> MutableDataTree<T>.putAllAndWatch(
source: DataTree<T>,
prefix: Name = Name.EMPTY,
) {
putAll(prefix, source)
watch(source, prefix)
}
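An illustrative sketch of the split: putAll copies the current snapshot, then watch suspends indefinitely, re-reading each updated name from the source (function and names below are hypothetical):

import kotlinx.coroutines.*
import space.kscience.dataforge.data.*
import space.kscience.dataforge.names.asName

fun CoroutineScope.mirror(target: MutableDataTree<Int>, source: MutableDataTree<Int>): Job = launch {
    // Copies existing items under "sub", then mirrors every subsequent change forever.
    target.putAllAndWatch(source, "sub".asName())
}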

@ -200,40 +200,44 @@ public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
@UnsafeKType
public fun <T, R> DataTree<T>.map(
public fun <T, R> DataTree<T>.transformEach(
outputType: KType,
scope: CoroutineScope,
metaTransform: MutableMeta.() -> Unit = {},
metaTransform: MutableMeta.(name: Name) -> Unit = {},
compute: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(
outputType,
scope,
initialData = asSequence().associate { namedData: NamedData<T> ->
val newMeta = namedData.meta.toMutableMeta().apply(metaTransform).seal()
val newMeta = namedData.meta.toMutableMeta().apply {
metaTransform(namedData.name)
}.seal()
val newData = Data(outputType, newMeta, scope.coroutineContext, listOf(namedData)) {
compute(namedData.awaitWithMeta())
}
namedData.name to newData
}
) {
updates.collect { update ->
val data: Data<T>? = update.data
if (data == null) put(update.name, null) else {
val newMeta = data.meta.toMutableMeta().apply(metaTransform).seal()
updates.collect { name ->
val data: Data<T>? = read(name)
if (data == null) put(name, null) else {
val newMeta = data.meta.toMutableMeta().apply {
metaTransform(name)
}.seal()
val d = Data(outputType, newMeta, scope.coroutineContext, listOf(data)) {
compute(NamedValueWithMeta(update.name, data.await(), data.meta))
compute(NamedValueWithMeta(name, data.await(), data.meta))
}
put(update.name, d)
put(name, d)
}
}
}
@OptIn(UnsafeKType::class)
public inline fun <T, reified R> DataTree<T>.map(
public inline fun <T, reified R> DataTree<T>.transformEach(
scope: CoroutineScope,
noinline metaTransform: MutableMeta.() -> Unit = {},
noinline metaTransform: MutableMeta.(name: Name) -> Unit = {},
noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = map(typeOf<R>(), scope, metaTransform, block)
): DataTree<R> = transformEach(typeOf<R>(), scope, metaTransform, block)
public inline fun <T> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) {
asSequence().forEach(block)
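An illustrative use of the renamed transformEach, which now passes the element name to the meta transform as well:

import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.data.*

fun square(scope: CoroutineScope, numbers: DataTree<Int>): DataTree<Int> =
    numbers.transformEach(scope, metaTransform = { name ->
        "source" put name.toString() // record per-element provenance
    }) { (_, value, _) ->
        value * value
    }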

@ -17,7 +17,7 @@ import kotlin.reflect.typeOf
private class FlatDataTree<T>(
override val dataType: KType,
private val dataSet: Map<Name, Data<T>>,
private val sourceUpdates: SharedFlow<DataUpdate<T>>,
private val sourceUpdates: SharedFlow<Name>,
private val prefix: Name,
) : DataTree<T> {
override val data: Data<T>? get() = dataSet[prefix]
@ -29,10 +29,9 @@ private class FlatDataTree<T>(
override fun read(name: Name): Data<T>? = dataSet[prefix + name]
override val updates: Flow<DataUpdate<T>> =
sourceUpdates.mapNotNull { update ->
update.name.removeFirstOrNull(prefix)?.let { DataUpdate(dataType, it, update.data) }
}
override val updates: Flow<Name> = sourceUpdates.mapNotNull { update ->
update.removeFirstOrNull(prefix)
}
}
/**
@ -47,7 +46,7 @@ private class DataTreeBuilder<T>(
private val mutex = Mutex()
private val updatesFlow = MutableSharedFlow<DataUpdate<T>>()
private val updatesFlow = MutableSharedFlow<Name>()
override suspend fun put(name: Name, data: Data<T>?) {
@ -58,7 +57,7 @@ private class DataTreeBuilder<T>(
map[name] = data
}
}
updatesFlow.emit(DataUpdate(data?.type ?: type, name, data))
updatesFlow.emit(name)
}
public fun build(): DataTree<T> = FlatDataTree(type, map, updatesFlow, Name.EMPTY)
@ -74,7 +73,7 @@ public fun <T> DataTree(
initialData: Map<Name, Data<T>> = emptyMap(),
updater: suspend DataSink<T>.() -> Unit,
): DataTree<T> = DataTreeBuilder<T>(dataType, initialData).apply {
scope.launch{
scope.launch {
updater()
}
}.build()
@ -89,6 +88,13 @@ public inline fun <reified T> DataTree(
noinline updater: suspend DataSink<T>.() -> Unit,
): DataTree<T> = DataTree(typeOf<T>(), scope, initialData, updater)
@UnsafeKType
public fun <T> DataTree(type: KType, data: Map<Name, Data<T>>): DataTree<T> =
DataTreeBuilder(type, data).build()
@OptIn(UnsafeKType::class)
public inline fun <reified T> DataTree(data: Map<Name, Data<T>>): DataTree<T> =
DataTree(typeOf<T>(), data)
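The new map-based factory builds a static tree from a flat map in one call. An illustrative sketch (Data.wrapValue is used as in the tests below):

import space.kscience.dataforge.data.*
import space.kscience.dataforge.names.asName

val constants: DataTree<Double> = DataTree(
    mapOf(
        "pi".asName() to Data.wrapValue(3.1415926),
        "e".asName() to Data.wrapValue(2.7182818),
    )
)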
/**
* Represent this flat data map as a [DataTree] without copying it
@ -106,7 +112,7 @@ public inline fun <reified T> Map<Name, Data<T>>.asTree(): DataTree<T> = asTree(
@UnsafeKType
public fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> =
DataTreeBuilder(type, associate { it.name to it.data }).build()
DataTreeBuilder(type, associate { it.name to it }).build()
/**

@ -12,7 +12,7 @@ import kotlin.time.Duration.Companion.milliseconds
internal class DataTreeBuilderTest {
@Test
fun testTreeBuild() = runTest(timeout = 500.milliseconds) {
val node = DataTree<Any> {
val node = DataTree.static<Any> {
putAll("primary") {
putValue("a", "a")
putValue("b", "b")
@ -29,20 +29,18 @@ internal class DataTreeBuilderTest {
@Test
fun testDataUpdate() = runTest(timeout = 500.milliseconds) {
val updateData = DataTree<Any> {
putAll("update") {
put("a", Data.wrapValue("a"))
put("b", Data.wrapValue("b"))
}
val updateData = DataTree.static<Any> {
put("a", Data.wrapValue("a"))
put("b", Data.wrapValue("b"))
}
val node = DataTree<Any> {
val node = DataTree.static<Any> {
putAll("primary") {
putValue("a", "a")
putValue("b", "b")
}
putValue("root", "root")
putAll(updateData)
putAll("update", updateData)
}
assertEquals("a", node["update.a"]?.await())
@ -56,11 +54,11 @@ internal class DataTreeBuilderTest {
val subNode = MutableDataTree<Int>()
val rootNode = MutableDataTree<Int>() {
job = launch { putAllAndWatch(subNode,"sub".asName())}
job = launch { putAllAndWatch(subNode, "sub".asName()) }
}
repeat(10) {
subNode.updateValue("value[$it]", it)
subNode.putValue("value[$it]", it)
}
assertEquals(9, subNode.awaitData("value[9]").await())

@ -1,7 +1,5 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
@ -22,15 +20,15 @@ private fun <R> Data<*>.castOrNull(type: KType): Data<R>? =
}
}
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <R> Sequence<DataUpdate<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>>
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <R> Flow<DataUpdate<*>>.filterByDataType(type: KType): Flow<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>>
//@Suppress("UNCHECKED_CAST")
//@DFInternal
//public fun <R> Sequence<DataUpdate<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> =
// filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>>
//
//@Suppress("UNCHECKED_CAST")
//@DFInternal
//public fun <R> Flow<DataUpdate<*>>.filterByDataType(type: KType): Flow<NamedData<R>> =
// filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>>
/**
* Select all data matching given type and filters. Does not modify paths

@ -19,7 +19,7 @@ internal class ActionsTest {
result { it + 1 }
}
val data: DataTree<Int> = DataTree {
val data: DataTree<Int> = DataTree.static {
repeat(10) {
putValue(it.toString(), it)
}
@ -42,7 +42,7 @@ internal class ActionsTest {
val result: DataTree<Int> = plusOne(source)
repeat(10) {
source.updateValue(it.toString(), it)
source.putValue(it.toString(), it)
}
assertEquals(2, result.awaitData("1").await())

@ -1,9 +1,9 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.DataBuilderScope
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.GoalExecutionRestriction
import space.kscience.dataforge.data.MutableDataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaReader
import space.kscience.dataforge.meta.MetaRepr
@ -62,12 +62,12 @@ public interface TaskWithSpec<T, C : Any> : Task<T> {
// block: C.() -> Unit = {},
//): TaskResult<T> = execute(workspace, taskName, spec(block))
public class TaskResultBuilder<T>(
public class TaskResultScope<T>(
public val resultType: KType,
public val workspace: Workspace,
public val taskName: Name,
public val taskMeta: Meta,
private val dataSink: DataSink<T>,
) : DataSink<T> by dataSink
) : DataBuilderScope<T>
/**
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
@ -77,10 +77,11 @@ public class TaskResultBuilder<T>(
* @param descriptor of meta accepted by this task
* @param builder for resulting data set
*/
@UnsafeKType
public fun <T : Any> Task(
resultType: KType,
descriptor: MetaDescriptor? = null,
builder: suspend TaskResultBuilder<T>.() -> Unit,
builder: suspend TaskResultScope<T>.() -> DataTree<T>,
): Task<T> = object : Task<T> {
override val descriptor: MetaDescriptor? = descriptor
@ -89,23 +90,19 @@ public fun <T : Any> Task(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): TaskResult<T> {
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use a safe builder, check for external data on add, and detect cycles
@OptIn(UnsafeKType::class)
val dataset = MutableDataTree<T>(resultType).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply {
withContext(GoalExecutionRestriction() + workspace.goalLogger) {
builder()
}
}
}
return workspace.wrapResult(dataset, taskName, taskMeta)
val dataset = TaskResultScope<T>(resultType, workspace, taskName, taskMeta).builder()
workspace.wrapResult(dataset, taskName, taskMeta)
}
}
@OptIn(UnsafeKType::class)
public inline fun <reified T : Any> Task(
descriptor: MetaDescriptor? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>,
): Task<T> = Task(typeOf<T>(), descriptor, builder)
@ -117,13 +114,11 @@ public inline fun <reified T : Any> Task(
* @param specification a specification for task configuration
* @param builder for resulting data set
*/
@Suppress("FunctionName")
public fun <T : Any, C : MetaRepr> Task(
resultType: KType,
specification: MetaReader<C>,
builder: suspend TaskResultBuilder<T>.(C) -> Unit,
builder: suspend TaskResultScope<T>.(C) -> DataTree<T>,
): TaskWithSpec<T, C> = object : TaskWithSpec<T, C> {
override val spec: MetaReader<C> = specification
@ -134,15 +129,15 @@ public fun <T : Any, C : MetaRepr> Task(
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use a safe builder, check for external data on add, and detect cycles
val taskMeta = configuration.toMeta()
@OptIn(UnsafeKType::class)
val dataset = MutableDataTree<T>(resultType).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder(configuration) }
}
val dataset = TaskResultScope<T>(resultType, workspace, taskName, taskMeta).builder(configuration)
workspace.wrapResult(dataset, taskName, taskMeta)
}
}
public inline fun <reified T : Any, C : MetaRepr> Task(
specification: MetaReader<C>,
noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit,
noinline builder: suspend TaskResultScope<T>.(C) -> DataTree<T>,
): Task<T> = Task(typeOf<T>(), specification, builder)
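A sketch of the new task style: the builder returns its result tree instead of mutating a DataSink. The Workspace { } entry point is assumed from the test code, and transformEach is the @DFExperimental helper defined later in this commit:

import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.workspace.*

@OptIn(DFExperimental::class)
val ws = Workspace {
    data {
        repeat(3) { putValue("n[$it]", it) }
    }

    val doubled by task<Int> {
        // The builder's last expression is the task's result tree.
        transformEach(dataByType<Int>()) { (_, value, _) -> value * 2 }
    }
}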

@ -6,7 +6,7 @@ import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.launch
import space.kscience.dataforge.data.launchIn
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
@ -33,9 +33,9 @@ public fun <T> Workspace.wrapResult(data: DataTree<T>, taskName: Name, taskMeta:
* Start computation for all data elements of this node.
* The resulting [Job] is completed only when all of them are completed.
*/
public fun TaskResult<*>.launch(scope: CoroutineScope): Job {
public fun TaskResult<*>.launchIn(scope: CoroutineScope): Job {
val jobs = asSequence().map {
it.data.launch(scope)
it.launchIn(scope)
}.toList()
return scope.launch { jobs.joinAll() }
}

@ -4,20 +4,17 @@ import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.MutableDataTree
import space.kscience.dataforge.data.StaticDataBuilder
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.descriptors.MetaDescriptorBuilder
import space.kscience.dataforge.misc.DFBuilder
import space.kscience.dataforge.misc.UnsafeKType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.collections.set
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.typeOf
public data class TaskReference<T>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
@ -42,7 +39,7 @@ public interface TaskContainer {
public inline fun <reified T : Any> TaskContainer.registerTask(
name: String,
descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>,
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
/**
@ -51,7 +48,7 @@ public inline fun <reified T : Any> TaskContainer.registerTask(
public inline fun <reified T : Any> TaskContainer.buildTask(
name: String,
descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>,
): TaskReference<T> {
val theName = Name.parse(name)
val descriptor = MetaDescriptor(descriptorBuilder)
@ -62,7 +59,7 @@ public inline fun <reified T : Any> TaskContainer.buildTask(
public inline fun <reified T : Any> TaskContainer.task(
descriptor: MetaDescriptor,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
val taskName = Name.parse(property.name)
val task = Task(descriptor, builder)
@ -75,7 +72,7 @@ public inline fun <reified T : Any> TaskContainer.task(
*/
public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task(
specification: MetaReader<C>,
noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit,
noinline builder: suspend TaskResultScope<T>.(C) -> DataTree<T>,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
val taskName = Name.parse(property.name)
val task = Task(specification, builder)
@ -88,7 +85,7 @@ public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task(
*/
public inline fun <reified T : Any> TaskContainer.task(
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> =
task(MetaDescriptor(descriptorBuilder), builder)
@ -102,15 +99,15 @@ public inline fun <T : Any, reified R : Any> TaskContainer.action(
noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<R>>> =
task(MetaDescriptor(descriptorBuilder)) {
result(action.execute(from(selector), taskMeta.copy(metaTransform), workspace))
action.execute(from(selector), taskMeta.copy(metaTransform), workspace)
}
public class WorkspaceBuilder(
private val parentContext: Context = Global,
) : TaskContainer {
private var context: Context? = null
@OptIn(UnsafeKType::class)
private val data = MutableDataTree<Any?>(typeOf<Any?>())
private var data: DataTree<Any?>? = null
private val targets: HashMap<String, Meta> = HashMap()
private val tasks = HashMap<Name, Task<*>>()
private var cache: WorkspaceCache? = null
@ -125,8 +122,8 @@ public class WorkspaceBuilder(
/**
* Define intrinsic data for the workspace
*/
public fun data(builder: DataSink<Any?>.() -> Unit) {
data.apply(builder)
public fun data(builder: StaticDataBuilder<Any?>.() -> Unit) {
data = DataTree.static(builder)
}
/**
@ -152,7 +149,7 @@ public class WorkspaceBuilder(
val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result ->
cache?.cache(result) ?: result
}
return WorkspaceImpl(context ?: parentContext, data, targets, tasks, postProcess)
return WorkspaceImpl(context ?: parentContext, data ?: DataTree.EMPTY, targets, tasks, postProcess)
}
}

@ -1,12 +1,13 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.putAll
import space.kscience.dataforge.data.transform
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.data.NamedValueWithMeta
import space.kscience.dataforge.data.transformEach
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.meta.remove
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
@ -14,7 +15,7 @@ import space.kscience.dataforge.names.plus
/**
* A task meta without the node corresponding to the task itself (i.e., with the node named after the task removed).
*/
public val TaskResultBuilder<*>.defaultDependencyMeta: Meta
public val TaskResultScope<*>.defaultDependencyMeta: Meta
get() = taskMeta.copy {
remove(taskName)
}
@ -25,12 +26,12 @@ public val TaskResultBuilder<*>.defaultDependencyMeta: Meta
* @param selector a workspace data selector. Could be either task selector or initial data selector.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
*/
public suspend fun <T> TaskResultBuilder<*>.from(
public suspend fun <T> TaskResultScope<*>.from(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
): DataTree<T> = selector.select(workspace, dependencyMeta)
public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultScope<*>.from(
plugin: P,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
@ -50,7 +51,7 @@ public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
* @param selectorBuilder a builder of task from the plugin.
*/
public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultScope<*>.from(
pluginFactory: PluginFactory<P>,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
@ -64,7 +65,7 @@ public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBui
return res as TaskResult<T>
}
public val TaskResultBuilder<*>.allData: DataSelector<*>
public val TaskResultScope<*>.allData: DataSelector<*>
get() = DataSelector { workspace, _ -> workspace.data }
/**
@ -77,43 +78,38 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
* @param action processes individual data items asynchronously.
*/
@DFExperimental
public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach(
public suspend inline fun <T, reified R> TaskResultScope<R>.transformEach(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R,
) {
from(selector, dependencyMeta).forEach { data ->
val meta = data.meta.toMutableMeta().apply {
taskMeta[taskName]?.let { taskName.put(it) }
dataMetaTransform(data.name)
}
val res = data.transform(meta, workspace.context.coroutineContext) {
action(it, data.name, meta)
}
put(data.name, res)
crossinline dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
crossinline action: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = from(selector, dependencyMeta).transformEach<T, R>(
workspace.context,
metaTransform = { name ->
taskMeta[taskName]?.let { taskName put it }
dataMetaTransform(name)
}
}
/**
* Set given [dataSet] as a task result.
*/
public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) {
this.putAll(dataSet)
}
/**
* Use provided [action] to fill the result
*/
@DFExperimental
public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom(
selector: DataSelector<T>,
action: Action<T, R>,
dependencyMeta: Meta = defaultDependencyMeta,
) {
putAll(action.execute(from(selector, dependencyMeta), dependencyMeta, workspace))
action(it)
}
///**
// * Set given [dataSet] as a task result.
// */
//public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) {
// putAll(dataSet)
//}
///**
// * Use provided [action] to fill the result
// */
//@DFExperimental
//public suspend inline fun <T, reified R> TaskResultScope<R>.actionFrom(
// selector: DataSelector<T>,
// action: Action<T, R>,
// dependencyMeta: Meta = defaultDependencyMeta,
//) {
// putAll(action.execute(from(selector, dependencyMeta), dependencyMeta, workspace))
//}

@ -3,17 +3,25 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.actions.AbstractAction
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
internal class CachingAction<T>(type: KType, private val caching: (NamedData<T>) -> NamedData<T>) :
AbstractAction<T, T>(type) {
override fun DataSink<T>.generate(source: DataTree<T>, meta: Meta) {
internal class CachingAction<T>(
type: KType, private val caching: (NamedData<T>) -> NamedData<T>
) : AbstractAction<T, T>(type) {
override fun DataBuilderScope<T>.generate(
source: DataTree<T>,
meta: Meta
): Map<Name, Data<T>> = buildMap {
source.forEach {
put(caching(it))
val cached = caching(it)
put(cached.name, cached)
}
}
override suspend fun DataSink<T>.update(source: DataTree<T>, meta: Meta, updatedData: DataUpdate<T>) {
put(updatedData.name, updatedData.data?.named(updatedData.name)?.let(caching))
override suspend fun DataSink<T>.update(source: DataTree<T>, actionMeta: Meta, updateName: Name) {
val updatedData = source.read(updateName)
put(updateName, updatedData?.named(updateName)?.let(caching))
}
}

@ -0,0 +1,185 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import java.nio.file.*
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public class FileDataTree(
public val io: IOPlugin,
public val path: Path,
private val monitor: Boolean = false
) : DataTree<Binary> {
override val dataType: KType = typeOf<Binary>()
/**
* Read data with the supported envelope format and binary format. If the envelope format is null, read the binary directly from the file.
* The operation is blocking since it must read the meta header; the envelope body is read lazily.
*/
private fun readFileAsData(
path: Path,
): Data<Binary> {
val envelope = io.readEnvelopeFile(path, true)
val updatedMeta = envelope.meta.copy {
FILE_PATH_KEY put path.toString()
FILE_EXTENSION_KEY put path.extension
val attributes = path.readAttributes<BasicFileAttributes>()
FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return StaticData(
typeOf<Binary>(),
envelope.data ?: Binary.EMPTY,
updatedMeta
)
}
private fun readFilesFromDirectory(
path: Path
): Map<NameToken, FileDataTree> = path.listDirectoryEntries().filterNot { it.name.startsWith("@") }.associate {
NameToken.parse(it.nameWithoutExtension) to FileDataTree(io, it)
}
override val data: Data<Binary>?
get() = when {
path.isRegularFile() -> {
//TODO process zip
readFileAsData(path)
}
path.isDirectory() -> {
val dataBinary: Binary? = path.resolve(IOPlugin.DATA_FILE_NAME).takeIf { it.exists() }?.asBinary()
val meta: Meta? = path.listDirectoryEntries().find { it.fileName.toString().startsWith(IOPlugin.META_FILE_NAME) }?.let {
io.readMetaFileOrNull(it)
}
if (dataBinary != null || meta != null) {
StaticData(
typeOf<Binary>(),
dataBinary ?: Binary.EMPTY,
meta ?: Meta.EMPTY
)
} else {
null
}
}
else -> {
null
}
}
override val items: Map<NameToken, DataTree<Binary>>
get() = when {
path.isDirectory() -> readFilesFromDirectory(path)
path.isRegularFile() && path.extension == "zip" -> {
//Using an explicit Zip file system to avoid bizarre compatibility bugs
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
val fs = fsProvider.newFileSystem(path, emptyMap<String, Any>())
readFilesFromDirectory(fs.rootDirectories.single())
}
else -> emptyMap()
}
override val updates: Flow<Name> = if (monitor) {
callbackFlow<Name> {
val watchService: WatchService = path.fileSystem.newWatchService()
fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
fun monitor(childPath: Path): Job {
val key: WatchKey = childPath.register(
watchService, arrayOf(
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE,
)
)
return launch {
while (isActive) {
for (event: WatchEvent<*> in key.pollEvents()) {
// WatchEvent contexts are relative to the registered directory; resolve against it.
val eventPath = childPath.resolve(event.context() as Path)
if (event.kind() === StandardWatchEventKinds.ENTRY_CREATE) {
monitor(eventPath)
} else {
send(eventPath.relativeTo(path).toName())
}
}
key.reset()
}
}
}
monitor(path)
awaitClose {
watchService.close()
}
}.flowOn(Dispatchers.IO).shareIn(io.context, SharingStarted.WhileSubscribed())
} else {
emptyFlow()
}
public companion object {
public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created"
public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated"
public const val DF_FILE_EXTENSION: String = "df"
public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
}
}
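FileDataTree supersedes the removed file/directory/monitorFiles sink helpers (deleted below). An illustrative sketch; the IOPlugin instance is assumed to come from the calling context:

import java.nio.file.Path
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.io.IOPlugin
import space.kscience.dataforge.workspace.FileDataTree

fun listDirectory(io: IOPlugin, dir: Path) {
    val tree = FileDataTree(io, dir) // monitor = false: a static snapshot
    tree.forEach { data ->
        // Each element carries the file metadata injected by readFileAsData.
        println("${data.name}: ${data.meta[FileDataTree.FILE_PATH_KEY]}")
    }
}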
///**
// * @param resources The names of the resources to read.
// * @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
// */
//@DFExperimental
//public fun DataSink<Binary>.resources(
// io: IOPlugin,
// resource: String,
// vararg otherResources: String,
// classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
//) {
// //create a file system if necessary
// val uri = Thread.currentThread().contextClassLoader.getResource("common")!!.toURI()
// try {
// uri.toPath()
// } catch (e: FileSystemNotFoundException) {
// FileSystems.newFileSystem(uri, mapOf("create" to "true"))
// }
//
// listOf(resource, *otherResources).forEach { r ->
// val path = classLoader.getResource(r)?.toURI()?.toPath() ?: error(
// "Resource with name $r is not resolved"
// )
// io.readAsDataTree(r.asName(), path)
// }
//}

@ -28,7 +28,7 @@ public class InMemoryWorkspaceCache : WorkspaceCache {
val cachedData = cache.getOrPut(TaskResultId(result.taskName, result.taskMeta)){
HashMap()
}.getOrPut(data.name){
data.data
data
}
cachedData.checkType<T>(result.dataType).named(data.name)
}

@ -1,188 +0,0 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.*
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import java.nio.file.*
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.*
import kotlin.reflect.typeOf
public object FileData {
public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created"
public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated"
public const val DF_FILE_EXTENSION: String = "df"
public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
}
/**
* Read data with supported envelope format and binary format. If the envelope format is null, then read binary directly from file.
* The operation is blocking since it must read the meta header. The reading of envelope body is lazy
*/
public fun IOPlugin.readFileData(
path: Path,
): Data<Binary> {
val envelope = readEnvelopeFile(path, true)
val updatedMeta = envelope.meta.copy {
FileData.FILE_PATH_KEY put path.toString()
FileData.FILE_EXTENSION_KEY put path.extension
val attributes = path.readAttributes<BasicFileAttributes>()
FileData.FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
}
return StaticData(
typeOf<Binary>(),
envelope.data ?: Binary.EMPTY,
updatedMeta
)
}
public fun DataSink<Binary>.file(io: IOPlugin, name: Name, path: Path) {
if (!path.isRegularFile()) error("Only regular files could be handled by this function")
put(name, io.readFileData(path))
}
public fun DataSink<Binary>.directory(
io: IOPlugin,
name: Name,
path: Path,
) {
if (!path.isDirectory()) error("Only directories could be handled by this function")
//process root data
var dataBinary: Binary? = null
var meta: Meta? = null
Files.list(path).forEach { childPath ->
val fileName = childPath.fileName.toString()
if (fileName == IOPlugin.DATA_FILE_NAME) {
dataBinary = childPath.asBinary()
} else if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta = io.readMetaFileOrNull(childPath)
} else if (!fileName.startsWith("@")) {
val token = if (childPath.isRegularFile() && childPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS) {
NameToken(childPath.nameWithoutExtension)
} else {
NameToken(childPath.name)
}
files(io, name + token, childPath)
}
}
//set data if it is relevant
if (dataBinary != null || meta != null) {
put(
name,
StaticData(
typeOf<Binary>(),
dataBinary ?: Binary.EMPTY,
meta ?: Meta.EMPTY
)
)
}
}
public fun DataSink<Binary>.files(
io: IOPlugin,
name: Name,
path: Path,
) {
if (path.isRegularFile() && path.extension == "zip") {
//Using explicit Zip file system to avoid bizarre compatibility bugs
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
val fs = fsProvider.newFileSystem(path, emptyMap<String, Any>())
files(io, name, fs.rootDirectories.first())
}
if (path.isRegularFile()) {
file(io, name, path)
} else {
directory(io, name, path)
}
}
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
public fun DataSink<Binary>.monitorFiles(
io: IOPlugin,
name: Name,
path: Path,
scope: CoroutineScope = io.context,
): Job {
files(io, name, path)
return scope.launch(Dispatchers.IO) {
val watchService = path.fileSystem.newWatchService()
path.register(
watchService,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_CREATE
)
do {
val key = watchService.take()
if (key != null) {
for (event: WatchEvent<*> in key.pollEvents()) {
val eventPath = event.context() as Path
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
put(eventPath.toName(), null)
} else {
val fileName = eventPath.fileName.toString()
if (!fileName.startsWith("@")) {
files(io, name, eventPath)
}
}
}
key.reset()
}
} while (isActive && key != null)
}
}
/**
* @param resources The names of the resources to read.
* @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
*/
@DFExperimental
public fun DataSink<Binary>.resources(
io: IOPlugin,
resource: String,
vararg otherResources: String,
classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
) {
//create a file system if necessary
val uri = Thread.currentThread().contextClassLoader.getResource("common")!!.toURI()
try {
uri.toPath()
} catch (e: FileSystemNotFoundException) {
FileSystems.newFileSystem(uri, mapOf("create" to "true"))
}
listOf(resource,*otherResources).forEach { r ->
val path = classLoader.getResource(r)?.toURI()?.toPath() ?: error(
"Resource with name $r is not resolved"
)
files(io, r.asName(), path)
}
}

@ -15,14 +15,14 @@ import space.kscience.dataforge.names.matches
* Select the whole data set from the workspace filtered by type.
*/
@OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
public inline fun <reified T : Any> TaskResultScope<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
DataSelector<T> { workspace, _ ->
workspace.data.filterByType { name, _, _ ->
namePattern == null || name.matches(namePattern)
}
}
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
public suspend inline fun <reified T : Any> TaskResultScope<*>.fromTask(
task: Name,
taskMeta: Meta = Meta.EMPTY,
): DataTree<T> = workspace.produce(task, taskMeta).filterByType()

@ -2,7 +2,9 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.*
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.meta
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
@ -32,8 +34,8 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
} else if (!Files.isDirectory(path)) {
error("Can't write a node into file")
}
dataSet.forEach { (name, data) ->
val childPath = path.resolve(name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
dataSet.forEach { data ->
val childPath = path.resolve(data.name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
childPath.parent.createDirectories()
val envelope = data.toEnvelope(format)
if (envelopeFormat != null) {

@ -29,7 +29,7 @@ internal class CachingWorkspaceTest {
inMemoryCache()
val doFirst by task<Any> {
transformEach(allData) { _, name, _ ->
transformEach(allData) { (name, _, _) ->
firstCounter++
println("Done first on $name with flag=${taskMeta["flag"].boolean}")
}
@ -39,7 +39,7 @@ internal class CachingWorkspaceTest {
transformEach(
doFirst,
dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
) { _, name, _ ->
) { (name, _, _) ->
secondCounter++
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
}
@ -52,11 +52,11 @@ internal class CachingWorkspaceTest {
val secondC = workspace.produce("doSecond")
//use coroutineScope to wait for the result
coroutineScope {
first.launch(this)
secondA.launch(this)
secondB.launch(this)
first.launchIn(this)
secondA.launchIn(this)
secondB.launchIn(this)
//repeat to check caching
secondC.launch(this)
secondC.launchIn(this)
}
assertEquals(10, firstCounter)

@ -52,7 +52,7 @@ class FileDataTest {
io.writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val data = DataTree {
files(io, Name.EMPTY, dir)
io.readAsDataTree(Name.EMPTY, dir)
}
val reconstructed = data.map { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
@ -68,7 +68,7 @@ class FileDataTest {
zip.deleteExisting()
io.writeZip(zip, dataNode, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = DataTree { files(io, Name.EMPTY, zip) }
val reconstructed = DataTree { io.readAsDataTree(Name.EMPTY, zip) }
.map { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())

@ -26,7 +26,7 @@ class FileWorkspaceCacheTest {
}
}
workspace.produce("echo").launch(this)
workspace.produce("echo").launchIn(this)
}
}