From ce6da61fcf731fe7d99809a321265c8a5241feaa Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 22 Nov 2021 15:35:18 +0300 Subject: [PATCH] Update file watcher --- .../hep/dataforge/values/ValueProvider.kt | 4 +- .../dataforge/storage/files/FileStorage.kt | 34 ++++--- .../storage/files/FileTableLoader.kt | 2 +- .../inr/numass/data/ProtoNumassPoint.kt | 8 +- .../inr/numass/data/storage/NumassStorage.kt | 23 +++-- .../inr/numass/scripts/analysis/test_data.kt | 3 +- numass-viewer/build.gradle.kts | 1 + .../inr/numass/viewer/DataController.kt | 77 ++++++++++++++-- .../inr/numass/viewer/DirectoryWatchView.kt | 89 ++++++------------- .../main/kotlin/inr/numass/viewer/MainView.kt | 19 ++-- 10 files changed, 149 insertions(+), 111 deletions(-) diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/values/ValueProvider.kt b/dataforge-core/src/main/kotlin/hep/dataforge/values/ValueProvider.kt index a0366762..6826dd4e 100644 --- a/dataforge-core/src/main/kotlin/hep/dataforge/values/ValueProvider.kt +++ b/dataforge-core/src/main/kotlin/hep/dataforge/values/ValueProvider.kt @@ -31,9 +31,7 @@ interface ValueProvider { fun optValue(path: String): Optional - fun getValue(path: String): Value { - return optValue(path).orElseThrow { NameNotFoundException(path) } - } + fun getValue(path: String): Value = optValue(path).orElseThrow { NameNotFoundException(path) } @Provides(BOOLEAN_TARGET) diff --git a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt index 541aa392..e98089dd 100644 --- a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt +++ b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileStorage.kt @@ -57,7 +57,12 @@ interface FileStorageElementType : StorageElementType, Named { /** * Read given path as [FileStorageElement] with given parent. Returns null if path does not belong to storage */ - suspend fun read(context: Context, path: Path, parent: StorageElement? = null): FileStorageElement? + suspend fun read( + context: Context, + path: Path, + parent: StorageElement? = null, + readMeta: Meta? = null, + ): FileStorageElement? } class FileStorage( @@ -75,9 +80,9 @@ class FileStorage( override fun getChildren(): Collection = runBlocking { Files.list(path).toList().map { path -> - async{ + async { type.read(context, path, this@FileStorage).also { - if(it == null){ + if (it == null) { logger.warn("Can't read $path") } } @@ -117,14 +122,12 @@ class FileStorage( fun resolveMeta( path: Path, metaReader: (Path) -> Meta? = { EnvelopeType.infer(it)?.reader?.read(it)?.meta }, - ): Meta? { - return if (Files.isDirectory(path)) { - Files.list(path).asSequence() - .find { it.fileName.toString() == "meta.df" || it.fileName.toString() == "meta" } - ?.let(metaReader) - } else { - metaReader(path) - } + ): Meta? = if (Files.isDirectory(path)) { + Files.list(path).asSequence() + .find { it.fileName.toString() == "meta.df" || it.fileName.toString() == "meta" } + ?.let(metaReader) + } else { + metaReader(path) } fun createMetaEnvelope(meta: Meta): Envelope { @@ -167,8 +170,13 @@ class FileStorage( } } - override suspend fun read(context: Context, path: Path, parent: StorageElement?): FileStorageElement? { - val meta = resolveMeta(path) + override suspend fun read( + context: Context, + path: Path, + parent: StorageElement?, + readMeta: Meta?, + ): FileStorageElement? { + val meta = readMeta ?: resolveMeta(path) val name = meta?.optString("name").nullable ?: path.fileName.toString() val type = meta?.optString("type").nullable?.let { context.load().getType(it) diff --git a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileTableLoader.kt b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileTableLoader.kt index 4f7047c9..e247d9af 100644 --- a/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileTableLoader.kt +++ b/dataforge-storage/src/main/kotlin/hep/dataforge/storage/files/FileTableLoader.kt @@ -299,7 +299,7 @@ class TableLoaderType : FileStorageElementType { }) } - override suspend fun read(context: Context, path: Path, parent: StorageElement?): FileStorageElement? { + override suspend fun read(context: Context, path: Path, parent: StorageElement?, readMeta: Meta?): FileStorageElement { val envelope = EnvelopeReader.readFile(path) val name = envelope.meta.optString("name").nullable ?: path.fileName.toString() diff --git a/numass-core/numass-data-proto/src/main/kotlin/inr/numass/data/ProtoNumassPoint.kt b/numass-core/numass-data-proto/src/main/kotlin/inr/numass/data/ProtoNumassPoint.kt index 289f71ec..335fe1b1 100644 --- a/numass-core/numass-data-proto/src/main/kotlin/inr/numass/data/ProtoNumassPoint.kt +++ b/numass-core/numass-data-proto/src/main/kotlin/inr/numass/data/ProtoNumassPoint.kt @@ -106,11 +106,9 @@ class ProtoNumassPoint(override val meta: Meta, val protoBuilder: () -> NumassPr this.data.stream } - fun fromEnvelope(envelope: Envelope): ProtoNumassPoint { - return ProtoNumassPoint(envelope.meta) { - envelope.dataStream().use { - NumassProto.Point.parseFrom(it) - } + fun fromEnvelope(envelope: Envelope): ProtoNumassPoint = ProtoNumassPoint(envelope.meta) { + envelope.dataStream().use { + NumassProto.Point.parseFrom(it) } } diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt index 68fff09a..8342acae 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt @@ -34,12 +34,17 @@ import java.nio.file.Path class NumassDirectory : FileStorage.Directory() { override val name: String = NUMASS_DIRECTORY_TYPE - override suspend fun read(context: Context, path: Path, parent: StorageElement?): FileStorageElement? { - val meta = FileStorage.resolveMeta(path){ NumassEnvelopeType.infer(it)?.reader?.read(it)?.meta } + override suspend fun read( + context: Context, + path: Path, + parent: StorageElement?, + readMeta: Meta?, + ): FileStorageElement? { + val meta = readMeta ?: FileStorage.resolveMeta(path) { NumassEnvelopeType.infer(it)?.reader?.read(it)?.meta } return if (Files.isDirectory(path) && meta != null) { NumassDataLoader(context, parent, path.fileName.toString(), path) } else { - super.read(context, path, parent) + super.read(context, path, parent, meta) } } @@ -50,8 +55,8 @@ class NumassDirectory : FileStorage.Directory() { /** * Simple read for scripting and debug */ - fun read(context: Context = Global, path: String): FileStorageElement?{ - return runBlocking { INSTANCE.read(context, context.getDataFile(path).absolutePath)} + fun read(context: Context = Global, path: String): FileStorageElement? { + return runBlocking { INSTANCE.read(context, context.getDataFile(path).absolutePath) } } } } @@ -64,7 +69,7 @@ class NumassDataPointEvent(meta: Meta) : Event(meta) { override fun toString(): String { return String.format("(%s) [%s] : pushed numass data file with name '%s' and size '%d'", - time().toString(), sourceTag(), fileName, fileSize) + time().toString(), sourceTag(), fileName, fileSize) } companion object { @@ -79,9 +84,9 @@ class NumassDataPointEvent(meta: Meta) : Event(meta) { fun builder(source: String, fileName: String, fileSize: Int): EventBuilder<*> { return EventBuilder.make("numass.storage.pushData") - .setSource(source) - .setMetaValue(FILE_NAME_KEY, fileName) - .setMetaValue(FILE_SIZE_KEY, fileSize) + .setSource(source) + .setMetaValue(FILE_NAME_KEY, fileName) + .setMetaValue(FILE_SIZE_KEY, fileSize) } } diff --git a/numass-main/src/main/kotlin/inr/numass/scripts/analysis/test_data.kt b/numass-main/src/main/kotlin/inr/numass/scripts/analysis/test_data.kt index eb49dd95..55c3c46d 100644 --- a/numass-main/src/main/kotlin/inr/numass/scripts/analysis/test_data.kt +++ b/numass-main/src/main/kotlin/inr/numass/scripts/analysis/test_data.kt @@ -10,7 +10,8 @@ fun main() { Global.output = FXOutputManager() JFreeChartPlugin().startGlobal() - val file = File("C:\\Users\\darksnake\\Desktop\\test-data\\p20211012142003(20s)").toPath() + val file = File("D:\\Work\\Numass\\data\\test\\7.df").toPath() + println(file) val point = ProtoNumassPoint.readFile(file) point.events.forEach { diff --git a/numass-viewer/build.gradle.kts b/numass-viewer/build.gradle.kts index 59283a22..0cb8f42b 100644 --- a/numass-viewer/build.gradle.kts +++ b/numass-viewer/build.gradle.kts @@ -58,6 +58,7 @@ runtime { "javafx.controls" ) jpackage { + //installerType = "deb" jvmArgs = addJvmArgs val currentOs = org.gradle.internal.os.OperatingSystem.current() installerOptions = installerOptions + listOf("--vendor", "MIPT-NPM lab") diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/DataController.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/DataController.kt index 4825e8e0..7f45e9a6 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/DataController.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/DataController.kt @@ -1,5 +1,6 @@ package inr.numass.viewer +import hep.dataforge.context.ContextAware import hep.dataforge.meta.Meta import hep.dataforge.storage.tables.TableLoader import hep.dataforge.tables.Adapters @@ -12,16 +13,22 @@ import inr.numass.data.analyzers.NumassAnalyzer import inr.numass.data.analyzers.TimeAnalyzer import inr.numass.data.api.NumassPoint import inr.numass.data.api.NumassSet +import inr.numass.data.storage.NumassDataLoader +import javafx.beans.property.SimpleObjectProperty import javafx.collections.FXCollections +import javafx.collections.ObservableList import javafx.collections.ObservableMap -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async +import kotlinx.coroutines.* import tornadofx.* +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardWatchEventKinds +import java.nio.file.WatchKey +import java.nio.file.attribute.BasicFileAttributes import kotlin.math.floor -class DataController : Controller() { - private val context = app.context +class DataController : Controller(), ContextAware { + override val context get() = app.context val analyzer = TimeAnalyzer() @@ -83,17 +90,73 @@ class DataController : Controller() { val points: ObservableMap = FXCollections.observableHashMap() val sc: ObservableMap = FXCollections.observableHashMap() + val files: ObservableList = FXCollections.observableArrayList() + + val watchPathProperty = SimpleObjectProperty() + + private var watchJob: Job? = null + + init { + watchPathProperty.onChange { watchPath -> + watchJob?.cancel() + if (watchPath != null) { + Files.list(watchPath).toList() + .filter { + !Files.isDirectory(it) && it.fileName.startsWith(NumassDataLoader.POINT_FRAGMENT_NAME) + } + .sortedBy { file -> + val attr = Files.readAttributes(file, BasicFileAttributes::class.java) + attr.creationTime() + }.forEach { path -> + try { + runLater { + files.add(path) + } + } catch (x: Throwable) { + app.context.logger.error("Error during dynamic point read", x) + } + } + val watcher = watchPath.fileSystem.newWatchService() + watchJob = app.context.launch(Dispatchers.IO) { + watcher.use { watcher -> + val key: WatchKey = watchPath.register(watcher, + StandardWatchEventKinds.ENTRY_CREATE) + coroutineContext[Job]?.invokeOnCompletion { + key.cancel() + } + while (isActive) { + try { + key.pollEvents().forEach { event -> + if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) { + val path: Path = event.context() as Path + runLater { + files.add(watchPath.resolve(path)) + } + } + } + } catch (x: Throwable) { + app.context.logger.error("Error during dynamic point read", x) + } + } + } + } + } + } + } fun clear() { cache.clear() sets.clear() points.clear() sc.clear() + watchPathProperty.set(null) } - fun addPoint(id: String, point: NumassPoint) { - points[id] = getCachedPoint(id, point) + fun addPoint(id: String, point: NumassPoint): CachedPoint { + val newPoint = getCachedPoint(id, point) + points[id] = newPoint + return newPoint } fun addSet(id: String, set: NumassSet) { diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/DirectoryWatchView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/DirectoryWatchView.kt index d88d4937..4b955aee 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/DirectoryWatchView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/DirectoryWatchView.kt @@ -4,88 +4,49 @@ import hep.dataforge.fx.dfIconView import hep.dataforge.io.envelopes.Envelope import inr.numass.data.NumassDataUtils import inr.numass.data.NumassEnvelopeType +import inr.numass.data.api.NumassPoint import inr.numass.data.storage.NumassDataLoader -import javafx.beans.property.SimpleObjectProperty -import javafx.collections.FXCollections -import javafx.collections.MapChangeListener import javafx.scene.control.ContextMenu -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import tornadofx.* import java.nio.file.Path -import java.nio.file.StandardWatchEventKinds.ENTRY_CREATE -import java.nio.file.WatchKey class DirectoryWatchView : View(title = "Numass storage", icon = dfIconView) { - val pathProperty = SimpleObjectProperty() private val dataController by inject() private val ampView: AmplitudeView by inject() private val timeView: TimeView by inject() - private var watcherProperty = pathProperty.objectBinding { - it?.fileSystem?.newWatchService() +// private val files: ObservableList = +// FXCollections.observableArrayList().apply { +// bind(dataController.points) { _, v -> v } +// } + + private fun readPointFile(path: Path): NumassPoint { + val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path) + ?: kotlin.error("Can't read point file") + return NumassDataUtils.read(envelope) } - private val files = FXCollections.observableArrayList() - - private var watchJob: Job? = null - - init { - dataController.points.addListener(MapChangeListener { change -> - if (change.wasAdded()) { - files.add(change.valueAdded) - } else if (change.wasRemoved()) { - files.remove(change.valueRemoved) - } - }) - - watcherProperty.onChange { watchService -> - watchJob?.cancel() - if (watchService != null) { - watchJob = app.context.launch(Dispatchers.IO) { - val key: WatchKey = pathProperty.get().register(watchService, ENTRY_CREATE) - coroutineContext[Job]?.invokeOnCompletion { - key.cancel() - } - while (isActive) { - try { - key.pollEvents().forEach { event -> - if (event.kind() == ENTRY_CREATE) { - val path: Path = event.context() as Path - if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) { - val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path) - ?: kotlin.error("Can't read point file") - val point = NumassDataUtils.read(envelope) - files.add(dataController.getCachedPoint(path.toString(), point)) - } - } - } - } catch (x: Throwable) { - app.context.logger.error("Error during dynamic point read", x) - } - } - } - } - } - } - - override val root = splitpane { - listview(files) { - multiSelect(true) - cellFormat { value: DataController.CachedPoint -> - text = "${value.voltage}[${value.index}]" + listview(dataController.files) { + //multiSelect(true) + cellFormat { path: Path -> + text = path.fileName.toString() graphic = null - contextMenu = ContextMenu().apply { - item("Info") { - action { - PointInfoView(value).openModal(escapeClosesWindow = true) + if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) { + val point = readPointFile(path) + val cachedPoint = dataController.addPoint(path.toString(), point) + //val point = dataController.getCachedPoint(value.toString()) + contextMenu = ContextMenu().apply { + item("Info") { + action { + PointInfoView(cachedPoint).openModal(escapeClosesWindow = true) + } } } + } else { + contextMenu = null } } } diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt index 64877eca..d1e95a23 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/MainView.kt @@ -38,8 +38,7 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { // addLogHandler(context.logger) // } - private val pathProperty = SimpleObjectProperty() - private var path: Path by pathProperty + private val pathProperty = SimpleObjectProperty() private val contentViewProperty = SimpleObjectProperty() private var contentView: UIComponent? by contentViewProperty @@ -54,7 +53,6 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { } - override val root = borderpane { prefHeight = 600.0 prefWidth = 800.0 @@ -83,9 +81,10 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { if (rootDir != null) { NumassProperties.setNumassProperty("numass.viewer.lastPath", rootDir.absolutePath) app.context.launch { + val path = rootDir.toPath() dataController.clear() runLater { - path = rootDir.toPath() + pathProperty.set(path) contentView = null } if (Files.exists(path.resolve(NumassDataLoader.META_FRAGMENT_NAME))) { @@ -108,7 +107,8 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { } else { //build storage app.context.launch { - val storageElement = NumassDirectory.INSTANCE.read(app.context, path) as Storage + val storageElement = + NumassDirectory.INSTANCE.read(app.context, path) as? Storage withContext(Dispatchers.JavaFx) { contentView = storageView storageView.storageProperty.set(storageElement) @@ -141,9 +141,10 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { NumassProperties.setNumassProperty("numass.viewer.lastPath", file.parentFile.absolutePath) app.context.launch { + val path = file.toPath() dataController.clear() runLater { - path = file.toPath() + pathProperty.set(file.toPath()) contentView = null } //Reading individual file @@ -199,10 +200,12 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) { if (dir != null) { NumassProperties.setNumassProperty("numass.viewer.lastPath", dir.absolutePath) app.context.launch { + val path = dir.toPath() dataController.clear() runLater { - path = dir.toPath() - contentView = null + pathProperty.set(path) + contentView = directoryWatchView + dataController.watchPathProperty.set(path) } } }