forked from NPM/numass-framework
Update file watcher
This commit is contained in:
parent
f7d9aff838
commit
ce6da61fcf
@ -31,9 +31,7 @@ interface ValueProvider {
|
||||
fun optValue(path: String): Optional<Value>
|
||||
|
||||
|
||||
fun getValue(path: String): Value {
|
||||
return optValue(path).orElseThrow<NameNotFoundException> { NameNotFoundException(path) }
|
||||
}
|
||||
fun getValue(path: String): Value = optValue(path).orElseThrow { NameNotFoundException(path) }
|
||||
|
||||
@Provides(BOOLEAN_TARGET)
|
||||
|
||||
|
@ -57,7 +57,12 @@ interface FileStorageElementType : StorageElementType, Named {
|
||||
/**
|
||||
* Read given path as [FileStorageElement] with given parent. Returns null if path does not belong to storage
|
||||
*/
|
||||
suspend fun read(context: Context, path: Path, parent: StorageElement? = null): FileStorageElement?
|
||||
suspend fun read(
|
||||
context: Context,
|
||||
path: Path,
|
||||
parent: StorageElement? = null,
|
||||
readMeta: Meta? = null,
|
||||
): FileStorageElement?
|
||||
}
|
||||
|
||||
class FileStorage(
|
||||
@ -75,9 +80,9 @@ class FileStorage(
|
||||
|
||||
override fun getChildren(): Collection<StorageElement> = runBlocking {
|
||||
Files.list(path).toList().map { path ->
|
||||
async{
|
||||
async {
|
||||
type.read(context, path, this@FileStorage).also {
|
||||
if(it == null){
|
||||
if (it == null) {
|
||||
logger.warn("Can't read $path")
|
||||
}
|
||||
}
|
||||
@ -117,14 +122,12 @@ class FileStorage(
|
||||
fun resolveMeta(
|
||||
path: Path,
|
||||
metaReader: (Path) -> Meta? = { EnvelopeType.infer(it)?.reader?.read(it)?.meta },
|
||||
): Meta? {
|
||||
return if (Files.isDirectory(path)) {
|
||||
Files.list(path).asSequence()
|
||||
.find { it.fileName.toString() == "meta.df" || it.fileName.toString() == "meta" }
|
||||
?.let(metaReader)
|
||||
} else {
|
||||
metaReader(path)
|
||||
}
|
||||
): Meta? = if (Files.isDirectory(path)) {
|
||||
Files.list(path).asSequence()
|
||||
.find { it.fileName.toString() == "meta.df" || it.fileName.toString() == "meta" }
|
||||
?.let(metaReader)
|
||||
} else {
|
||||
metaReader(path)
|
||||
}
|
||||
|
||||
fun createMetaEnvelope(meta: Meta): Envelope {
|
||||
@ -167,8 +170,13 @@ class FileStorage(
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun read(context: Context, path: Path, parent: StorageElement?): FileStorageElement? {
|
||||
val meta = resolveMeta(path)
|
||||
override suspend fun read(
|
||||
context: Context,
|
||||
path: Path,
|
||||
parent: StorageElement?,
|
||||
readMeta: Meta?,
|
||||
): FileStorageElement? {
|
||||
val meta = readMeta ?: resolveMeta(path)
|
||||
val name = meta?.optString("name").nullable ?: path.fileName.toString()
|
||||
val type = meta?.optString("type").nullable?.let {
|
||||
context.load<StorageManager>().getType(it)
|
||||
|
@ -299,7 +299,7 @@ class TableLoaderType : FileStorageElementType {
|
||||
})
|
||||
}
|
||||
|
||||
override suspend fun read(context: Context, path: Path, parent: StorageElement?): FileStorageElement? {
|
||||
override suspend fun read(context: Context, path: Path, parent: StorageElement?, readMeta: Meta?): FileStorageElement {
|
||||
val envelope = EnvelopeReader.readFile(path)
|
||||
|
||||
val name = envelope.meta.optString("name").nullable ?: path.fileName.toString()
|
||||
|
@ -106,11 +106,9 @@ class ProtoNumassPoint(override val meta: Meta, val protoBuilder: () -> NumassPr
|
||||
this.data.stream
|
||||
}
|
||||
|
||||
fun fromEnvelope(envelope: Envelope): ProtoNumassPoint {
|
||||
return ProtoNumassPoint(envelope.meta) {
|
||||
envelope.dataStream().use {
|
||||
NumassProto.Point.parseFrom(it)
|
||||
}
|
||||
fun fromEnvelope(envelope: Envelope): ProtoNumassPoint = ProtoNumassPoint(envelope.meta) {
|
||||
envelope.dataStream().use {
|
||||
NumassProto.Point.parseFrom(it)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,12 +34,17 @@ import java.nio.file.Path
|
||||
class NumassDirectory : FileStorage.Directory() {
|
||||
override val name: String = NUMASS_DIRECTORY_TYPE
|
||||
|
||||
override suspend fun read(context: Context, path: Path, parent: StorageElement?): FileStorageElement? {
|
||||
val meta = FileStorage.resolveMeta(path){ NumassEnvelopeType.infer(it)?.reader?.read(it)?.meta }
|
||||
override suspend fun read(
|
||||
context: Context,
|
||||
path: Path,
|
||||
parent: StorageElement?,
|
||||
readMeta: Meta?,
|
||||
): FileStorageElement? {
|
||||
val meta = readMeta ?: FileStorage.resolveMeta(path) { NumassEnvelopeType.infer(it)?.reader?.read(it)?.meta }
|
||||
return if (Files.isDirectory(path) && meta != null) {
|
||||
NumassDataLoader(context, parent, path.fileName.toString(), path)
|
||||
} else {
|
||||
super.read(context, path, parent)
|
||||
super.read(context, path, parent, meta)
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,8 +55,8 @@ class NumassDirectory : FileStorage.Directory() {
|
||||
/**
|
||||
* Simple read for scripting and debug
|
||||
*/
|
||||
fun read(context: Context = Global, path: String): FileStorageElement?{
|
||||
return runBlocking { INSTANCE.read(context, context.getDataFile(path).absolutePath)}
|
||||
fun read(context: Context = Global, path: String): FileStorageElement? {
|
||||
return runBlocking { INSTANCE.read(context, context.getDataFile(path).absolutePath) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -64,7 +69,7 @@ class NumassDataPointEvent(meta: Meta) : Event(meta) {
|
||||
|
||||
override fun toString(): String {
|
||||
return String.format("(%s) [%s] : pushed numass data file with name '%s' and size '%d'",
|
||||
time().toString(), sourceTag(), fileName, fileSize)
|
||||
time().toString(), sourceTag(), fileName, fileSize)
|
||||
}
|
||||
|
||||
companion object {
|
||||
@ -79,9 +84,9 @@ class NumassDataPointEvent(meta: Meta) : Event(meta) {
|
||||
|
||||
fun builder(source: String, fileName: String, fileSize: Int): EventBuilder<*> {
|
||||
return EventBuilder.make("numass.storage.pushData")
|
||||
.setSource(source)
|
||||
.setMetaValue(FILE_NAME_KEY, fileName)
|
||||
.setMetaValue(FILE_SIZE_KEY, fileSize)
|
||||
.setSource(source)
|
||||
.setMetaValue(FILE_NAME_KEY, fileName)
|
||||
.setMetaValue(FILE_SIZE_KEY, fileSize)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,7 +10,8 @@ fun main() {
|
||||
Global.output = FXOutputManager()
|
||||
JFreeChartPlugin().startGlobal()
|
||||
|
||||
val file = File("C:\\Users\\darksnake\\Desktop\\test-data\\p20211012142003(20s)").toPath()
|
||||
val file = File("D:\\Work\\Numass\\data\\test\\7.df").toPath()
|
||||
println(file)
|
||||
val point = ProtoNumassPoint.readFile(file)
|
||||
|
||||
point.events.forEach {
|
||||
|
@ -58,6 +58,7 @@ runtime {
|
||||
"javafx.controls"
|
||||
)
|
||||
jpackage {
|
||||
//installerType = "deb"
|
||||
jvmArgs = addJvmArgs
|
||||
val currentOs = org.gradle.internal.os.OperatingSystem.current()
|
||||
installerOptions = installerOptions + listOf("--vendor", "MIPT-NPM lab")
|
||||
|
@ -1,5 +1,6 @@
|
||||
package inr.numass.viewer
|
||||
|
||||
import hep.dataforge.context.ContextAware
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.storage.tables.TableLoader
|
||||
import hep.dataforge.tables.Adapters
|
||||
@ -12,16 +13,22 @@ import inr.numass.data.analyzers.NumassAnalyzer
|
||||
import inr.numass.data.analyzers.TimeAnalyzer
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.api.NumassSet
|
||||
import inr.numass.data.storage.NumassDataLoader
|
||||
import javafx.beans.property.SimpleObjectProperty
|
||||
import javafx.collections.FXCollections
|
||||
import javafx.collections.ObservableList
|
||||
import javafx.collections.ObservableMap
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.*
|
||||
import tornadofx.*
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardWatchEventKinds
|
||||
import java.nio.file.WatchKey
|
||||
import java.nio.file.attribute.BasicFileAttributes
|
||||
import kotlin.math.floor
|
||||
|
||||
class DataController : Controller() {
|
||||
private val context = app.context
|
||||
class DataController : Controller(), ContextAware {
|
||||
override val context get() = app.context
|
||||
|
||||
val analyzer = TimeAnalyzer()
|
||||
|
||||
@ -83,17 +90,73 @@ class DataController : Controller() {
|
||||
val points: ObservableMap<String, CachedPoint> = FXCollections.observableHashMap()
|
||||
val sc: ObservableMap<String, TableLoader> = FXCollections.observableHashMap()
|
||||
|
||||
val files: ObservableList<Path> = FXCollections.observableArrayList()
|
||||
|
||||
val watchPathProperty = SimpleObjectProperty<Path?>()
|
||||
|
||||
private var watchJob: Job? = null
|
||||
|
||||
init {
|
||||
watchPathProperty.onChange { watchPath ->
|
||||
watchJob?.cancel()
|
||||
if (watchPath != null) {
|
||||
Files.list(watchPath).toList()
|
||||
.filter {
|
||||
!Files.isDirectory(it) && it.fileName.startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)
|
||||
}
|
||||
.sortedBy { file ->
|
||||
val attr = Files.readAttributes(file, BasicFileAttributes::class.java)
|
||||
attr.creationTime()
|
||||
}.forEach { path ->
|
||||
try {
|
||||
runLater {
|
||||
files.add(path)
|
||||
}
|
||||
} catch (x: Throwable) {
|
||||
app.context.logger.error("Error during dynamic point read", x)
|
||||
}
|
||||
}
|
||||
val watcher = watchPath.fileSystem.newWatchService()
|
||||
watchJob = app.context.launch(Dispatchers.IO) {
|
||||
watcher.use { watcher ->
|
||||
val key: WatchKey = watchPath.register(watcher,
|
||||
StandardWatchEventKinds.ENTRY_CREATE)
|
||||
coroutineContext[Job]?.invokeOnCompletion {
|
||||
key.cancel()
|
||||
}
|
||||
while (isActive) {
|
||||
try {
|
||||
key.pollEvents().forEach { event ->
|
||||
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
|
||||
val path: Path = event.context() as Path
|
||||
runLater {
|
||||
files.add(watchPath.resolve(path))
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (x: Throwable) {
|
||||
app.context.logger.error("Error during dynamic point read", x)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun clear() {
|
||||
cache.clear()
|
||||
sets.clear()
|
||||
points.clear()
|
||||
sc.clear()
|
||||
watchPathProperty.set(null)
|
||||
}
|
||||
|
||||
|
||||
fun addPoint(id: String, point: NumassPoint) {
|
||||
points[id] = getCachedPoint(id, point)
|
||||
fun addPoint(id: String, point: NumassPoint): CachedPoint {
|
||||
val newPoint = getCachedPoint(id, point)
|
||||
points[id] = newPoint
|
||||
return newPoint
|
||||
}
|
||||
|
||||
fun addSet(id: String, set: NumassSet) {
|
||||
|
@ -4,88 +4,49 @@ import hep.dataforge.fx.dfIconView
|
||||
import hep.dataforge.io.envelopes.Envelope
|
||||
import inr.numass.data.NumassDataUtils
|
||||
import inr.numass.data.NumassEnvelopeType
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.storage.NumassDataLoader
|
||||
import javafx.beans.property.SimpleObjectProperty
|
||||
import javafx.collections.FXCollections
|
||||
import javafx.collections.MapChangeListener
|
||||
import javafx.scene.control.ContextMenu
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import tornadofx.*
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardWatchEventKinds.ENTRY_CREATE
|
||||
import java.nio.file.WatchKey
|
||||
|
||||
class DirectoryWatchView : View(title = "Numass storage", icon = dfIconView) {
|
||||
|
||||
val pathProperty = SimpleObjectProperty<Path>()
|
||||
private val dataController by inject<DataController>()
|
||||
|
||||
private val ampView: AmplitudeView by inject()
|
||||
private val timeView: TimeView by inject()
|
||||
|
||||
private var watcherProperty = pathProperty.objectBinding {
|
||||
it?.fileSystem?.newWatchService()
|
||||
// private val files: ObservableList<DataController.CachedPoint> =
|
||||
// FXCollections.observableArrayList<DataController.CachedPoint>().apply {
|
||||
// bind(dataController.points) { _, v -> v }
|
||||
// }
|
||||
|
||||
private fun readPointFile(path: Path): NumassPoint {
|
||||
val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path)
|
||||
?: kotlin.error("Can't read point file")
|
||||
return NumassDataUtils.read(envelope)
|
||||
}
|
||||
|
||||
private val files = FXCollections.observableArrayList<DataController.CachedPoint>()
|
||||
|
||||
private var watchJob: Job? = null
|
||||
|
||||
init {
|
||||
dataController.points.addListener(MapChangeListener { change ->
|
||||
if (change.wasAdded()) {
|
||||
files.add(change.valueAdded)
|
||||
} else if (change.wasRemoved()) {
|
||||
files.remove(change.valueRemoved)
|
||||
}
|
||||
})
|
||||
|
||||
watcherProperty.onChange { watchService ->
|
||||
watchJob?.cancel()
|
||||
if (watchService != null) {
|
||||
watchJob = app.context.launch(Dispatchers.IO) {
|
||||
val key: WatchKey = pathProperty.get().register(watchService, ENTRY_CREATE)
|
||||
coroutineContext[Job]?.invokeOnCompletion {
|
||||
key.cancel()
|
||||
}
|
||||
while (isActive) {
|
||||
try {
|
||||
key.pollEvents().forEach { event ->
|
||||
if (event.kind() == ENTRY_CREATE) {
|
||||
val path: Path = event.context() as Path
|
||||
if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) {
|
||||
val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path)
|
||||
?: kotlin.error("Can't read point file")
|
||||
val point = NumassDataUtils.read(envelope)
|
||||
files.add(dataController.getCachedPoint(path.toString(), point))
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (x: Throwable) {
|
||||
app.context.logger.error("Error during dynamic point read", x)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override val root = splitpane {
|
||||
listview(files) {
|
||||
multiSelect(true)
|
||||
cellFormat { value: DataController.CachedPoint ->
|
||||
text = "${value.voltage}[${value.index}]"
|
||||
listview(dataController.files) {
|
||||
//multiSelect(true)
|
||||
cellFormat { path: Path ->
|
||||
text = path.fileName.toString()
|
||||
graphic = null
|
||||
contextMenu = ContextMenu().apply {
|
||||
item("Info") {
|
||||
action {
|
||||
PointInfoView(value).openModal(escapeClosesWindow = true)
|
||||
if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) {
|
||||
val point = readPointFile(path)
|
||||
val cachedPoint = dataController.addPoint(path.toString(), point)
|
||||
//val point = dataController.getCachedPoint(value.toString())
|
||||
contextMenu = ContextMenu().apply {
|
||||
item("Info") {
|
||||
action {
|
||||
PointInfoView(cachedPoint).openModal(escapeClosesWindow = true)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
contextMenu = null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -38,8 +38,7 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
|
||||
// addLogHandler(context.logger)
|
||||
// }
|
||||
|
||||
private val pathProperty = SimpleObjectProperty<Path>()
|
||||
private var path: Path by pathProperty
|
||||
private val pathProperty = SimpleObjectProperty<Path?>()
|
||||
|
||||
private val contentViewProperty = SimpleObjectProperty<UIComponent>()
|
||||
private var contentView: UIComponent? by contentViewProperty
|
||||
@ -54,7 +53,6 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
|
||||
}
|
||||
|
||||
|
||||
|
||||
override val root = borderpane {
|
||||
prefHeight = 600.0
|
||||
prefWidth = 800.0
|
||||
@ -83,9 +81,10 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
|
||||
if (rootDir != null) {
|
||||
NumassProperties.setNumassProperty("numass.viewer.lastPath", rootDir.absolutePath)
|
||||
app.context.launch {
|
||||
val path = rootDir.toPath()
|
||||
dataController.clear()
|
||||
runLater {
|
||||
path = rootDir.toPath()
|
||||
pathProperty.set(path)
|
||||
contentView = null
|
||||
}
|
||||
if (Files.exists(path.resolve(NumassDataLoader.META_FRAGMENT_NAME))) {
|
||||
@ -108,7 +107,8 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
|
||||
} else {
|
||||
//build storage
|
||||
app.context.launch {
|
||||
val storageElement = NumassDirectory.INSTANCE.read(app.context, path) as Storage
|
||||
val storageElement =
|
||||
NumassDirectory.INSTANCE.read(app.context, path) as? Storage
|
||||
withContext(Dispatchers.JavaFx) {
|
||||
contentView = storageView
|
||||
storageView.storageProperty.set(storageElement)
|
||||
@ -141,9 +141,10 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
|
||||
NumassProperties.setNumassProperty("numass.viewer.lastPath",
|
||||
file.parentFile.absolutePath)
|
||||
app.context.launch {
|
||||
val path = file.toPath()
|
||||
dataController.clear()
|
||||
runLater {
|
||||
path = file.toPath()
|
||||
pathProperty.set(file.toPath())
|
||||
contentView = null
|
||||
}
|
||||
//Reading individual file
|
||||
@ -199,10 +200,12 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
|
||||
if (dir != null) {
|
||||
NumassProperties.setNumassProperty("numass.viewer.lastPath", dir.absolutePath)
|
||||
app.context.launch {
|
||||
val path = dir.toPath()
|
||||
dataController.clear()
|
||||
runLater {
|
||||
path = dir.toPath()
|
||||
contentView = null
|
||||
pathProperty.set(path)
|
||||
contentView = directoryWatchView
|
||||
dataController.watchPathProperty.set(path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user