From 35801d82fc4d86b695b7e25d664ece1810c707dc Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 14 Nov 2021 19:46:00 +0300 Subject: [PATCH] Fix storage closing --- .../kotlin/hep/dataforge/context/Context.kt | 2 +- .../hep/dataforge/context/ExecutorPlugin.kt | 23 +++---- .../dataforge/io/envelopes/EnvelopeReader.kt | 8 +-- .../main/kotlin/hep/dataforge/values/Value.kt | 28 ++++---- .../main/kotlin/hep/dataforge/fx/KodexFX.kt | 29 +++++--- .../kotlin/hep/dataforge/storage/Storage.kt | 22 +++--- .../dataforge/storage/StorageConnection.kt | 1 - .../hep/dataforge/storage/StorageManager.kt | 3 +- .../dataforge/storage/files/FileStorage.kt | 58 +++++----------- .../numass/data/storage/NumassDataFactory.kt | 2 +- .../inr/numass/scripts/utils/ScanTree.kt | 2 +- .../inr/numass/subthreshold/Threshold.kt | 2 +- .../kotlin/inr/numass/viewer/AmplitudeView.kt | 3 +- .../main/kotlin/inr/numass/viewer/HVView.kt | 3 +- .../main/kotlin/inr/numass/viewer/MainView.kt | 19 ++++-- .../kotlin/inr/numass/viewer/PointCache.kt | 34 ---------- .../inr/numass/viewer/SlowControlView.kt | 3 +- .../kotlin/inr/numass/viewer/SpectrumView.kt | 21 +++--- .../kotlin/inr/numass/viewer/StorageView.kt | 68 ++++++++++++++++--- .../main/kotlin/inr/numass/viewer/TimeView.kt | 3 +- 20 files changed, 161 insertions(+), 173 deletions(-) diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/context/Context.kt b/dataforge-core/src/main/kotlin/hep/dataforge/context/Context.kt index 766d7b4e..ece5f92d 100644 --- a/dataforge-core/src/main/kotlin/hep/dataforge/context/Context.kt +++ b/dataforge-core/src/main/kotlin/hep/dataforge/context/Context.kt @@ -403,7 +403,7 @@ open class Context( get() = plugins[ExecutorPlugin::class] ?: parent?.executors ?: Global.executors override val coroutineContext: CoroutineContext - get() = this.executors.coroutineContext + get() = executors.coroutineContext companion object { diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/context/ExecutorPlugin.kt b/dataforge-core/src/main/kotlin/hep/dataforge/context/ExecutorPlugin.kt index eb9896ce..e388c446 100644 --- a/dataforge-core/src/main/kotlin/hep/dataforge/context/ExecutorPlugin.kt +++ b/dataforge-core/src/main/kotlin/hep/dataforge/context/ExecutorPlugin.kt @@ -41,7 +41,7 @@ interface ExecutorPlugin : Plugin, CoroutineScope { @PluginDef(group = "hep.dataforge", name = "executor", support = true, info = "Executor plugin") class DefaultExecutorPlugin(meta: Meta = Meta.empty()) : BasicPlugin(meta), ExecutorPlugin { - private val executors = HashMap(); + private val executors = HashMap() /** * Create a default executor that uses plugin meta @@ -51,21 +51,16 @@ class DefaultExecutorPlugin(meta: Meta = Meta.empty()) : BasicPlugin(meta), Exec getExecutor(meta) } - override fun getExecutor(meta: Meta): ExecutorService { - synchronized(context) { - return executors.getOrPut(meta) { - val workerName = meta.getString("workerName", "worker"); - val threads = meta.getInt("threads", Runtime.getRuntime().availableProcessors()) - val factory = { pool: ForkJoinPool -> - ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool).apply { - name = "${context.name}_$workerName-$poolIndex" - } - } - ForkJoinPool( - threads, - factory, null, false) + @Synchronized + override fun getExecutor(meta: Meta): ExecutorService = executors.getOrPut(meta) { + val workerName = meta.getString("workerName", "worker") + val threads = meta.getInt("threads", Runtime.getRuntime().availableProcessors()) + val factory = { pool: ForkJoinPool -> + ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool).apply { + name = "${context.name}_$workerName-$poolIndex" } } + ForkJoinPool(threads, factory, null, false) } override val coroutineContext: CoroutineContext by lazy { defaultExecutor.asCoroutineDispatcher() } diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/io/envelopes/EnvelopeReader.kt b/dataforge-core/src/main/kotlin/hep/dataforge/io/envelopes/EnvelopeReader.kt index 28f656cd..a980ec26 100644 --- a/dataforge-core/src/main/kotlin/hep/dataforge/io/envelopes/EnvelopeReader.kt +++ b/dataforge-core/src/main/kotlin/hep/dataforge/io/envelopes/EnvelopeReader.kt @@ -45,9 +45,7 @@ interface EnvelopeReader { /** * Read the envelope from channel */ - fun read(channel: ReadableByteChannel): Envelope { - return read(Channels.newInputStream(channel)) - } + fun read(channel: ReadableByteChannel): Envelope = read(Channels.newInputStream(channel)) /** * Read the envelope from buffer (could produce lazy envelope) @@ -59,9 +57,7 @@ interface EnvelopeReader { /** * Read the envelope from NIO file (could produce lazy envelope) */ - fun read(file: Path): Envelope { - return Files.newByteChannel(file, READ).use { read(it) } - } + fun read(file: Path): Envelope = Files.newByteChannel(file, READ).use { read(it) } companion object { diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/values/Value.kt b/dataforge-core/src/main/kotlin/hep/dataforge/values/Value.kt index d725e958..8eff65fe 100644 --- a/dataforge-core/src/main/kotlin/hep/dataforge/values/Value.kt +++ b/dataforge-core/src/main/kotlin/hep/dataforge/values/Value.kt @@ -160,21 +160,19 @@ interface Value : Serializable, Comparable { * @param obj a [Object] object. * @return a [Value] object. */ - fun of(value: Any?): Value { - return when (value) { - null -> Value.NULL - is Value -> value - is Number -> NumberValue(value) - is Instant -> TimeValue(value) - is LocalDateTime -> TimeValue(value) - is Boolean -> BooleanValue.ofBoolean(value) - is String -> StringValue(value) - is Collection -> Value.of(value) - is Stream<*> -> Value.of(value.toList()) - is Array<*> -> Value.of(value.map(::of)) - is Enum<*> -> StringValue(value.name) - else -> StringValue(value.toString()) - } + fun of(value: Any?): Value = when (value) { + null -> NULL + is Value -> value + is Number -> NumberValue(value) + is Instant -> TimeValue(value) + is LocalDateTime -> TimeValue(value) + is Boolean -> BooleanValue.ofBoolean(value) + is String -> StringValue(value) + is Collection -> of(value) + is Stream<*> -> of(value.toList()) + is Array<*> -> of(value.map(::of)) + is Enum<*> -> StringValue(value.name) + else -> StringValue(value.toString()) } } } diff --git a/dataforge-gui/src/main/kotlin/hep/dataforge/fx/KodexFX.kt b/dataforge-gui/src/main/kotlin/hep/dataforge/fx/KodexFX.kt index dd3b322e..03a873df 100644 --- a/dataforge-gui/src/main/kotlin/hep/dataforge/fx/KodexFX.kt +++ b/dataforge-gui/src/main/kotlin/hep/dataforge/fx/KodexFX.kt @@ -1,5 +1,6 @@ package hep.dataforge.fx +import hep.dataforge.context.Context import hep.dataforge.context.Global import hep.dataforge.goals.Coal import hep.dataforge.goals.Goal @@ -14,13 +15,13 @@ import javafx.scene.image.ImageView import javafx.scene.layout.Region import javafx.scene.paint.Color import javafx.stage.Stage -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.plus import tornadofx.* import java.util.* import java.util.concurrent.Executor import java.util.function.BiConsumer -import kotlin.collections.HashMap +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext val dfIcon: Image = Image(Global::class.java.getResourceAsStream("/img/df.png")) val dfIconView = ImageView(dfIcon) @@ -73,9 +74,14 @@ private fun removeMonitor(component: UIComponent, id: String) { } } -fun UIComponent.runGoal(id: String, scope: CoroutineScope = GlobalScope, block: suspend GoalMonitor.() -> R): Coal { +fun UIComponent.runGoal( + context: Context, + id: String, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + block: suspend GoalMonitor.() -> R, +): Coal { val monitor = getMonitor(id); - return Coal(scope, Collections.emptyList(), id) { + return Coal(context + coroutineContext, Collections.emptyList(), id) { monitor.progress = -1.0 block(monitor).also { monitor.progress = 1.0 @@ -121,9 +127,9 @@ fun addWindowResizeListener(component: Region, action: Runnable) { fun colorToString(color: Color): String { return String.format("#%02X%02X%02X", - (color.red * 255).toInt(), - (color.green * 255).toInt(), - (color.blue * 255).toInt()) + (color.red * 255).toInt(), + (color.green * 255).toInt(), + (color.blue * 255).toInt()) } /** @@ -144,9 +150,10 @@ fun runNow(r: Runnable) { * A display window that could be toggled */ class ToggleUIComponent( - val component: UIComponent, - val owner: Node, - val toggle: BooleanProperty) { + val component: UIComponent, + val owner: Node, + val toggle: BooleanProperty, +) { val stage: Stage by lazy { val res = component.modalStage ?: component.openWindow(owner = owner.scene.window) ?: throw RuntimeException("Can'topen window for $component") diff --git a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/Storage.kt b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/Storage.kt index 0bc50713..e9fec748 100644 --- a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/Storage.kt +++ b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/Storage.kt @@ -62,6 +62,10 @@ interface StorageElement : Named, Metoid, Provider, ContextAware, AutoConnectibl parent?.fullName?.plus(name) ?: Name.ofSingle(name) } + override fun close() { + //DO nothing + } + companion object { const val STORAGE_TARGET = "storage" } @@ -75,14 +79,14 @@ interface Storage : StorageElement { /** * Top level children of this storage */ - val children: Collection + fun getChildren(): Collection /** * Names of direct children for provider */ @get:ProvidesNames(STORAGE_TARGET) val childrenNames: Collection - get() = runBlocking { children.map { it.name } } + get() = runBlocking { getChildren().map { it.name } } /** * Get storage element (name notation for recursive calls). Null if not present @@ -99,7 +103,7 @@ interface Storage : StorageElement { operator fun get(name: Name): StorageElement? { return if (name.length == 1) { - children.find { it.name == name.unescaped } + getChildren().find { it.name == name.unescaped } } else { (get(name.first) as Storage?)?.get(name.cutFirst()) } @@ -107,14 +111,6 @@ interface Storage : StorageElement { override fun getDefaultTarget(): String = STORAGE_TARGET - - /** - * By default closes all children on close. If overridden, children should be closed before parent. - */ - - override fun close() { - children.forEach { it.close() } - } } /** @@ -202,7 +198,5 @@ interface StorageElementType : Named { fun create(context: Context, meta: Meta, parent: StorageElement? = null): StorageElement - fun create(parent: StorageElement, meta: Meta): StorageElement { - return create(parent.context, meta, parent) - } + fun create(parent: StorageElement, meta: Meta): StorageElement = create(parent.context, meta, parent) } \ No newline at end of file diff --git a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageConnection.kt b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageConnection.kt index aecccdef..3ada89e1 100644 --- a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageConnection.kt +++ b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageConnection.kt @@ -50,7 +50,6 @@ class StorageConnection(storageFactory: () -> MutableStorage) : Connection, Cont isOpen = true } - @Throws(Exception::class) override fun close() { if (isOpen) { storage.close() diff --git a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageManager.kt b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageManager.kt index 4ae730c8..7744ff88 100644 --- a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageManager.kt +++ b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/StorageManager.kt @@ -27,7 +27,6 @@ import hep.dataforge.meta.buildMeta import hep.dataforge.nullable import hep.dataforge.providers.Provides import hep.dataforge.providers.ProvidesNames -import kotlin.streams.toList @PluginDef(name = "storage", group = "hep.dataforge", info = "Dataforge root storage plugin") @@ -37,7 +36,7 @@ class StorageManager : BasicPlugin(), MutableStorage { private val _connectionHelper = ConnectionHelper(this) private val _children = HashMap() - override val children get() = _children.values + override fun getChildren() = _children.values override fun getConnectionHelper(): ConnectionHelper = _connectionHelper diff --git a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt index a38e0e8d..541aa392 100644 --- a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt +++ b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt @@ -20,7 +20,6 @@ import hep.dataforge.Named import hep.dataforge.asName import hep.dataforge.connections.ConnectionHelper import hep.dataforge.context.Context -import hep.dataforge.context.launch import hep.dataforge.description.ValueDef import hep.dataforge.description.ValueDefs import hep.dataforge.io.envelopes.Envelope @@ -33,8 +32,8 @@ import hep.dataforge.storage.MutableStorage import hep.dataforge.storage.StorageElement import hep.dataforge.storage.StorageElementType import hep.dataforge.storage.StorageManager -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.joinAll +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.runBlocking import java.nio.file.* import kotlin.streams.asSequence @@ -74,17 +73,17 @@ class FileStorage( override fun getConnectionHelper(): ConnectionHelper = _connectionHelper - private var isInitialized = false - private val _children = HashMap() - - - override val children: Collection - get() = runBlocking(Dispatchers.IO) { - if (!isInitialized) { - refresh() + override fun getChildren(): Collection = runBlocking { + Files.list(path).toList().map { path -> + async{ + type.read(context, path, this@FileStorage).also { + if(it == null){ + logger.warn("Can't read $path") + } + } } - _children.values - } + }.awaitAll().filterNotNull() + } override fun resolveType(meta: Meta): StorageElementType? { @@ -105,33 +104,8 @@ class FileStorage( //TODO actually watch for file change - override fun create(meta: Meta): StorageElement { - val path = path.resolve(meta.getString("path", meta.getString("name"))) - return _children.getOrPut(path) { - resolveType(meta) - ?.create(this, meta) - ?: error("Can't resolve storage element type.") - } - } - - /** - * Manually refresh storage state - */ - suspend fun refresh() { - //Remove non-existent entries - _children.keys.filter { !Files.exists(it) }.forEach { _children.remove(it) } - - //update existing entries if needed - Files.list(path).map { path -> - launch { - if (!_children.contains(path)) { - type.read(context, path, this@FileStorage)?.let { _children[path] = it } - ?: logger.debug("Could not resolve type for $path in $this") - } - } - }.toList().joinAll() - isInitialized = true - } + override fun create(meta: Meta): StorageElement = + resolveType(meta)?.create(this, meta) ?: error("Can't resolve storage element type.") companion object { @@ -161,7 +135,7 @@ class FileStorage( return file.fileName.toString().substringBeforeLast(".") } - val directory = FileStorage.Directory() + val directory = Directory() } open class Directory : FileStorageElementType { @@ -208,7 +182,7 @@ class FileStorage( null } } else { - //Otherwise delegate to the type + //Otherwise, delegate to the type type.read(context, path, parent) }.also { if (it != null && parent == null) { diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt index 89d8d50a..345eafc9 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt @@ -22,7 +22,7 @@ class NumassDataFactory : DataFactory(NumassSet::class.java) { */ private fun Storage.sequence(prefix: Name = Name.empty()): Sequence> { return sequence { - runBlocking { children }.forEach { + runBlocking { getChildren() }.forEach { val newName = prefix + it.name yield(Pair(newName, it)) if (it is Storage) { diff --git a/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt b/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt index a8a0efe3..738529a5 100644 --- a/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt +++ b/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt @@ -20,7 +20,7 @@ private suspend fun createSummaryNode(storage: Storage): MetaBuilder { .setValue("name", storage.name) .setValue("path", storage.fullName) - storage.children.forEach { element -> + storage.getChildren().forEach { element -> if(element is Storage && element.name.startsWith("Fill")){ builder.putNode(createSummaryNode(element)) } else if(element is NumassDataLoader){ diff --git a/numass-main/src/main/kotlin/inr/numass/subthreshold/Threshold.kt b/numass-main/src/main/kotlin/inr/numass/subthreshold/Threshold.kt index ddd1ace1..0ad196a0 100644 --- a/numass-main/src/main/kotlin/inr/numass/subthreshold/Threshold.kt +++ b/numass-main/src/main/kotlin/inr/numass/subthreshold/Threshold.kt @@ -43,7 +43,7 @@ object Threshold { fun Storage.loaders(): Sequence { return sequence { print("Reading ${this@loaders.fullName}") - runBlocking { this@loaders.children }.forEach { + runBlocking { this@loaders.getChildren() }.forEach { if (it is NumassDataLoader) { yield(it) } else if (it is Storage) { diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/AmplitudeView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/AmplitudeView.kt index 440dfa6b..d0a49934 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/AmplitudeView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/AmplitudeView.kt @@ -25,6 +25,7 @@ import javafx.collections.ObservableMap import javafx.scene.control.CheckBox import javafx.scene.control.ChoiceBox import javafx.scene.image.ImageView +import kotlinx.coroutines.Dispatchers import tornadofx.* class AmplitudeView : View(title = "Numass amplitude spectrum plot", icon = ImageView(dfIcon)) { @@ -127,7 +128,7 @@ class AmplitudeView : View(title = "Numass amplitude spectrum plot", icon = Imag private fun invalidate() { data.forEach { (key, point) -> plots.getOrPut(key) { - runGoal("loadAmplitudeSpectrum_$key") { + runGoal(app.context, "loadAmplitudeSpectrum_$key", Dispatchers.IO) { val valueAxis = if (normalize) { NumassAnalyzer.COUNT_RATE_KEY } else { diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/HVView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/HVView.kt index 1bd2b2ff..86b2082c 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/HVView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/HVView.kt @@ -15,6 +15,7 @@ import javafx.collections.FXCollections import javafx.collections.MapChangeListener import javafx.collections.ObservableMap import javafx.scene.image.ImageView +import kotlinx.coroutines.Dispatchers import tornadofx.* @@ -54,7 +55,7 @@ class HVView : View(title = "High voltage time plot", icon = ImageView(dfIcon)) } if (change.wasAdded()) { runLater { container.progress = -1.0 } - runGoal("hvData[${change.key}]") { + runGoal(app.context,"hvData[${change.key}]", Dispatchers.IO) { change.valueAdded.getHvData() } ui { table -> if (table != null) { diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt index 10f55fb4..fa88c13a 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt @@ -17,7 +17,10 @@ import javafx.scene.layout.Priority import javafx.scene.text.Font import javafx.stage.DirectoryChooser import javafx.stage.FileChooser +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.javafx.JavaFx import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import org.controlsfx.control.StatusBar import tornadofx.* import java.io.File @@ -28,6 +31,8 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { private val pointCache by inject() + val storageView by inject() + private val statusBar = StatusBar() // private val logFragment = LogFragment().apply { // addLogHandler(context.logger) @@ -140,7 +145,7 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { if (Files.isDirectory(path)) { if (Files.exists(path.resolve(NumassDataLoader.META_FRAGMENT_NAME))) { //build set view - runGoal("viewer.load.set[$path]") { + runGoal(app.context, "viewer.load.set[$path]", Dispatchers.IO) { title = "Load set ($path)" message = "Building numass set..." NumassDataLoader(app.context, null, path.fileName.toString(), path) @@ -158,12 +163,12 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { } } else { //build storage - runGoal("viewer.load.storage[$path]") { - title = "Load storage ($path)" - message = "Building numass storage tree..." - NumassDirectory.INSTANCE.read(app.context, path) - } ui { - contentView = StorageView(it as Storage) + app.context.launch { + val storageElement = NumassDirectory.INSTANCE.read(app.context, path) as Storage + withContext(Dispatchers.JavaFx){ + contentView = storageView + storageView.storageProperty.set(storageElement) + } } } } else { diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/PointCache.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/PointCache.kt index 22549d3b..e31bf667 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/PointCache.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/PointCache.kt @@ -62,37 +62,3 @@ class PointCache : Controller() { cache.clear() } } - - -//class CachedSet(set: NumassSet, context: Context) { -// override val points: ObservableList by lazy { -// set.points.map { CachedPoint(it, context) }.toObservable() -// } -// init { -// var watcher: WatchService? = null -// -// if (set is NumassDataLoader) { -// context.launch(Dispatchers.IO) { -// watcher = set.path.fileSystem.newWatchService() -// try { -// val key: WatchKey = set.path.register(watcher!!, ENTRY_CREATE) -// while (true) { -// key.pollEvents().forEach { event -> -// if (event.kind() == ENTRY_CREATE) { -// val path: Path = event.context() as Path -// if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) { -// val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path) -// ?: kotlin.error("Can't read point file") -// val point = NumassDataUtils.read(envelope) -// points.add(CachedPoint(point, context)) -// } -// } -// } -// } -// } catch (x: IOException) { -// x.printStackTrace() -// } -// } -// } -// } -//} diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/SlowControlView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/SlowControlView.kt index e0307409..9d5d6fad 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/SlowControlView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/SlowControlView.kt @@ -16,6 +16,7 @@ import javafx.collections.FXCollections import javafx.collections.MapChangeListener import javafx.collections.ObservableMap import javafx.scene.image.ImageView +import kotlinx.coroutines.Dispatchers import tornadofx.* /** @@ -43,7 +44,7 @@ class SlowControlView : View(title = "Numass slow control view", icon = ImageVie plot.remove(change.key) } if (change.wasAdded()) { - runGoal("loadTable[${change.key}]") { + runGoal(app.context,"loadTable[${change.key}]", Dispatchers.IO) { val plotData = getData(change.valueAdded) val names = plotData.format.namesAsArray().filter { it != "timestamp" } diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/SpectrumView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/SpectrumView.kt index 95f18992..0c4fca1c 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/SpectrumView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/SpectrumView.kt @@ -3,13 +3,10 @@ package inr.numass.viewer import hep.dataforge.configure import hep.dataforge.fx.dfIcon import hep.dataforge.fx.plots.PlotContainer -import hep.dataforge.fx.runGoal -import hep.dataforge.fx.ui import hep.dataforge.names.Name import hep.dataforge.plots.data.DataPlot import hep.dataforge.plots.jfreechart.JFreeChartFrame import hep.dataforge.tables.Adapters -import hep.dataforge.values.Values import inr.numass.data.analyzers.countInWindow import inr.numass.data.api.NumassSet import javafx.beans.property.SimpleIntegerProperty @@ -20,6 +17,10 @@ import javafx.geometry.Insets import javafx.geometry.Orientation import javafx.scene.image.ImageView import javafx.util.converter.NumberStringConverter +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.javafx.JavaFx +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import org.controlsfx.control.RangeSlider import tornadofx.* import java.util.concurrent.atomic.AtomicInteger @@ -123,13 +124,13 @@ class SpectrumView : View(title = "Numass spectrum plot", icon = ImageView(dfIco val plot: DataPlot = frame.plots[Name.ofSingle(name)] as DataPlot? ?: DataPlot(name).apply { frame.add(this) } - runGoal("spectrumData[$name]") { - set.points.map { + app.context.launch { + val points = set.points.map { pointCache.getCachedPoint("$name/${it.voltage}[${it.index}]", it) }.map { cachedPoint -> val count = cachedPoint.spectrum.await().countInWindow(loChannel.toShort(), upChannel.toShort()) val seconds = cachedPoint.length.toMillis() / 1000.0 - runLater { + launch(Dispatchers.JavaFx) { container.progress = progress.incrementAndGet().toDouble() / totalProgress } Adapters.buildXYDataPoint( @@ -138,10 +139,10 @@ class SpectrumView : View(title = "Numass spectrum plot", icon = ImageView(dfIco sqrt(count.toDouble()) / seconds ) } - } ui { points: List -> - plot.fillData(points) - container.progress = 1.0 - //spectrumExportButton.isDisable = false + withContext(Dispatchers.JavaFx) { + plot.fillData(points) + container.progress = 1.0 + } } } } diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt index 95c67608..bddfce3b 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt @@ -6,19 +6,25 @@ import hep.dataforge.meta.Meta import hep.dataforge.meta.Metoid import hep.dataforge.names.AlphanumComparator import hep.dataforge.storage.Storage +import hep.dataforge.storage.files.FileStorage import hep.dataforge.storage.files.FileTableLoader import hep.dataforge.storage.tables.TableLoader import inr.numass.data.api.NumassPoint import inr.numass.data.api.NumassSet import inr.numass.data.storage.NumassDataLoader import javafx.beans.property.SimpleBooleanProperty +import javafx.beans.property.SimpleObjectProperty import javafx.collections.ObservableList import javafx.scene.control.ContextMenu import javafx.scene.control.TreeItem import tornadofx.* +import java.nio.file.WatchService -class StorageView(val storage: Storage) : View(title = "Numass storage", icon = dfIconView) { +class StorageView : View(title = "Numass storage", icon = dfIconView) { + + val storageProperty = SimpleObjectProperty() + val storage by storageProperty private val pointCache by inject() @@ -28,7 +34,11 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon = private val hvView: HVView by inject() private val scView: SlowControlView by inject() - init { + private var watcher: WatchService? = null + + + fun clear() { + watcher?.close() ampView.clear() timeView.clear() spectrumView.clear() @@ -81,7 +91,7 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon = } fun getChildren(): ObservableList? = when (content) { - is Storage -> content.children.map { + is Storage -> content.getChildren().map { buildContainer(it, this) }.sortedWith(Comparator.comparing({ it.id }, AlphanumComparator)).asObservable() is NumassSet -> content.points @@ -90,6 +100,35 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon = .asObservable() else -> null } + /* + is NumassDataLoader -> { + val res = content.points.sortedBy { it.index }.map { buildContainer(it, this) }.toObservable() + watchJob = app.context.launch(Dispatchers.IO) { + val key: WatchKey = content.path.register(watcher!!, ENTRY_CREATE) + coroutineContext[Job]?.invokeOnCompletion { + key.cancel() + } + while (watcher != null && isActive) { + try { + key.pollEvents().forEach { event -> + if (event.kind() == ENTRY_CREATE) { + val path: Path = event.context() as Path + if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) { + val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path) + ?: kotlin.error("Can't read point file") + val point = NumassDataUtils.read(envelope) + res.add(buildContainer(point, this@Container)) + } + } + } + } catch (x: Throwable) { + app.context.logger.error("Error during dynamic point read", x) + } + } + } + res + } + */ val hasChildren: Boolean = (content is Storage) || (content is NumassSet) @@ -99,13 +138,24 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon = override val root = splitpane { treeview { //isShowRoot = false - root = TreeItem(Container(storage.name, storage)) - root.isExpanded = true - lazyPopulate(leafCheck = { - !it.value.hasChildren - }) { - it.value.getChildren() + storageProperty.onChange { storage -> + clear() + if (storage == null) return@onChange + root = TreeItem(Container(storage.name, storage)) + root.isExpanded = true + lazyPopulate(leafCheck = { + !it.value.hasChildren + }) { + it.value.getChildren() + } + watcher?.close() + watcher = if (storage is FileStorage) { + storage.path.fileSystem.newWatchService() + } else { + null + } } + cellFormat { value: Container -> when (value.content) { is Storage -> { diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/TimeView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/TimeView.kt index 81d7c4c4..ec7d4a25 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/TimeView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/TimeView.kt @@ -21,6 +21,7 @@ import javafx.beans.binding.DoubleBinding import javafx.collections.FXCollections import javafx.collections.ObservableMap import javafx.scene.image.ImageView +import kotlinx.coroutines.Dispatchers import tornadofx.* class TimeView : View(title = "Numass time spectrum plot", icon = ImageView(dfIcon)) { @@ -103,7 +104,7 @@ class TimeView : View(title = "Numass time spectrum plot", icon = ImageView(dfIc private fun invalidate() { data.forEach { key, point -> plots.getOrPut(key) { - runGoal("loadAmplitudeSpectrum_$key") { + runGoal(app.context, "loadAmplitudeSpectrum_$key", Dispatchers.IO) { val initialEstimate = analyzer.analyze(point) val cr = initialEstimate.getDouble("cr")