Remove unnecessary scope in hub message flow

This commit is contained in:
Alexander Nozik 2024-02-27 10:31:35 +03:00
parent 57e9df140b
commit 9edf3b13ef
3 changed files with 16 additions and 24 deletions

View File

@ -19,6 +19,7 @@
### Fixed
- Property writing does not trigger change if logical state already is the same as value to be set.
- Modbus-slave triggers only once for multi-register write.
- Removed unnecessary scope in hub messageFlow
### Security

View File

@ -1,10 +1,9 @@
package space.kscience.controls.manager
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import space.kscience.controls.api.*
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
@ -74,7 +73,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<DeviceMessage> {
return try {
val targetName = request.targetDevice
if(targetName == null) {
if (targetName == null) {
buildDeviceTree().mapNotNull {
it.value.respondMessage(it.key, request)
}
@ -90,27 +89,19 @@ public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<Dev
/**
* Collect all messages from given [DeviceHub], applying proper relative names.
*/
public fun DeviceHub.hubMessageFlow(scope: CoroutineScope): Flow<DeviceMessage> {
public fun DeviceHub.hubMessageFlow(): Flow<DeviceMessage> {
//TODO could we avoid using downstream scope?
val outbox = MutableSharedFlow<DeviceMessage>()
if (this is Device) {
messageFlow.onEach {
outbox.emit(it)
}.launchIn(scope)
}
//TODO maybe better create map of all devices to limit copying
devices.forEach { (token, childDevice) ->
val flow = if (childDevice is DeviceHub) {
childDevice.hubMessageFlow(scope)
val deviceMessageFlow = if (this is Device) messageFlow else emptyFlow()
val childrenFlows = devices.map { (token, childDevice) ->
if (childDevice is DeviceHub) {
childDevice.hubMessageFlow()
} else {
childDevice.messageFlow
}
flow.onEach { deviceMessage ->
outbox.emit(
}.map { deviceMessage ->
deviceMessage.changeSource { token + it }
)
}.launchIn(scope)
}
return outbox
}
return merge(deviceMessageFlow, *childrenFlows.toTypedArray())
}

View File

@ -12,7 +12,7 @@ import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.logger
//TODO replace by plugin?
public fun DeviceManager.storage(
factory: Factory<DeviceMessageStorage>,
): DeviceMessageStorage = factory.build(context, meta)