Reify types for action builders
This commit is contained in:
parent
665f317e4e
commit
f0820a3bed
@ -5,6 +5,7 @@
|
||||
- Add `specOrNull` delegate to meta and Scheme
|
||||
- Suspended read methods to the `Binary`
|
||||
- Synchronously accessed `meta` to all `DataSet`s
|
||||
- More fine-grained types in Action builders.
|
||||
|
||||
### Changed
|
||||
- `Factory` is now `fun interface` and uses `build` instead of `invoke`. `invoke moved to an extension.
|
||||
|
@ -31,15 +31,20 @@ public class MapActionBuilder<T, R>(
|
||||
public var name: Name,
|
||||
public var meta: MutableMeta,
|
||||
public val actionMeta: Meta,
|
||||
public var outputType: KType
|
||||
@PublishedApi internal var outputType: KType,
|
||||
) {
|
||||
|
||||
public lateinit var result: suspend ActionEnv.(T) -> R
|
||||
|
||||
internal fun <R1 : R> result(outputType: KType, f: suspend ActionEnv.(T) -> R1) {
|
||||
this.outputType = outputType
|
||||
result = f;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the result of goal
|
||||
*/
|
||||
public inline fun <reified R1: R> result(noinline f: suspend ActionEnv.(T) -> R1) {
|
||||
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(T) -> R1) {
|
||||
outputType = typeOf<R1>()
|
||||
result = f;
|
||||
}
|
||||
|
@ -14,13 +14,23 @@ import kotlin.reflect.KType
|
||||
import kotlin.reflect.typeOf
|
||||
|
||||
|
||||
public class JoinGroup<T : Any, R : Any>(public var name: String, internal val set: DataSet<T>) {
|
||||
public class JoinGroup<T : Any, R : Any>(
|
||||
public var name: String,
|
||||
internal val set: DataSet<T>,
|
||||
@PublishedApi internal var outputType: KType,
|
||||
) {
|
||||
|
||||
public var meta: MutableMeta = MutableMeta()
|
||||
|
||||
public lateinit var result: suspend ActionEnv.(Map<Name, T>) -> R
|
||||
|
||||
public fun result(f: suspend ActionEnv.(Map<Name, T>) -> R) {
|
||||
internal fun <R1 : R> result(outputType: KType, f: suspend ActionEnv.(Map<Name, T>) -> R1) {
|
||||
this.outputType = outputType
|
||||
this.result = f;
|
||||
}
|
||||
|
||||
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(Map<Name, T>) -> R1) {
|
||||
outputType = typeOf<R1>()
|
||||
this.result = f;
|
||||
}
|
||||
|
||||
@ -28,9 +38,9 @@ public class JoinGroup<T : Any, R : Any>(public var name: String, internal val s
|
||||
|
||||
@DFBuilder
|
||||
public class ReduceGroupBuilder<T : Any, R : Any>(
|
||||
private val inputType: KType,
|
||||
private val scope: CoroutineScope,
|
||||
public val actionMeta: Meta,
|
||||
private val outputType: KType
|
||||
) {
|
||||
private val groupRules: MutableList<suspend (DataSet<T>) -> List<JoinGroup<T, R>>> = ArrayList();
|
||||
|
||||
@ -40,7 +50,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
|
||||
public fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) {
|
||||
groupRules += { node ->
|
||||
GroupRule.byMetaValue(scope, tag, defaultTag).gather(node).map {
|
||||
JoinGroup<T, R>(it.key, it.value).apply(action)
|
||||
JoinGroup<T, R>(it.key, it.value, outputType).apply(action)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -52,7 +62,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
|
||||
) {
|
||||
groupRules += { source ->
|
||||
listOf(
|
||||
JoinGroup<T, R>(groupName, source.filter(filter)).apply(action)
|
||||
JoinGroup<T, R>(groupName, source.filter(filter), outputType).apply(action)
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -62,19 +72,17 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
|
||||
*/
|
||||
public fun result(resultName: String, f: suspend ActionEnv.(Map<Name, T>) -> R) {
|
||||
groupRules += { node ->
|
||||
listOf(JoinGroup<T, R>(resultName, node).apply { result(f) })
|
||||
listOf(JoinGroup<T, R>(resultName, node, outputType).apply { result(outputType, f) })
|
||||
}
|
||||
}
|
||||
|
||||
internal suspend fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> {
|
||||
return groupRules.flatMap { it.invoke(input) }
|
||||
}
|
||||
internal suspend fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> =
|
||||
groupRules.flatMap { it.invoke(input) }
|
||||
|
||||
}
|
||||
|
||||
@PublishedApi
|
||||
internal class ReduceAction<T : Any, R : Any>(
|
||||
private val inputType: KType,
|
||||
outputType: KType,
|
||||
private val action: ReduceGroupBuilder<T, R>.() -> Unit,
|
||||
) : CachingAction<T, R>(outputType) {
|
||||
@ -82,7 +90,7 @@ internal class ReduceAction<T : Any, R : Any>(
|
||||
|
||||
|
||||
override fun CoroutineScope.transform(set: DataSet<T>, meta: Meta, key: Name): Flow<NamedData<R>> = flow {
|
||||
ReduceGroupBuilder<T, R>(inputType, this@transform, meta).apply(action).buildGroups(set).forEach { group ->
|
||||
ReduceGroupBuilder<T, R>(this@transform, meta, outputType).apply(action).buildGroups(set).forEach { group ->
|
||||
val dataFlow: Map<Name, Data<T>> = group.set.dataSequence().fold(HashMap()) { acc, value ->
|
||||
acc.apply {
|
||||
acc[value.name] = value.data
|
||||
@ -95,7 +103,7 @@ internal class ReduceAction<T : Any, R : Any>(
|
||||
|
||||
val env = ActionEnv(Name.parse(groupName), groupMeta, meta)
|
||||
@OptIn(DFInternal::class) val res: Data<R> = dataFlow.reduceToData(
|
||||
outputType,
|
||||
group.outputType,
|
||||
meta = groupMeta
|
||||
) { group.result.invoke(env, it) }
|
||||
|
||||
@ -111,4 +119,4 @@ internal class ReduceAction<T : Any, R : Any>(
|
||||
@Suppress("FunctionName")
|
||||
public inline fun <reified T : Any, reified R : Any> Action.Companion.reduce(
|
||||
noinline builder: ReduceGroupBuilder<T, R>.() -> Unit,
|
||||
): Action<T, R> = ReduceAction(typeOf<T>(), typeOf<R>(), builder)
|
||||
): Action<T, R> = ReduceAction(typeOf<R>(), builder)
|
||||
|
@ -1,7 +1,6 @@
|
||||
package space.kscience.dataforge.actions
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.launch
|
||||
import space.kscience.dataforge.data.*
|
||||
import space.kscience.dataforge.meta.Laminate
|
||||
@ -18,10 +17,15 @@ import kotlin.reflect.typeOf
|
||||
|
||||
public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val meta: Meta) {
|
||||
|
||||
public class FragmentRule<T : Any, R : Any>(public val name: Name, public var meta: MutableMeta) {
|
||||
public class FragmentRule<T : Any, R : Any>(
|
||||
public val name: Name,
|
||||
public var meta: MutableMeta,
|
||||
@PublishedApi internal var outputType: KType,
|
||||
) {
|
||||
public lateinit var result: suspend (T) -> R
|
||||
|
||||
public fun result(f: suspend (T) -> R) {
|
||||
public inline fun <reified R1 : R> result(noinline f: suspend (T) -> R1) {
|
||||
this.outputType = typeOf<R1>()
|
||||
result = f;
|
||||
}
|
||||
}
|
||||
@ -47,7 +51,6 @@ internal class SplitAction<T : Any, R : Any>(
|
||||
private val action: SplitBuilder<T, R>.() -> Unit,
|
||||
) : Action<T, R> {
|
||||
|
||||
@OptIn(FlowPreview::class)
|
||||
override suspend fun execute(
|
||||
dataSet: DataSet<T>,
|
||||
meta: Meta,
|
||||
@ -62,7 +65,11 @@ internal class SplitAction<T : Any, R : Any>(
|
||||
|
||||
// apply individual fragment rules to result
|
||||
return split.fragments.entries.asSequence().map { (fragmentName, rule) ->
|
||||
val env = SplitBuilder.FragmentRule<T, R>(fragmentName, laminate.toMutableMeta()).apply(rule)
|
||||
val env = SplitBuilder.FragmentRule<T, R>(
|
||||
fragmentName,
|
||||
laminate.toMutableMeta(),
|
||||
outputType
|
||||
).apply(rule)
|
||||
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
|
||||
@OptIn(DFInternal::class) Data(outputType, meta = env.meta, dependencies = listOf(data)) {
|
||||
env.result(data.await())
|
||||
@ -71,7 +78,7 @@ internal class SplitAction<T : Any, R : Any>(
|
||||
}
|
||||
|
||||
return ActiveDataTree<R>(outputType) {
|
||||
populateWith(dataSet.dataSequence().flatMap (transform = ::splitOne))
|
||||
populateWith(dataSet.dataSequence().flatMap(transform = ::splitOne))
|
||||
scope?.launch {
|
||||
dataSet.updates.collect { name ->
|
||||
//clear old nodes
|
||||
|
Loading…
Reference in New Issue
Block a user