Add buffer to device messages

This commit is contained in:
Alexander Nozik 2023-10-07 18:34:44 +03:00
parent 01606af307
commit f1b63c3951

View File

@ -3,6 +3,7 @@ package space.kscience.controls.spec
import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.newCoroutineContext import kotlinx.coroutines.newCoroutineContext
@ -13,6 +14,8 @@ import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.debug import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.logger import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -47,7 +50,7 @@ private suspend fun <D : Device, I, O> DeviceActionSpec<D, I, O>.executeWithMeta
*/ */
public abstract class DeviceBase<D : Device>( public abstract class DeviceBase<D : Device>(
final override val context: Context, final override val context: Context,
override val meta: Meta = Meta.EMPTY, final override val meta: Meta = Meta.EMPTY,
) : Device { ) : Device {
/** /**
@ -76,7 +79,10 @@ public abstract class DeviceBase<D : Device>(
*/ */
private val logicalState: HashMap<String, Meta?> = HashMap() private val logicalState: HashMap<String, Meta?> = HashMap()
private val sharedMessageFlow: MutableSharedFlow<DeviceMessage> = MutableSharedFlow() private val sharedMessageFlow: MutableSharedFlow<DeviceMessage> = MutableSharedFlow(
replay = meta["message.buffer"].int ?: 1000,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
public override val messageFlow: SharedFlow<DeviceMessage> get() = sharedMessageFlow public override val messageFlow: SharedFlow<DeviceMessage> get() = sharedMessageFlow