diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt index 5d9f2517..3d200a14 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/JoinAction.kt @@ -3,7 +3,9 @@ package hep.dataforge.data import hep.dataforge.meta.Laminate import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaBuilder +import hep.dataforge.meta.builder import hep.dataforge.names.Name +import hep.dataforge.names.toName import java.util.stream.Collectors import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -84,9 +86,8 @@ class JoinAction( val laminate = Laminate(group.meta, meta) val goalMap: Map> = group.node - .dataStream() - .filter { it.isValid } - .collect(Collectors.toMap({ it.name }, { it.goal })) + .dataSequence() + .associate { it.first to it.second.goal } val groupName: String = group.name; @@ -94,14 +95,7 @@ class JoinAction( throw AnonymousNotAlowedException("Anonymous groups are not allowed"); } - val env = ActionEnv( - context, - groupName, - laminate.builder, - context.history.getChronicle(Name.joinString(groupName, name)) - ) - - val dispatcher = context + getExecutorService(context, group.meta).asCoroutineDispatcher() + val env = ActionEnv(groupName.toName(), laminate.builder()) val goal = goalMap.join(dispatcher) { group.result.invoke(env, it) } val res = NamedData(env.name, outputType, goal, env.meta)