Move DataNode builders
This commit is contained in:
parent
8b1d5eb69e
commit
13c0d189bb
@ -19,6 +19,10 @@ allprojects {
|
||||
}
|
||||
}
|
||||
|
||||
apiValidation{
|
||||
validationDisabled = true
|
||||
}
|
||||
|
||||
subprojects {
|
||||
apply(plugin = "ru.mipt.npm.publish")
|
||||
}
|
||||
|
@ -163,18 +163,15 @@ public abstract interface class hep/dataforge/data/DataNode : hep/dataforge/meta
|
||||
public abstract fun getItems ()Ljava/util/Map;
|
||||
public abstract fun getMeta ()Lhep/dataforge/meta/Meta;
|
||||
public abstract fun getType ()Lkotlin/reflect/KClass;
|
||||
public abstract fun startAll (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
|
||||
public abstract fun toMeta ()Lhep/dataforge/meta/Meta;
|
||||
}
|
||||
|
||||
public final class hep/dataforge/data/DataNode$Companion {
|
||||
public static final field TYPE Ljava/lang/String;
|
||||
public final fun builder (Lkotlin/reflect/KClass;)Lhep/dataforge/data/DataTreeBuilder;
|
||||
public final fun invoke (Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;)Lhep/dataforge/data/DataTree;
|
||||
}
|
||||
|
||||
public final class hep/dataforge/data/DataNode$DefaultImpls {
|
||||
public static fun startAll (Lhep/dataforge/data/DataNode;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
|
||||
public static fun toMeta (Lhep/dataforge/data/DataNode;)Lhep/dataforge/meta/Meta;
|
||||
}
|
||||
|
||||
@ -189,13 +186,13 @@ public final class hep/dataforge/data/DataNodeKt {
|
||||
public static final fun itemSequence (Lhep/dataforge/data/DataNode;)Lkotlin/sequences/Sequence;
|
||||
public static final fun iterator (Lhep/dataforge/data/DataNode;)Ljava/util/Iterator;
|
||||
public static final fun join (Lhep/dataforge/data/DataNode;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
|
||||
public static final fun startAll (Lhep/dataforge/data/DataNode;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
|
||||
}
|
||||
|
||||
public final class hep/dataforge/data/DataTree : hep/dataforge/data/DataNode {
|
||||
public fun getItems ()Ljava/util/Map;
|
||||
public fun getMeta ()Lhep/dataforge/meta/Meta;
|
||||
public fun getType ()Lkotlin/reflect/KClass;
|
||||
public fun startAll (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
|
||||
public fun toMeta ()Lhep/dataforge/meta/Meta;
|
||||
}
|
||||
|
||||
@ -219,6 +216,7 @@ public final class hep/dataforge/data/DataTreeBuilder {
|
||||
}
|
||||
|
||||
public final class hep/dataforge/data/DataTreeBuilderKt {
|
||||
public static final fun DataTree (Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;)Lhep/dataforge/data/DataTree;
|
||||
public static final fun builder (Lhep/dataforge/data/DataNode;)Lhep/dataforge/data/DataTreeBuilder;
|
||||
public static final fun datum (Lhep/dataforge/data/DataTreeBuilder;Lhep/dataforge/names/Name;Lhep/dataforge/data/Data;)V
|
||||
public static final fun datum (Lhep/dataforge/data/DataTreeBuilder;Ljava/lang/String;Lhep/dataforge/data/Data;)V
|
||||
@ -303,7 +301,6 @@ public final class hep/dataforge/data/JoinGroup {
|
||||
|
||||
public final class hep/dataforge/data/MapAction : hep/dataforge/data/Action {
|
||||
public fun <init> (Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;)V
|
||||
public final fun getOutputType ()Lkotlin/reflect/KClass;
|
||||
public fun invoke (Lhep/dataforge/data/DataNode;Lhep/dataforge/meta/Meta;)Lhep/dataforge/data/DataNode;
|
||||
public fun isTerminal ()Z
|
||||
}
|
||||
@ -335,7 +332,6 @@ public final class hep/dataforge/data/NamedData : hep/dataforge/data/Data {
|
||||
|
||||
public final class hep/dataforge/data/ReduceAction : hep/dataforge/data/Action {
|
||||
public fun <init> (Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;)V
|
||||
public final fun getOutputType ()Lkotlin/reflect/KClass;
|
||||
public fun invoke (Lhep/dataforge/data/DataNode;Lhep/dataforge/meta/Meta;)Lhep/dataforge/data/DataNode;
|
||||
public fun isTerminal ()Z
|
||||
}
|
||||
@ -356,7 +352,6 @@ public final class hep/dataforge/data/ReduceGroupBuilder {
|
||||
|
||||
public final class hep/dataforge/data/SplitAction : hep/dataforge/data/Action {
|
||||
public fun <init> (Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;)V
|
||||
public final fun getOutputType ()Lkotlin/reflect/KClass;
|
||||
public fun invoke (Lhep/dataforge/data/DataNode;Lhep/dataforge/meta/Meta;)Lhep/dataforge/data/DataNode;
|
||||
public fun isTerminal ()Z
|
||||
}
|
||||
@ -391,7 +386,6 @@ public final class hep/dataforge/data/TypeFilteredDataNode : hep/dataforge/data/
|
||||
public fun getMeta ()Lhep/dataforge/meta/Meta;
|
||||
public final fun getOrigin ()Lhep/dataforge/data/DataNode;
|
||||
public fun getType ()Lkotlin/reflect/KClass;
|
||||
public fun startAll (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/Job;
|
||||
public fun toMeta ()Lhep/dataforge/meta/Meta;
|
||||
}
|
||||
|
||||
|
@ -44,8 +44,14 @@ public interface DataNode<out T : Any> : MetaRepr {
|
||||
*/
|
||||
public val type: KClass<out T>
|
||||
|
||||
/**
|
||||
* Children items of this data node
|
||||
*/
|
||||
public val items: Map<NameToken, DataItem<T>>
|
||||
|
||||
/**
|
||||
* Meta for this node
|
||||
*/
|
||||
public val meta: Meta
|
||||
|
||||
override fun toMeta(): Meta = Meta {
|
||||
@ -58,39 +64,33 @@ public interface DataNode<out T : Any> : MetaRepr {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start computation for all goals in data node and return a job for the whole node
|
||||
*/
|
||||
@Suppress("DeferredResultUnused")
|
||||
public fun CoroutineScope.startAll(): Job = launch {
|
||||
items.values.forEach {
|
||||
when (it) {
|
||||
is DataItem.Node<*> -> it.node.run { startAll() }
|
||||
is DataItem.Leaf<*> -> it.data.run { startAsync(this@launch) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public companion object {
|
||||
public const val TYPE: String = "dataNode"
|
||||
|
||||
public operator fun <T : Any> invoke(type: KClass<out T>, block: DataTreeBuilder<T>.() -> Unit): DataTree<T> =
|
||||
DataTreeBuilder(type).apply(block).build()
|
||||
|
||||
public inline operator fun <reified T : Any> invoke(noinline block: DataTreeBuilder<T>.() -> Unit): DataTree<T> =
|
||||
DataTreeBuilder(T::class).apply(block).build()
|
||||
|
||||
public fun <T : Any> builder(type: KClass<out T>): DataTreeBuilder<T> = DataTreeBuilder(type)
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun <T: Any> DataNode<T>.join(): Unit = coroutineScope { startAll().join() }
|
||||
/**
|
||||
* Start computation for all goals in data node and return a job for the whole node
|
||||
*/
|
||||
@Suppress("DeferredResultUnused")
|
||||
public fun <T : Any> DataNode<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
|
||||
items.values.forEach {
|
||||
when (it) {
|
||||
is DataItem.Node<*> -> it.node.run { startAll(this@launch) }
|
||||
is DataItem.Leaf<*> -> it.data.run { this.startAsync(this@launch) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun <T: Any> DataNode<T>.join(): Unit = coroutineScope { startAll(this).join() }
|
||||
|
||||
public val <T : Any> DataItem<T>?.node: DataNode<T>? get() = (this as? DataItem.Node<T>)?.node
|
||||
public val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.data
|
||||
|
||||
public operator fun <T : Any> DataNode<T>.get(name: Name): DataItem<T>? = when (name.length) {
|
||||
0 -> error("Empty name")
|
||||
0 -> DataItem.Node(this)
|
||||
1 -> items[name.firstOrNull()]
|
||||
else -> get(name.firstOrNull()!!.asName()).node?.get(name.cutFirst())
|
||||
}
|
||||
@ -127,7 +127,8 @@ public fun <T : Any> DataNode<T>.dataSequence(): Sequence<Pair<Name, Data<T>>> =
|
||||
}
|
||||
}
|
||||
|
||||
public fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataNode.invoke(type) {
|
||||
@DFExperimental
|
||||
public fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataTree(type) {
|
||||
dataSequence().forEach { (name, data) ->
|
||||
if (predicate(name, data)) {
|
||||
this[name] = data
|
||||
@ -137,6 +138,5 @@ public fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean):
|
||||
|
||||
public fun <T : Any> DataNode<T>.first(): Data<T>? = dataSequence().firstOrNull()?.second
|
||||
|
||||
|
||||
public operator fun <T : Any> DataNode<T>.iterator(): Iterator<Pair<Name, DataItem<T>>> = itemSequence().iterator()
|
||||
|
||||
|
@ -119,6 +119,14 @@ public class DataTreeBuilder<T : Any>(public val type: KClass<out T>) {
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("FunctionName")
|
||||
public fun <T : Any> DataTree(type: KClass<out T>, block: DataTreeBuilder<T>.() -> Unit): DataTree<T> =
|
||||
DataTreeBuilder(type).apply(block).build()
|
||||
|
||||
@Suppress("FunctionName")
|
||||
public inline fun <reified T : Any> DataTree(noinline block: DataTreeBuilder<T>.() -> Unit): DataTree<T> =
|
||||
DataTreeBuilder(T::class).apply(block).build()
|
||||
|
||||
|
||||
public fun <T : Any> DataTreeBuilder<T>.datum(name: Name, data: Data<T>) {
|
||||
this[name] = data
|
||||
@ -149,11 +157,11 @@ public fun <T : Any> DataTreeBuilder<T>.node(name: String, node: DataNode<T>) {
|
||||
}
|
||||
|
||||
public inline fun <reified T : Any> DataTreeBuilder<T>.node(name: Name, noinline block: DataTreeBuilder<T>.() -> Unit) {
|
||||
this[name] = DataNode(T::class, block)
|
||||
this[name] = DataTree(T::class, block)
|
||||
}
|
||||
|
||||
public inline fun <reified T : Any> DataTreeBuilder<T>.node(name: String, noinline block: DataTreeBuilder<T>.() -> Unit) {
|
||||
this[name.toName()] = DataNode(T::class, block)
|
||||
this[name.toName()] = DataTree(T::class, block)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -30,11 +30,11 @@ public class MapActionBuilder<T, R>(public var name: Name, public var meta: Meta
|
||||
|
||||
|
||||
public class MapAction<T : Any, out R : Any>(
|
||||
public val outputType: KClass<out R>,
|
||||
private val outputType: KClass<out R>,
|
||||
private val block: MapActionBuilder<T, R>.() -> Unit
|
||||
) : Action<T, R> {
|
||||
|
||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode(outputType) {
|
||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataTree(outputType) {
|
||||
node.dataSequence().forEach { (name, data) ->
|
||||
/*
|
||||
* Creating a new environment for action using **old** name, old meta and task meta
|
||||
|
@ -72,11 +72,11 @@ public class ReduceGroupBuilder<T : Any, R : Any>(public val actionMeta: Meta) {
|
||||
* The same rules as for KPipe
|
||||
*/
|
||||
public class ReduceAction<T : Any, R : Any>(
|
||||
public val outputType: KClass<out R>,
|
||||
private val outputType: KClass<out R>,
|
||||
private val action: ReduceGroupBuilder<T, R>.() -> Unit
|
||||
) : Action<T, R> {
|
||||
|
||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode(outputType) {
|
||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataTree(outputType) {
|
||||
ReduceGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group ->
|
||||
|
||||
//val laminate = Laminate(group.meta, meta)
|
||||
|
@ -33,11 +33,11 @@ public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val me
|
||||
}
|
||||
|
||||
public class SplitAction<T : Any, R : Any>(
|
||||
public val outputType: KClass<out R>,
|
||||
private val outputType: KClass<out R>,
|
||||
private val action: SplitBuilder<T, R>.() -> Unit
|
||||
) : Action<T, R> {
|
||||
|
||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode(outputType) {
|
||||
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataTree(outputType) {
|
||||
node.dataSequence().forEach { (name, data) ->
|
||||
|
||||
val laminate = Laminate(data.meta, meta)
|
||||
|
@ -7,14 +7,14 @@ import kotlin.test.assertTrue
|
||||
internal class DataTreeBuilderTest{
|
||||
@Test
|
||||
fun testDataUpdate(){
|
||||
val updateData = DataNode<Any>{
|
||||
val updateData = DataTree<Any>{
|
||||
"update" put {
|
||||
"a" put Data.static("a")
|
||||
"b" put Data.static("b")
|
||||
}
|
||||
}
|
||||
|
||||
val node = DataNode<Any>{
|
||||
val node = DataTree<Any>{
|
||||
node("primary"){
|
||||
static("a","a")
|
||||
static("b","b")
|
||||
|
@ -2,6 +2,7 @@ package hep.dataforge.workspace
|
||||
|
||||
import hep.dataforge.data.DataFilter
|
||||
import hep.dataforge.data.DataNode
|
||||
import hep.dataforge.data.DataTree
|
||||
import hep.dataforge.data.filter
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaBuilder
|
||||
@ -24,7 +25,7 @@ public class DataDependency(private val filter: DataFilter, private val placemen
|
||||
return if (placement.isEmpty()) {
|
||||
result
|
||||
} else {
|
||||
DataNode.invoke(Any::class) { this[placement] = result }
|
||||
DataTree(Any::class) { this[placement] = result }
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,7 +39,7 @@ public class AllDataDependency(private val placement: Name = Name.EMPTY) : Depen
|
||||
override fun apply(workspace: Workspace): DataNode<Any> = if (placement.isEmpty()) {
|
||||
workspace.data
|
||||
} else {
|
||||
DataNode.invoke(Any::class) { this[placement] = workspace.data }
|
||||
DataTree(Any::class) { this[placement] = workspace.data }
|
||||
}
|
||||
|
||||
override fun toMeta(): MetaBuilder = Meta {
|
||||
@ -65,7 +66,7 @@ public abstract class TaskDependency<out T : Any>(
|
||||
return if (placement.isEmpty()) {
|
||||
result
|
||||
} else {
|
||||
DataNode(task.type) { this[placement] = result }
|
||||
DataTree(task.type) { this[placement] = result }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,6 @@ public interface WorkspaceBuilder {
|
||||
public fun build(): Workspace
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the context for future workspcace
|
||||
*/
|
||||
|
@ -37,7 +37,7 @@ private fun newZFS(path: Path): FileSystem {
|
||||
public fun <T : Any> IOPlugin.readDataFile(
|
||||
path: Path,
|
||||
type: KClass<out T>,
|
||||
formatResolver: FileFormatResolver<T>
|
||||
formatResolver: FileFormatResolver<T>,
|
||||
): Data<T> {
|
||||
val envelope = readEnvelopeFile(path, true) ?: error("Can't read data from $path")
|
||||
val format = formatResolver(path, envelope.meta)
|
||||
@ -57,7 +57,7 @@ public inline fun <reified T : Any> IOPlugin.readDataFile(path: Path): Data<T> =
|
||||
public fun <T : Any> DataTreeBuilder<T>.file(
|
||||
plugin: IOPlugin,
|
||||
path: Path,
|
||||
formatResolver: FileFormatResolver<T>
|
||||
formatResolver: FileFormatResolver<T>,
|
||||
) {
|
||||
//If path is a single file or a special directory, read it as single datum
|
||||
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
|
||||
@ -85,7 +85,7 @@ public fun <T : Any> DataTreeBuilder<T>.file(
|
||||
public fun <T : Any> IOPlugin.readDataDirectory(
|
||||
path: Path,
|
||||
type: KClass<out T>,
|
||||
formatResolver: FileFormatResolver<T>
|
||||
formatResolver: FileFormatResolver<T>,
|
||||
): DataNode<T> {
|
||||
//read zipped data node
|
||||
if (path.fileName != null && path.fileName.toString().endsWith(".zip")) {
|
||||
@ -94,7 +94,7 @@ public fun <T : Any> IOPlugin.readDataDirectory(
|
||||
return readDataDirectory(fs.rootDirectories.first(), type, formatResolver)
|
||||
}
|
||||
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
|
||||
return DataNode(type) {
|
||||
return DataTree(type) {
|
||||
Files.list(path).forEach { path ->
|
||||
val fileName = path.fileName.toString()
|
||||
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
|
||||
@ -121,7 +121,7 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
|
||||
node: DataNode<T>,
|
||||
format: IOFormat<T>,
|
||||
envelopeFormat: EnvelopeFormat? = null,
|
||||
metaFormat: MetaFormatFactory? = null
|
||||
metaFormat: MetaFormatFactory? = null,
|
||||
) {
|
||||
withContext(Dispatchers.IO) {
|
||||
if (!Files.exists(path)) {
|
||||
@ -156,7 +156,7 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
|
||||
name: String,
|
||||
item: DataItem<T>,
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
) {
|
||||
withContext(Dispatchers.IO) {
|
||||
when (item) {
|
||||
@ -187,7 +187,7 @@ suspend fun <T : Any> IOPlugin.writeZip(
|
||||
path: Path,
|
||||
node: DataNode<T>,
|
||||
format: IOFormat<T>,
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
) {
|
||||
withContext(Dispatchers.IO) {
|
||||
val actualFile = if (path.toString().endsWith(".zip")) {
|
||||
@ -195,7 +195,10 @@ suspend fun <T : Any> IOPlugin.writeZip(
|
||||
} else {
|
||||
path.resolveSibling(path.fileName.toString() + ".zip")
|
||||
}
|
||||
val fos = Files.newOutputStream(actualFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
|
||||
val fos = Files.newOutputStream(actualFile,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.TRUNCATE_EXISTING)
|
||||
val zos = ZipOutputStream(fos)
|
||||
zos.use {
|
||||
it.writeNode("", DataItem.Node(node), format, envelopeFormat)
|
||||
|
@ -19,7 +19,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
|
||||
allData()
|
||||
}
|
||||
transform<Int> { data ->
|
||||
return@transform DataNode {
|
||||
DataTree {
|
||||
val result = data.dataSequence().map { it.second.get() }.reduce { acc, pair -> acc + pair }
|
||||
set("result".asName(), Data { result })
|
||||
}
|
||||
@ -32,7 +32,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
|
||||
data("myData\\[12\\]")
|
||||
}
|
||||
transform<Int> { data ->
|
||||
return@transform DataNode {
|
||||
DataTree {
|
||||
val result = data.dataSequence().map { it.second.get() }.reduce { acc, pair -> acc + pair }
|
||||
set("result".asName(), Data { result })
|
||||
}
|
||||
@ -44,7 +44,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
|
||||
data(pattern = "myData.*")
|
||||
}
|
||||
transform<Int> { data ->
|
||||
return@transform DataNode {
|
||||
DataTree{
|
||||
val result = data.dataSequence().map { it.second.get() }.reduce { acc, pair -> acc + pair }
|
||||
set("result".asName(), Data { result })
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ import kotlin.test.assertEquals
|
||||
|
||||
|
||||
class FileDataTest {
|
||||
val dataNode = DataNode<String> {
|
||||
val dataNode = DataTree<String> {
|
||||
node("dir") {
|
||||
static("a", "Some string") {
|
||||
"content" put "Some string"
|
||||
|
@ -74,10 +74,10 @@ class SimpleWorkspaceTest {
|
||||
val squareDep = dependsOn(square, placement = "square")
|
||||
val linearDep = dependsOn(linear, placement = "linear")
|
||||
}
|
||||
transform { data ->
|
||||
transform<Int> { data ->
|
||||
val squareNode = data["square"].node!!.cast<Int>()//squareDep()
|
||||
val linearNode = data["linear"].node!!.cast<Int>()//linearDep()
|
||||
return@transform DataNode(Int::class) {
|
||||
DataTree<Int> {
|
||||
squareNode.dataSequence().forEach { (name, _) ->
|
||||
val newData = Data {
|
||||
val squareValue = squareNode[name].data!!.get()
|
||||
|
Loading…
Reference in New Issue
Block a user