Some name refactoring

This commit is contained in:
Alexander Nozik 2021-11-24 20:26:29 +03:00
parent d178c4ff0d
commit c423dc214e
22 changed files with 55 additions and 45 deletions

View File

@ -4,7 +4,7 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.5.2-dev-3"
version = "0.5.2-dev-4"
repositories{
mavenCentral()
}

View File

@ -76,7 +76,7 @@ internal class MapAction<in T : Any, out R : Any>(
return newData.named(newName)
}
val flow = dataSet.flow().map(::mapOne)
val flow = dataSet.flowData().map(::mapOne)
return ActiveDataTree(outputType) {
populate(flow)

View File

@ -84,7 +84,7 @@ internal class ReduceAction<T : Any, R : Any>(
override fun CoroutineScope.transform(set: DataSet<T>, meta: Meta, key: Name): Flow<NamedData<R>> = flow {
ReduceGroupBuilder<T, R>(inputType, this@transform, meta).apply(action).buildGroups(set).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.flow().fold(HashMap()) { acc, value ->
val dataFlow: Map<Name, Data<T>> = group.set.flowData().fold(HashMap()) { acc, value ->
acc.apply {
acc[value.name] = value.data
}

View File

@ -72,7 +72,7 @@ internal class SplitAction<T : Any, R : Any>(
}
return ActiveDataTree<R>(outputType) {
populate(dataSet.flow().flatMapConcat(transform = ::splitOne))
populate(dataSet.flowData().flatMapConcat(transform = ::splitOne))
scope?.launch {
dataSet.updates.collect { name ->
//clear old nodes

View File

@ -17,10 +17,8 @@ public interface DataSet<out T : Any> {
/**
* Traverse this provider or its child. The order is not guaranteed.
* [root] points to a root name for traversal. If it is empty, traverse this source, if it points to a [Data],
* return flow, that contains single [Data], if it points to a node with children, return children.
*/
public fun flow(): Flow<NamedData<T>>
public fun flowData(): Flow<NamedData<T>>
/**
* Get data with given name.
@ -31,7 +29,7 @@ public interface DataSet<out T : Any> {
* Get a snapshot of names of top level children of given node. Empty if node does not exist or is a leaf.
*/
public suspend fun listTop(prefix: Name = Name.EMPTY): List<Name> =
flow().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()
flowData().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()
// By default, traverses the whole tree. Could be optimized in descendants
public companion object {
@ -45,7 +43,7 @@ public interface DataSet<out T : Any> {
//private val nothing: Nothing get() = error("this is nothing")
override fun flow(): Flow<NamedData<Nothing>> = emptyFlow()
override fun flowData(): Flow<NamedData<Nothing>> = emptyFlow()
override suspend fun getData(name: Name): Data<Nothing>? = null
}
@ -67,7 +65,7 @@ public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is ActiveDa
/**
* Flow all data nodes with names starting with [branchName]
*/
public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T>> = this@flowChildren.flow().filter {
public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T>> = this@flowChildren.flowData().filter {
it.name.startsWith(branchName)
}
@ -75,7 +73,7 @@ public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T
* Start computation for all goals in data node and return a job for the whole node
*/
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
flow().map {
flowData().map {
it.launch(this@launch)
}.toList().joinAll()
}
@ -83,7 +81,7 @@ public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job =
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }
public suspend fun DataSet<*>.toMeta(): Meta = Meta {
flow().collect {
flowData().collect {
if (it.name.endsWith(DataSet.META_KEY)) {
set(it.name, it.meta)
} else {

View File

@ -30,7 +30,7 @@ public interface DataSetBuilder<in T : Any> {
}
//Set new items
dataSet.flow().collect {
dataSet.flowData().collect {
emit(name + it.name, it.data)
}
}
@ -139,7 +139,7 @@ public suspend inline fun <reified T : Any> DataSetBuilder<T>.static(
*/
@DFExperimental
public suspend fun <T : Any> DataSetBuilder<T>.populate(tree: DataSet<T>): Unit = coroutineScope {
tree.flow().collect {
tree.flowData().collect {
//TODO check if the place is occupied
emit(it.name, it.data)
}

View File

@ -32,12 +32,12 @@ public interface DataTree<out T : Any> : DataSet<T> {
*/
public suspend fun items(): Map<NameToken, DataTreeItem<T>>
override fun flow(): Flow<NamedData<T>> = flow {
override fun flowData(): Flow<NamedData<T>> = flow {
items().forEach { (token, childItem: DataTreeItem<T>) ->
if(!token.body.startsWith("@")) {
when (childItem) {
is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName()))
is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(token + it.name) })
is DataTreeItem.Node -> emitAll(childItem.tree.flowData().map { it.named(token + it.name) })
}
}
}

View File

@ -44,7 +44,7 @@ public interface GroupRule {
): Map<String, DataSet<T>> {
val map = HashMap<String, ActiveDataTree<T>>()
set.flow().collect { data ->
set.flowData().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { ActiveDataTree(set.dataType) }.emit(data.name, data.data)
}

View File

@ -53,7 +53,7 @@ internal class StaticDataTree<T : Any>(
set(name, DataTreeItem.Node(dataSet))
} else {
coroutineScope {
dataSet.flow().collect {
dataSet.flowData().collect {
emit(name + it.name, it.data)
}
}

View File

@ -20,8 +20,8 @@ public fun <T : Any> DataSet<T>.filter(
): ActiveDataSet<T> = object : ActiveDataSet<T> {
override val dataType: KType get() = this@filter.dataType
override fun flow(): Flow<NamedData<T>> =
this@filter.flow().filter { predicate(it.name, it.data) }
override fun flowData(): Flow<NamedData<T>> =
this@filter.flowData().filter { predicate(it.name, it.data) }
override suspend fun getData(name: Name): Data<T>? = this@filter.getData(name)?.takeIf {
predicate(name, it)
@ -40,7 +40,7 @@ public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (p
else object : ActiveDataSet<T> {
override val dataType: KType get() = this@withNamePrefix.dataType
override fun flow(): Flow<NamedData<T>> = this@withNamePrefix.flow().map { it.data.named(prefix + it.name) }
override fun flowData(): Flow<NamedData<T>> = this@withNamePrefix.flowData().map { it.data.named(prefix + it.name) }
override suspend fun getData(name: Name): Data<T>? =
name.removeHeadOrNull(name)?.let { this@withNamePrefix.getData(it) }
@ -56,7 +56,7 @@ public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branc
} else object : ActiveDataSet<T> {
override val dataType: KType get() = this@branch.dataType
override fun flow(): Flow<NamedData<T>> = this@branch.flow().mapNotNull {
override fun flowData(): Flow<NamedData<T>> = this@branch.flowData().mapNotNull {
it.name.removeHeadOrNull(branchName)?.let { name ->
it.data.named(name)
}

View File

@ -144,7 +144,7 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
block: suspend (T) -> R,
): DataTree<R> = DataTree<R>(outputType) {
populate(
flow().map {
flowData().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
Data(outputType, newMeta, coroutineContext, listOf(it)) {
block(it.await())
@ -162,7 +162,7 @@ public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
flow().collect {
flowData().collect {
block(it)
}
}
@ -171,11 +171,11 @@ public suspend inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline transformation: suspend (Flow<NamedData<T>>) -> R,
): Data<R> = flow().reduceToData(coroutineContext, meta, transformation)
): Data<R> = flowData().reduceToData(coroutineContext, meta, transformation)
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline block: suspend (result: R, data: NamedData<T>) -> R,
): Data<R> = flow().foldToData(initial, coroutineContext, meta, block)
): Data<R> = flowData().foldToData(initial, coroutineContext, meta, block)

View File

@ -43,7 +43,7 @@ public fun <R : Any> DataSet<*>.select(
&& (namePattern == null || name.matches(namePattern))
&& filter(name, datum.meta)
override fun flow(): Flow<NamedData<R>> = this@select.flow().filter {
override fun flowData(): Flow<NamedData<R>> = this@select.flowData().filter {
checkDatum(it.name, it.data)
}.map {
@Suppress("UNCHECKED_CAST")

View File

@ -54,6 +54,7 @@ public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
}
}
//TODO optimize for file-based Inputs
public fun Input.readBinary(size: Int): Binary {
val array = readBytes(size)
return ByteArrayBinary(array)

View File

@ -29,7 +29,6 @@ public class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
}
}
public val metaFormatFactories: Collection<MetaFormatFactory> by lazy {
context.gather<MetaFormatFactory>(META_FORMAT_TYPE).values
}

View File

@ -155,7 +155,7 @@ public fun MutableMeta.getOrCreate(key: String): MutableMeta = getOrCreate(Name.
public interface MutableTypedMeta<M : MutableTypedMeta<M>> : TypedMeta<M>, MutableMeta {
/**
* Zero-copy attach or replace existing node. Node is used with any additional state, listeners, etc.
* Zero-copy (if possible) attach or replace existing node. Node is used with any additional state, listeners, etc.
* In some cases it is possible to have the same node as a child to several others
*/
@DFExperimental

View File

@ -32,7 +32,7 @@ public open class Scheme : Described, MetaRepr, MutableMetaProvider, Configurabl
internal fun wrap(
newMeta: MutableMeta,
preserveDefault: Boolean = false
preserveDefault: Boolean = false,
) {
if (preserveDefault) {
defaultMeta = targetMeta.seal()
@ -120,7 +120,11 @@ public open class Scheme : Described, MetaRepr, MutableMetaProvider, Configurabl
@DFExperimental
override fun attach(name: Name, node: ObservableMutableMeta) {
TODO("Not yet implemented")
//TODO implement zero-copy attachment
setMeta(name, meta)
node.onChange(this) { changeName ->
setMeta(name + changeName, this[changeName])
}
}
}
@ -139,6 +143,12 @@ public fun <T : Scheme> T.retarget(provider: MutableMeta): T = apply {
*/
public inline operator fun <T : Scheme> T.invoke(block: T.() -> Unit): T = apply(block)
/**
* Create a copy of given [Scheme]
*/
public inline fun <T : Scheme> T.copy(spec: SchemeSpec<T>, block: T.() -> Unit = {}): T =
spec.read(meta.copy()).apply(block)
/**
* A specification for simplified generation of wrappers
*/

View File

@ -201,4 +201,6 @@ public fun Name.removeHeadOrNull(head: Name): Name? = if (startsWith(head)) {
Name(tokens.subList(head.length, length))
} else {
null
}
}
public fun String.parseAsName(): Name = Name.parse(this)

View File

@ -25,7 +25,7 @@ public interface TaskResult<out T : Any> : DataSet<T> {
*/
public val taskMeta: Meta
override fun flow(): Flow<TaskData<T>>
override fun flowData(): Flow<TaskData<T>>
override suspend fun getData(name: Name): TaskData<T>?
}
@ -36,7 +36,7 @@ private class TaskResultImpl<out T : Any>(
override val taskMeta: Meta,
) : TaskResult<T>, DataSet<T> by dataSet {
override fun flow(): Flow<TaskData<T>> = dataSet.flow().map {
override fun flowData(): Flow<TaskData<T>> = dataSet.flowData().map {
workspace.wrapData(it, it.name, taskName, taskMeta)
}

View File

@ -11,11 +11,11 @@ public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder<Any>.() -> Unit
buildData(builder)
}
public inline fun <reified T: Any> TaskResultBuilder<*>.selectData(namePattern: Name? = null): DataSelector<T> = object : DataSelector<T> {
public inline fun <reified T: Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> = object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> = workspace.data.select(namePattern)
}
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.from(
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
task: Name,
taskMeta: Meta = Meta.EMPTY,
): DataSet<T> = workspace.produce(task, taskMeta).select()

View File

@ -16,7 +16,7 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val allData by task<Int> {
val selectedData = workspace.data.select<Int>()
val result: Data<Int> = selectedData.flow().foldToData(0) { result, data ->
val result: Data<Int> = selectedData.flowData().foldToData(0) { result, data ->
result + data.await()
}
emit("result", result)
@ -58,7 +58,7 @@ class DataPropagationTest {
fun testAllData() {
runBlocking {
val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.flow().single().await())
assertEquals(4950, node.flowData().single().await())
}
}
@ -66,7 +66,7 @@ class DataPropagationTest {
fun testSingleData() {
runBlocking {
val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.flow().single().await())
assertEquals(12, node.flowData().single().await())
}
}
}

View File

@ -71,7 +71,7 @@ class SimpleWorkspaceTest {
}
val square by task<Int> {
pipeFrom(selectData<Int>()) { arg, name, meta ->
pipeFrom(data<Int>()) { arg, name, meta ->
if (meta["testFlag"].boolean == true) {
println("flag")
}
@ -89,7 +89,7 @@ class SimpleWorkspaceTest {
}
val linear by task<Int> {
pipeFrom(selectData<Int>()) { arg, name, _ ->
pipeFrom(data<Int>()) { arg, name, _ ->
workspace.logger.info { "Starting linear on $name" }
arg * 2 + 1
}
@ -162,7 +162,7 @@ class SimpleWorkspaceTest {
fun testWorkspace() {
runBlocking {
val node = workspace.runBlocking("sum")
val res = node.flow().single()
val res = node.flowData().single()
assertEquals(328350, res.await())
}
}
@ -172,7 +172,7 @@ class SimpleWorkspaceTest {
fun testMetaPropagation() {
runBlocking {
val node = workspace.produce("sum") { "testFlag" put true }
val res = node.flow().single().await()
val res = node.flowData().single().await()
}
}
@ -195,7 +195,7 @@ class SimpleWorkspaceTest {
fun testFilter() {
runBlocking {
val node = workspace.produce("filterOne")
assertEquals(12, node.flow().first().await())
assertEquals(12, node.flowData().first().await())
}
}
}

View File

@ -5,7 +5,7 @@ pluginManagement {
gradlePluginPortal()
}
val toolsVersion = "0.10.5"
val toolsVersion = "0.10.7"
plugins {
id("ru.mipt.npm.gradle.project") version toolsVersion