WIP full data refactor

This commit is contained in:
Alexander Nozik 2021-01-10 17:46:53 +03:00
parent 9ed4245d84
commit 23fae9794f
20 changed files with 190 additions and 126 deletions

View File

@ -6,7 +6,7 @@ import hep.dataforge.names.startsWith
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlin.reflect.KType
import kotlin.reflect.KClass
/**
* Remove all values with keys starting with [name]
@ -20,7 +20,7 @@ internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
* An action that caches results on-demand and recalculates them on source push
*/
public abstract class CachingAction<in T : Any, out R : Any>(
public val outputType: KType,
public val outputType: KClass<out R>,
) : Action<T, R> {
protected abstract fun CoroutineScope.transform(

View File

@ -1,6 +1,7 @@
package hep.dataforge.data
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.set
import hep.dataforge.names.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
@ -46,28 +47,6 @@ public interface DataSet<out T : Any> {
}
}
/**
* A stateless filtered [DataSet]
*/
@DFExperimental
public fun <T : Any> DataSet<T>.filter(
predicate: suspend (Name, Data<T>) -> Boolean,
): DataSet<T> = object : DataSet<T> {
override val dataType: KClass<out T> get() = this@filter.dataType
override fun flow(): Flow<NamedData<T>> =
this@filter.flow().filter { predicate(it.name, it.data) }
override suspend fun getData(name: Name): Data<T>? = this@filter.getData(name)?.takeIf {
predicate(name, it)
}
override val updates: Flow<Name> = this@filter.updates.filter flowFilter@{ name ->
val theData = this@filter.getData(name) ?: return@flowFilter false
predicate(name, theData)
}
}
/**
* Flow all data nodes with names starting with [branchName]
*/
@ -75,40 +54,6 @@ public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T
it.name.startsWith(branchName)
}
/**
* Get a subset of data starting with a given [branchName]
*/
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) this
else object : DataSet<T> {
override val dataType: KClass<out T> get() = this@branch.dataType
override fun flow(): Flow<NamedData<T>> = this@branch.flow().mapNotNull {
it.name.removeHeadOrNull(branchName)?.let { name ->
it.data.named(name)
}
}
override suspend fun getData(name: Name): Data<T>? = this@branch.getData(branchName + name)
override val updates: Flow<Name> get() = this@branch.updates.mapNotNull { it.removeHeadOrNull(branchName) }
}
/**
 * Generate a wrapper data set with a given name prefix appended to all names.
 *
 * If [prefix] is empty the receiver is returned unchanged.
 */
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) this
else object : DataSet<T> {
    override val dataType: KClass<out T> get() = this@withNamePrefix.dataType

    override fun flow(): Flow<NamedData<T>> = this@withNamePrefix.flow().map { it.data.named(prefix + it.name) }

    // FIX: original called name.removeHeadOrNull(name), which always produces the empty
    // name and therefore resolved every lookup against the delegate's root. We must strip
    // [prefix] from the requested name; names not starting with [prefix] cannot exist here.
    override suspend fun getData(name: Name): Data<T>? =
        name.removeHeadOrNull(prefix)?.let { this@withNamePrefix.getData(it) }

    override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
}
/**
* Start computation for all goals in data node and return a job for the whole node
*/
@ -119,3 +64,16 @@ public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job =
}
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }
/**
 * Represent the full content of this [DataSet] as a [Meta] snapshot.
 *
 * Entries whose name ends with [DataSet.META_KEY] are copied verbatim under their own name;
 * every other entry becomes a node holding the data's type name and its meta.
 * Collects the whole [flow], so this suspends until the source flow completes.
 */
public suspend fun DataSet<*>.toMeta(): Meta = Meta {
flow().collect {
if (it.name.endsWith(DataSet.META_KEY)) {
// Dedicated meta entries are stored as-is at their original name.
set(it.name, it.meta)
} else {
it.name put {
"type" put it.type.simpleName
"meta" put it.meta
}
}
}
}

View File

@ -97,10 +97,10 @@ public fun <T : Any> DataTree<T>.itemFlow(): Flow<Pair<Name, DataTreeItem<T>>> =
* Get a branch of this [DataTree] with a given [branchName].
* The difference from similar method for [DataSet] is that internal logic is more simple and the return value is a [DataTree]
*/
public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> = object : DataTree<T> {
override val dataType: KClass<out T> get() = this@branch.dataType
public operator fun <T : Any> DataTree<T>.get(branchName: Name): DataTree<T> = object : DataTree<T> {
override val dataType: KClass<out T> get() = this@get.dataType
override val updates: Flow<Name> = this@branch.updates.mapNotNull { it.removeHeadOrNull(branchName) }
override val updates: Flow<Name> = this@get.updates.mapNotNull { it.removeHeadOrNull(branchName) }
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = getItem(branchName).tree?.items() ?: emptyMap()
}

View File

@ -50,7 +50,7 @@ public class MapAction<T : Any, out R : Any>(
//applying transformation from builder
val builder = MapActionBuilder<T, R>(
data.name,
data.meta.builder(), // using data meta
data.meta.toMutableMeta(), // using data meta
meta
).apply(block)

View File

@ -155,5 +155,5 @@ public suspend fun <T : Any> DataSet<T>.toMutableTree(
}.launchIn(scope)
}
public fun <T : Any> MutableDataTree<T>.branch(branchName: Name): MutableDataTree<T> =
(this as DataTree<T>).branch(branchName) as MutableDataTree<T>
public fun <T : Any> MutableDataTree<T>.get(branchName: Name): MutableDataTree<T> =
(this as DataTree<T>).get(branchName) as MutableDataTree<T>

View File

@ -3,7 +3,7 @@ package hep.dataforge.data
import hep.dataforge.meta.Laminate
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.builder
import hep.dataforge.meta.toMutableMeta
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
@ -58,7 +58,7 @@ public class SplitAction<T : Any, R : Any>(
// apply individual fragment rules to result
return split.fragments.entries.asFlow().map { (fragmentName, rule) ->
val env = SplitBuilder.FragmentRule<T, R>(fragmentName, laminate.builder()).apply(rule)
val env = SplitBuilder.FragmentRule<T, R>(fragmentName, laminate.toMutableMeta()).apply(rule)
data.map(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
}
}

View File

@ -0,0 +1,72 @@
package hep.dataforge.data
import hep.dataforge.meta.DFExperimental
import hep.dataforge.names.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlin.reflect.KClass
/**
 * A stateless, lazily evaluated view of this [DataSet] containing only the entries
 * accepted by [predicate]. The predicate is re-evaluated on each access; nothing is cached.
 */
@DFExperimental
public fun <T : Any> DataSet<T>.filter(
    predicate: suspend (Name, Data<T>) -> Boolean,
): DataSet<T> = object : DataSet<T> {
    override val dataType: KClass<out T> get() = this@filter.dataType

    override fun flow(): Flow<NamedData<T>> = this@filter.flow().filter { named ->
        predicate(named.name, named.data)
    }

    override suspend fun getData(name: Name): Data<T>? {
        val candidate = this@filter.getData(name) ?: return null
        return if (predicate(name, candidate)) candidate else null
    }

    // An update is only propagated if the updated item currently exists and passes the filter.
    override val updates: Flow<Name> = this@filter.updates.filter { name ->
        val candidate = this@filter.getData(name)
        candidate != null && predicate(name, candidate)
    }
}
/**
 * Generate a wrapper data set with a given name prefix appended to all names.
 *
 * If [prefix] is empty the receiver is returned unchanged.
 */
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) this
else object : DataSet<T> {
    override val dataType: KClass<out T> get() = this@withNamePrefix.dataType

    override fun flow(): Flow<NamedData<T>> = this@withNamePrefix.flow().map { it.data.named(prefix + it.name) }

    // FIX: original called name.removeHeadOrNull(name), which always produces the empty
    // name and therefore resolved every lookup against the delegate's root. We must strip
    // [prefix] from the requested name; names not starting with [prefix] cannot exist here.
    override suspend fun getData(name: Name): Data<T>? =
        name.removeHeadOrNull(prefix)?.let { this@withNamePrefix.getData(it) }

    override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
}
/**
 * Get a subset of data starting with a given [branchName].
 *
 * Names in the returned view are relative to [branchName]; an empty [branchName]
 * yields the receiver itself.
 */
public operator fun <T : Any> DataSet<T>.get(branchName: Name): DataSet<T> = if (branchName.isEmpty()) this
else object : DataSet<T> {
    override val dataType: KClass<out T> get() = this@get.dataType

    override fun flow(): Flow<NamedData<T>> = this@get.flow().mapNotNull { named ->
        // Keep only entries under the branch, re-addressed relative to it.
        named.name.removeHeadOrNull(branchName)?.let { relative -> named.data.named(relative) }
    }

    override suspend fun getData(name: Name): Data<T>? = this@get.getData(branchName + name)

    override val updates: Flow<Name>
        get() = this@get.updates.mapNotNull { it.removeHeadOrNull(branchName) }
}
/** String-based overload of [get]; parses [branchName] and delegates to the [Name] variant. */
public operator fun <T : Any> DataSet<T>.get(branchName: String): DataSet<T> = get(branchName.toName())
/**
 * Returns the data item stored directly at the dataset root (the empty name), or null if absent.
 */
@DFExperimental
public suspend fun <T : Any> DataSet<T>.rootData(): Data<T>? = getData(Name.EMPTY)

View File

@ -1,7 +1,5 @@
package hep.dataforge.data
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.runBlocking
import kotlin.reflect.KClass
import kotlin.reflect.full.isSubclassOf
@ -61,10 +59,3 @@ public fun <R : Any> DataSet<*>.cast(type: KClass<out R>): DataSet<R> =
*/
internal fun <R : Any> DataSet<*>.canCast(type: KClass<out R>): Boolean =
type.isSubclassOf(this.dataType)
public operator fun <T : Any> DataTree<T>.get(name: Name): DataTreeItem<T>? = runBlocking {
getItem(name)
}
public operator fun <T : Any> DataTree<T>.get(name: String): DataTreeItem<T>? = get(name.toName())

View File

@ -12,7 +12,7 @@ import kotlin.jvm.JvmName
*/
@DFBuilder
public class MetaBuilder : AbstractMutableMeta<MetaBuilder>() {
override fun wrapNode(meta: Meta): MetaBuilder = if (meta is MetaBuilder) meta else meta.builder()
override fun wrapNode(meta: Meta): MetaBuilder = if (meta is MetaBuilder) meta else meta.toMutableMeta()
override fun empty(): MetaBuilder = MetaBuilder()
public infix fun String.put(item: MetaItem?) {
@ -121,13 +121,13 @@ public class MetaBuilder : AbstractMutableMeta<MetaBuilder>() {
/**
* For safety, builder always copies the initial meta even if it is builder itself
*/
public fun Meta.builder(): MetaBuilder {
public fun Meta.toMutableMeta(): MetaBuilder {
return MetaBuilder().also { builder ->
items.mapValues { entry ->
val item = entry.value
builder[entry.key.asName()] = when (item) {
is MetaItemValue -> item.value
is MetaItemNode -> MetaItemNode(item.node.builder())
is MetaItemNode -> MetaItemNode(item.node.toMutableMeta())
}
}
}

View File

@ -116,7 +116,7 @@ public inline class MetaTransformation(public val transformations: Collection<Tr
* Transform a meta, replacing all elements found in rules with transformed entries
*/
public fun apply(source: Meta): Meta =
source.builder().apply {
source.toMutableMeta().apply {
transformations.forEach { rule ->
rule.selectItems(source).forEach { name ->
remove(name)

View File

@ -22,6 +22,14 @@ public interface DataPlacement: MetaRepr {
override fun toMeta(): Meta = Meta{"from" put "*"}
}
public fun into(target: Name): DataPlacement = DataPlacementScheme{
to = target.toString()
}
public fun into(target: String): DataPlacement = DataPlacementScheme{
to = target
}
}
}

View File

@ -3,7 +3,7 @@ package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.builder
import hep.dataforge.meta.toMutableMeta
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.plus
@ -48,7 +48,7 @@ public class ExternalTaskDependency<T : Any>(
override val name: Name get() = EXTERNAL_TASK_NAME + task.name
override fun toMeta(): Meta = placement.toMeta().builder().apply {
override fun toMeta(): Meta = placement.toMeta().toMutableMeta().apply {
"name" put name.toString()
"task" put task.toString()
"meta" put meta

View File

@ -123,8 +123,8 @@ public class TaskModelBuilder(public val name: Name, meta: Meta = Meta.EMPTY) :
/**
* Meta for current task. By default uses the whole input meta
*/
public var meta: MetaBuilder = meta.builder()
private val dependencies: HashSet<Dependency> = HashSet<Dependency>()
public var meta: MetaBuilder = meta.toMutableMeta()
private val dependencies: HashSet<Dependency> = HashSet()
override val defaultMeta: Meta get() = meta

View File

@ -57,7 +57,7 @@ public class TaskBuilder<R : Any>(private val name: Name, public val type: KClas
) {
dataTransforms += { context, model, data ->
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
val startData = data.branch(from)
val startData = data.get(from)
env.block(startData).withNamePrefix(to)
}
}

View File

@ -52,7 +52,7 @@ public fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit)
*/
public fun WorkspaceBuilder.target(name: String, base: String, block: MetaBuilder.() -> Unit) {
val parentTarget = targets[base] ?: error("Base target with name $base not found")
targets[name] = parentTarget.builder()
targets[name] = parentTarget.toMutableMeta()
.apply { "@baseTarget" put base }
.apply(block)
.seal()

View File

@ -15,7 +15,9 @@ import java.nio.file.StandardOpenOption
import java.nio.file.spi.FileSystemProvider
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.streams.toList
//public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
@ -25,6 +27,18 @@ public interface FileFormatResolver<T: Any>{
public operator fun invoke(path: Path, meta: Meta): IOFormat<T>
}
/**
 * A [FileFormatResolver] for [T] that ignores the concrete path and meta and always
 * resolves the format from this [IOPlugin], failing with an error if none is registered.
 */
@PublishedApi
internal inline fun <reified T : Any> IOPlugin.formatResolver(): FileFormatResolver<T> =
object : FileFormatResolver<T> {
override val type: KType = typeOf<T>()
override fun invoke(path: Path, meta: Meta): IOFormat<T> =
resolveIOFormat<T>() ?: error("Can't resolve IO format for ${T::class}")
}
// Bridges the KType-based resolver API to KClass-based DataTree factories.
// NOTE(review): the cast through `classifier` is unchecked and only fails for resolvers
// whose classifier is not a class (e.g. a type parameter) — confirm this cannot occur.
private val <T : Any> FileFormatResolver<T>.kClass: KClass<T>
get() = type.classifier as? KClass<T> ?: error("Format resolver actual type does not correspond to type parameter")
private fun newZFS(path: Path): FileSystem {
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found")
@ -51,9 +65,7 @@ public fun <T : Any> IOPlugin.readDataFile(
}
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataFile(path: Path): Data<T> = readDataFile(path) { _, _ ->
resolveIOFormat<T>() ?: error("Can't resolve IO format for ${T::class}")
}
public inline fun <reified T : Any> IOPlugin.readDataFile(path: Path): Data<T> = readDataFile(path, formatResolver())
/**
* Add file/directory-based data tree item
@ -98,7 +110,7 @@ public fun <T : Any> IOPlugin.readDataDirectory(
return readDataDirectory(fs.rootDirectories.first(), formatResolver)
}
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataTree.static(formatResolver.type) {
return DataTree.static(formatResolver.kClass) {
Files.list(path).toList().forEach { path ->
val fileName = path.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
@ -114,9 +126,7 @@ public fun <T : Any> IOPlugin.readDataDirectory(
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(path: Path): DataTree<T> =
readDataDirectory(path) { _, _ ->
resolveIOFormat<T>() ?: error("Can't resolve IO format for ${T::class}")
}
readDataDirectory(path, formatResolver())
/**
* Write data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider
@ -138,7 +148,7 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
tree.items().forEach { (token, item) ->
val childPath = path.resolve(token.toString())
when (item) {
is DataItem.Node -> {
is DataTreeItem.Node -> {
writeDataDirectory(childPath, item.tree, format, envelopeFormat)
}
is DataTreeItem.Leaf -> {

View File

@ -0,0 +1,9 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.MetaBuilder
import kotlinx.coroutines.runBlocking
/**
 * Run the task named [task] synchronously, blocking the current thread until it completes.
 * [block] configures the task meta.
 *
 * NOTE(review): this extension shadows kotlinx.coroutines.runBlocking at call sites with a
 * Workspace receiver in scope — consider a less ambiguous name (e.g. runTaskBlocking).
 */
public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> = runBlocking{
run(task, block)
}

View File

@ -20,10 +20,10 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val testAllData = task("allData", Int::class) {
model {
allData()
data()
}
transform<Int> { data ->
DataTree.dynamic {
DataTree.dynamic(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
@ -36,7 +36,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data("myData\\[12\\]")
}
transform<Int> { data ->
DataTree.dynamic {
DataTree.dynamic(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
@ -48,7 +48,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data(pattern = "myData.*")
}
transform<Int> { data ->
DataTree.dynamic {
DataTree.dynamic(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
@ -80,19 +80,25 @@ class DataPropagationTest {
@Test
fun testAllData() {
runBlocking {
val node = testWorkspace.run("Test.allData")
assertEquals(4950, node.first()!!.value())
}
}
@Test
fun testAllRegexData() {
runBlocking {
val node = testWorkspace.run("Test.allRegexData")
assertEquals(4950, node.first()!!.value())
}
}
@Test
fun testSingleData() {
runBlocking {
val node = testWorkspace.run("Test.singleData")
assertEquals(12, node.first()!!.value())
}
}
}

View File

@ -12,6 +12,7 @@ import kotlinx.io.Output
import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import java.nio.file.Files
import java.nio.file.Path
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.Test
@ -49,6 +50,13 @@ class FileDataTest {
}
/** Test resolver: resolves every file to [StringIOFormat], regardless of path or meta. */
object StringFormatResolver : FileFormatResolver<String> {
    override val type: KType = typeOf<String>()

    override fun invoke(path: Path, meta: Meta): IOFormat<String> = StringIOFormat
}
@Test
@DFExperimental
fun testDataWriteRead() {
@ -58,9 +66,9 @@ class FileDataTest {
writeDataDirectory(dir, dataNode, StringIOFormat)
}
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir, String::class) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta, reconstructed["dir.a"]?.meta)
assertEquals(dataNode["b"]?.data?.get(), reconstructed["b"]?.data?.value())
val reconstructed = readDataDirectory(dir,StringFormatResolver)
assertEquals(dataNode["dir.a"].data?.meta, reconstructed["dir.a"].data?.meta)
assertEquals(dataNode["b"]?.data?.value(), reconstructed["b"]?.data?.value())
}
}
@ -74,9 +82,9 @@ class FileDataTest {
writeZip(zip, dataNode, StringIOFormat)
}
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip, String::class) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta, reconstructed["dir.a"]?.meta)
assertEquals(dataNode["b"]?.data?.get(), reconstructed["b"]?.data?.value())
val reconstructed = readDataDirectory(zip, StringFormatResolver)
assertEquals(dataNode["dir.a"].data?.meta, reconstructed["dir.a"].data?.meta)
assertEquals(dataNode["b"]?.data?.value(), reconstructed["b"]?.data?.value())
}
}
}

View File

@ -85,13 +85,13 @@ class SimpleWorkspaceTest {
val fullSquare = task<Int>("fullsquare") {
model {
val squareDep = dependsOn(square, placement = "square")
val linearDep = dependsOn(linear, placement = "linear")
val squareDep = dependsOn(square, placement = DataPlacement.into("square"))
val linearDep = dependsOn(linear, placement = DataPlacement.into("linear"))
}
transform<Int> { data ->
val squareNode = data["square"].tree!!.filterIsInstance<Int>() //squareDep()
val linearNode = data["linear"].tree!!.filterIsInstance<Int>() //linearDep()
DataTree.dynamic<Int> {
val squareNode = data["square"].filterIsInstance<Int>() //squareDep()
val linearNode = data["linear"].filterIsInstance<Int>() //linearDep()
DataTree.dynamic<Int>(context) {
squareNode.flow().collect {
val newData: Data<Int> = Data {
val squareValue = squareNode.getData(it.name)!!.value()
@ -142,7 +142,7 @@ class SimpleWorkspaceTest {
val customPipeTask = task<Int>("custom") {
mapAction<Int> {
meta = meta.builder().apply {
meta = meta.toMutableMeta().apply {
"newValue" put 22
}
name += "new"
@ -157,14 +157,14 @@ class SimpleWorkspaceTest {
@Test
fun testWorkspace() {
val node = workspace.run("sum")
val node = workspace.runBlocking("sum")
val res = node.first()
assertEquals(328350, res?.value())
}
@Test
fun testMetaPropagation() {
val node = workspace.run("sum") { "testFlag" put true }
val node = workspace.runBlocking("sum") { "testFlag" put true }
val res = node.first()?.value()
}
@ -177,13 +177,15 @@ class SimpleWorkspaceTest {
@Test
fun testFullSquare() {
runBlocking {
val node = workspace.run("fullsquare")
println(node.toMeta())
}
}
@Test
fun testGather() {
val node = workspace.run("filterOne")
val node = workspace.runBlocking("filterOne")
runBlocking {
assertEquals(12, node.first()?.value())
}