Fix storage closing

This commit is contained in:
Alexander Nozik 2021-11-14 19:46:00 +03:00
parent 15d17f2cc4
commit 35801d82fc
20 changed files with 161 additions and 173 deletions

View File

@ -403,7 +403,7 @@ open class Context(
get() = plugins[ExecutorPlugin::class] ?: parent?.executors ?: Global.executors
override val coroutineContext: CoroutineContext
get() = this.executors.coroutineContext
get() = executors.coroutineContext
companion object {

View File

@ -41,7 +41,7 @@ interface ExecutorPlugin : Plugin, CoroutineScope {
@PluginDef(group = "hep.dataforge", name = "executor", support = true, info = "Executor plugin")
class DefaultExecutorPlugin(meta: Meta = Meta.empty()) : BasicPlugin(meta), ExecutorPlugin {
private val executors = HashMap<Meta, ExecutorService>();
private val executors = HashMap<Meta, ExecutorService>()
/**
* Create a default executor that uses plugin meta
@ -51,21 +51,16 @@ class DefaultExecutorPlugin(meta: Meta = Meta.empty()) : BasicPlugin(meta), Exec
getExecutor(meta)
}
override fun getExecutor(meta: Meta): ExecutorService {
synchronized(context) {
return executors.getOrPut(meta) {
val workerName = meta.getString("workerName", "worker");
@Synchronized
override fun getExecutor(meta: Meta): ExecutorService = executors.getOrPut(meta) {
val workerName = meta.getString("workerName", "worker")
val threads = meta.getInt("threads", Runtime.getRuntime().availableProcessors())
val factory = { pool: ForkJoinPool ->
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool).apply {
name = "${context.name}_$workerName-$poolIndex"
}
}
ForkJoinPool(
threads,
factory, null, false)
}
}
ForkJoinPool(threads, factory, null, false)
}
override val coroutineContext: CoroutineContext by lazy { defaultExecutor.asCoroutineDispatcher() }

View File

@ -45,9 +45,7 @@ interface EnvelopeReader {
/**
* Read the envelope from channel
*/
fun read(channel: ReadableByteChannel): Envelope {
return read(Channels.newInputStream(channel))
}
fun read(channel: ReadableByteChannel): Envelope = read(Channels.newInputStream(channel))
/**
* Read the envelope from buffer (could produce lazy envelope)
@ -59,9 +57,7 @@ interface EnvelopeReader {
/**
* Read the envelope from NIO file (could produce lazy envelope)
*/
fun read(file: Path): Envelope {
return Files.newByteChannel(file, READ).use { read(it) }
}
fun read(file: Path): Envelope = Files.newByteChannel(file, READ).use { read(it) }
companion object {

View File

@ -160,24 +160,22 @@ interface Value : Serializable, Comparable<Value> {
* @param obj a [Object] object.
* @return a [Value] object.
*/
fun of(value: Any?): Value {
return when (value) {
null -> Value.NULL
fun of(value: Any?): Value = when (value) {
null -> NULL
is Value -> value
is Number -> NumberValue(value)
is Instant -> TimeValue(value)
is LocalDateTime -> TimeValue(value)
is Boolean -> BooleanValue.ofBoolean(value)
is String -> StringValue(value)
is Collection<Any?> -> Value.of(value)
is Stream<*> -> Value.of(value.toList())
is Array<*> -> Value.of(value.map(::of))
is Collection<Any?> -> of(value)
is Stream<*> -> of(value.toList())
is Array<*> -> of(value.map(::of))
is Enum<*> -> StringValue(value.name)
else -> StringValue(value.toString())
}
}
}
}
/**
* Java compatibility layer

View File

@ -1,5 +1,6 @@
package hep.dataforge.fx
import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.goals.Coal
import hep.dataforge.goals.Goal
@ -14,13 +15,13 @@ import javafx.scene.image.ImageView
import javafx.scene.layout.Region
import javafx.scene.paint.Color
import javafx.stage.Stage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.plus
import tornadofx.*
import java.util.*
import java.util.concurrent.Executor
import java.util.function.BiConsumer
import kotlin.collections.HashMap
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
val dfIcon: Image = Image(Global::class.java.getResourceAsStream("/img/df.png"))
val dfIconView = ImageView(dfIcon)
@ -73,9 +74,14 @@ private fun removeMonitor(component: UIComponent, id: String) {
}
}
fun <R> UIComponent.runGoal(id: String, scope: CoroutineScope = GlobalScope, block: suspend GoalMonitor.() -> R): Coal<R> {
fun <R> UIComponent.runGoal(
context: Context,
id: String,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend GoalMonitor.() -> R,
): Coal<R> {
val monitor = getMonitor(id);
return Coal(scope, Collections.emptyList(), id) {
return Coal(context + coroutineContext, Collections.emptyList(), id) {
monitor.progress = -1.0
block(monitor).also {
monitor.progress = 1.0
@ -146,7 +152,8 @@ fun runNow(r: Runnable) {
class ToggleUIComponent(
val component: UIComponent,
val owner: Node,
val toggle: BooleanProperty) {
val toggle: BooleanProperty,
) {
val stage: Stage by lazy {
val res = component.modalStage ?: component.openWindow(owner = owner.scene.window)
?: throw RuntimeException("Can'topen window for $component")

View File

@ -62,6 +62,10 @@ interface StorageElement : Named, Metoid, Provider, ContextAware, AutoConnectibl
parent?.fullName?.plus(name) ?: Name.ofSingle(name)
}
override fun close() {
//DO nothing
}
companion object {
const val STORAGE_TARGET = "storage"
}
@ -75,14 +79,14 @@ interface Storage : StorageElement {
/**
* Top level children of this storage
*/
val children: Collection<StorageElement>
fun getChildren(): Collection<StorageElement>
/**
* Names of direct children for provider
*/
@get:ProvidesNames(STORAGE_TARGET)
val childrenNames: Collection<String>
get() = runBlocking { children.map { it.name } }
get() = runBlocking { getChildren().map { it.name } }
/**
* Get storage element (name notation for recursive calls). Null if not present
@ -99,7 +103,7 @@ interface Storage : StorageElement {
operator fun get(name: Name): StorageElement? {
return if (name.length == 1) {
children.find { it.name == name.unescaped }
getChildren().find { it.name == name.unescaped }
} else {
(get(name.first) as Storage?)?.get(name.cutFirst())
}
@ -107,14 +111,6 @@ interface Storage : StorageElement {
override fun getDefaultTarget(): String = STORAGE_TARGET
/**
* By default closes all children on close. If overridden, children should be closed before parent.
*/
override fun close() {
children.forEach { it.close() }
}
}
/**
@ -202,7 +198,5 @@ interface StorageElementType : Named {
fun create(context: Context, meta: Meta, parent: StorageElement? = null): StorageElement
fun create(parent: StorageElement, meta: Meta): StorageElement {
return create(parent.context, meta, parent)
}
fun create(parent: StorageElement, meta: Meta): StorageElement = create(parent.context, meta, parent)
}

View File

@ -50,7 +50,6 @@ class StorageConnection(storageFactory: () -> MutableStorage) : Connection, Cont
isOpen = true
}
@Throws(Exception::class)
override fun close() {
if (isOpen) {
storage.close()

View File

@ -27,7 +27,6 @@ import hep.dataforge.meta.buildMeta
import hep.dataforge.nullable
import hep.dataforge.providers.Provides
import hep.dataforge.providers.ProvidesNames
import kotlin.streams.toList
@PluginDef(name = "storage", group = "hep.dataforge", info = "Dataforge root storage plugin")
@ -37,7 +36,7 @@ class StorageManager : BasicPlugin(), MutableStorage {
private val _connectionHelper = ConnectionHelper(this)
private val _children = HashMap<String, StorageElement>()
override val children get() = _children.values
override fun getChildren() = _children.values
override fun getConnectionHelper(): ConnectionHelper = _connectionHelper

View File

@ -20,7 +20,6 @@ import hep.dataforge.Named
import hep.dataforge.asName
import hep.dataforge.connections.ConnectionHelper
import hep.dataforge.context.Context
import hep.dataforge.context.launch
import hep.dataforge.description.ValueDef
import hep.dataforge.description.ValueDefs
import hep.dataforge.io.envelopes.Envelope
@ -33,8 +32,8 @@ import hep.dataforge.storage.MutableStorage
import hep.dataforge.storage.StorageElement
import hep.dataforge.storage.StorageElementType
import hep.dataforge.storage.StorageManager
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import java.nio.file.*
import kotlin.streams.asSequence
@ -74,16 +73,16 @@ class FileStorage(
override fun getConnectionHelper(): ConnectionHelper = _connectionHelper
private var isInitialized = false
private val _children = HashMap<Path, StorageElement>()
override val children: Collection<StorageElement>
get() = runBlocking(Dispatchers.IO) {
if (!isInitialized) {
refresh()
override fun getChildren(): Collection<StorageElement> = runBlocking {
Files.list(path).toList().map { path ->
async{
type.read(context, path, this@FileStorage).also {
if(it == null){
logger.warn("Can't read $path")
}
_children.values
}
}
}.awaitAll().filterNotNull()
}
@ -105,33 +104,8 @@ class FileStorage(
//TODO actually watch for file change
override fun create(meta: Meta): StorageElement {
val path = path.resolve(meta.getString("path", meta.getString("name")))
return _children.getOrPut(path) {
resolveType(meta)
?.create(this, meta)
?: error("Can't resolve storage element type.")
}
}
/**
* Manually refresh storage state
*/
suspend fun refresh() {
//Remove non-existent entries
_children.keys.filter { !Files.exists(it) }.forEach { _children.remove(it) }
//update existing entries if needed
Files.list(path).map { path ->
launch {
if (!_children.contains(path)) {
type.read(context, path, this@FileStorage)?.let { _children[path] = it }
?: logger.debug("Could not resolve type for $path in $this")
}
}
}.toList().joinAll()
isInitialized = true
}
override fun create(meta: Meta): StorageElement =
resolveType(meta)?.create(this, meta) ?: error("Can't resolve storage element type.")
companion object {
@ -161,7 +135,7 @@ class FileStorage(
return file.fileName.toString().substringBeforeLast(".")
}
val directory = FileStorage.Directory()
val directory = Directory()
}
open class Directory : FileStorageElementType {
@ -208,7 +182,7 @@ class FileStorage(
null
}
} else {
//Otherwise delegate to the type
//Otherwise, delegate to the type
type.read(context, path, parent)
}.also {
if (it != null && parent == null) {

View File

@ -22,7 +22,7 @@ class NumassDataFactory : DataFactory<NumassSet>(NumassSet::class.java) {
*/
private fun Storage.sequence(prefix: Name = Name.empty()): Sequence<Pair<Name, StorageElement>> {
return sequence {
runBlocking { children }.forEach {
runBlocking { getChildren() }.forEach {
val newName = prefix + it.name
yield(Pair(newName, it))
if (it is Storage) {

View File

@ -20,7 +20,7 @@ private suspend fun createSummaryNode(storage: Storage): MetaBuilder {
.setValue("name", storage.name)
.setValue("path", storage.fullName)
storage.children.forEach { element ->
storage.getChildren().forEach { element ->
if(element is Storage && element.name.startsWith("Fill")){
builder.putNode(createSummaryNode(element))
} else if(element is NumassDataLoader){

View File

@ -43,7 +43,7 @@ object Threshold {
fun Storage.loaders(): Sequence<NumassDataLoader> {
return sequence<NumassDataLoader> {
print("Reading ${this@loaders.fullName}")
runBlocking { this@loaders.children }.forEach {
runBlocking { this@loaders.getChildren() }.forEach {
if (it is NumassDataLoader) {
yield(it)
} else if (it is Storage) {

View File

@ -25,6 +25,7 @@ import javafx.collections.ObservableMap
import javafx.scene.control.CheckBox
import javafx.scene.control.ChoiceBox
import javafx.scene.image.ImageView
import kotlinx.coroutines.Dispatchers
import tornadofx.*
class AmplitudeView : View(title = "Numass amplitude spectrum plot", icon = ImageView(dfIcon)) {
@ -127,7 +128,7 @@ class AmplitudeView : View(title = "Numass amplitude spectrum plot", icon = Imag
private fun invalidate() {
data.forEach { (key, point) ->
plots.getOrPut(key) {
runGoal<Plottable>("loadAmplitudeSpectrum_$key") {
runGoal<Plottable>(app.context, "loadAmplitudeSpectrum_$key", Dispatchers.IO) {
val valueAxis = if (normalize) {
NumassAnalyzer.COUNT_RATE_KEY
} else {

View File

@ -15,6 +15,7 @@ import javafx.collections.FXCollections
import javafx.collections.MapChangeListener
import javafx.collections.ObservableMap
import javafx.scene.image.ImageView
import kotlinx.coroutines.Dispatchers
import tornadofx.*
@ -54,7 +55,7 @@ class HVView : View(title = "High voltage time plot", icon = ImageView(dfIcon))
}
if (change.wasAdded()) {
runLater { container.progress = -1.0 }
runGoal("hvData[${change.key}]") {
runGoal(app.context,"hvData[${change.key}]", Dispatchers.IO) {
change.valueAdded.getHvData()
} ui { table ->
if (table != null) {

View File

@ -17,7 +17,10 @@ import javafx.scene.layout.Priority
import javafx.scene.text.Font
import javafx.stage.DirectoryChooser
import javafx.stage.FileChooser
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.javafx.JavaFx
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.controlsfx.control.StatusBar
import tornadofx.*
import java.io.File
@ -28,6 +31,8 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
private val pointCache by inject<PointCache>()
val storageView by inject<StorageView>()
private val statusBar = StatusBar()
// private val logFragment = LogFragment().apply {
// addLogHandler(context.logger)
@ -140,7 +145,7 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
if (Files.isDirectory(path)) {
if (Files.exists(path.resolve(NumassDataLoader.META_FRAGMENT_NAME))) {
//build set view
runGoal("viewer.load.set[$path]") {
runGoal(app.context, "viewer.load.set[$path]", Dispatchers.IO) {
title = "Load set ($path)"
message = "Building numass set..."
NumassDataLoader(app.context, null, path.fileName.toString(), path)
@ -158,12 +163,12 @@ class MainView : View(title = "Numass viewer", icon = dfIconView) {
}
} else {
//build storage
runGoal("viewer.load.storage[$path]") {
title = "Load storage ($path)"
message = "Building numass storage tree..."
NumassDirectory.INSTANCE.read(app.context, path)
} ui {
contentView = StorageView(it as Storage)
app.context.launch {
val storageElement = NumassDirectory.INSTANCE.read(app.context, path) as Storage
withContext(Dispatchers.JavaFx){
contentView = storageView
storageView.storageProperty.set(storageElement)
}
}
}
} else {

View File

@ -62,37 +62,3 @@ class PointCache : Controller() {
cache.clear()
}
}
//class CachedSet(set: NumassSet, context: Context) {
// override val points: ObservableList<CachedPoint> by lazy {
// set.points.map { CachedPoint(it, context) }.toObservable()
// }
// init {
// var watcher: WatchService? = null
//
// if (set is NumassDataLoader) {
// context.launch(Dispatchers.IO) {
// watcher = set.path.fileSystem.newWatchService()
// try {
// val key: WatchKey = set.path.register(watcher!!, ENTRY_CREATE)
// while (true) {
// key.pollEvents().forEach { event ->
// if (event.kind() == ENTRY_CREATE) {
// val path: Path = event.context() as Path
// if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) {
// val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path)
// ?: kotlin.error("Can't read point file")
// val point = NumassDataUtils.read(envelope)
// points.add(CachedPoint(point, context))
// }
// }
// }
// }
// } catch (x: IOException) {
// x.printStackTrace()
// }
// }
// }
// }
//}

View File

@ -16,6 +16,7 @@ import javafx.collections.FXCollections
import javafx.collections.MapChangeListener
import javafx.collections.ObservableMap
import javafx.scene.image.ImageView
import kotlinx.coroutines.Dispatchers
import tornadofx.*
/**
@ -43,7 +44,7 @@ class SlowControlView : View(title = "Numass slow control view", icon = ImageVie
plot.remove(change.key)
}
if (change.wasAdded()) {
runGoal("loadTable[${change.key}]") {
runGoal(app.context,"loadTable[${change.key}]", Dispatchers.IO) {
val plotData = getData(change.valueAdded)
val names = plotData.format.namesAsArray().filter { it != "timestamp" }

View File

@ -3,13 +3,10 @@ package inr.numass.viewer
import hep.dataforge.configure
import hep.dataforge.fx.dfIcon
import hep.dataforge.fx.plots.PlotContainer
import hep.dataforge.fx.runGoal
import hep.dataforge.fx.ui
import hep.dataforge.names.Name
import hep.dataforge.plots.data.DataPlot
import hep.dataforge.plots.jfreechart.JFreeChartFrame
import hep.dataforge.tables.Adapters
import hep.dataforge.values.Values
import inr.numass.data.analyzers.countInWindow
import inr.numass.data.api.NumassSet
import javafx.beans.property.SimpleIntegerProperty
@ -20,6 +17,10 @@ import javafx.geometry.Insets
import javafx.geometry.Orientation
import javafx.scene.image.ImageView
import javafx.util.converter.NumberStringConverter
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.javafx.JavaFx
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.controlsfx.control.RangeSlider
import tornadofx.*
import java.util.concurrent.atomic.AtomicInteger
@ -123,13 +124,13 @@ class SpectrumView : View(title = "Numass spectrum plot", icon = ImageView(dfIco
val plot: DataPlot =
frame.plots[Name.ofSingle(name)] as DataPlot? ?: DataPlot(name).apply { frame.add(this) }
runGoal("spectrumData[$name]") {
set.points.map {
app.context.launch {
val points = set.points.map {
pointCache.getCachedPoint("$name/${it.voltage}[${it.index}]", it)
}.map { cachedPoint ->
val count = cachedPoint.spectrum.await().countInWindow(loChannel.toShort(), upChannel.toShort())
val seconds = cachedPoint.length.toMillis() / 1000.0
runLater {
launch(Dispatchers.JavaFx) {
container.progress = progress.incrementAndGet().toDouble() / totalProgress
}
Adapters.buildXYDataPoint(
@ -138,10 +139,10 @@ class SpectrumView : View(title = "Numass spectrum plot", icon = ImageView(dfIco
sqrt(count.toDouble()) / seconds
)
}
} ui { points: List<Values> ->
withContext(Dispatchers.JavaFx) {
plot.fillData(points)
container.progress = 1.0
//spectrumExportButton.isDisable = false
}
}
}
}

View File

@ -6,19 +6,25 @@ import hep.dataforge.meta.Meta
import hep.dataforge.meta.Metoid
import hep.dataforge.names.AlphanumComparator
import hep.dataforge.storage.Storage
import hep.dataforge.storage.files.FileStorage
import hep.dataforge.storage.files.FileTableLoader
import hep.dataforge.storage.tables.TableLoader
import inr.numass.data.api.NumassPoint
import inr.numass.data.api.NumassSet
import inr.numass.data.storage.NumassDataLoader
import javafx.beans.property.SimpleBooleanProperty
import javafx.beans.property.SimpleObjectProperty
import javafx.collections.ObservableList
import javafx.scene.control.ContextMenu
import javafx.scene.control.TreeItem
import tornadofx.*
import java.nio.file.WatchService
class StorageView(val storage: Storage) : View(title = "Numass storage", icon = dfIconView) {
class StorageView : View(title = "Numass storage", icon = dfIconView) {
val storageProperty = SimpleObjectProperty<Storage>()
val storage by storageProperty
private val pointCache by inject<PointCache>()
@ -28,7 +34,11 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon =
private val hvView: HVView by inject()
private val scView: SlowControlView by inject()
init {
private var watcher: WatchService? = null
fun clear() {
watcher?.close()
ampView.clear()
timeView.clear()
spectrumView.clear()
@ -81,7 +91,7 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon =
}
fun getChildren(): ObservableList<Container>? = when (content) {
is Storage -> content.children.map {
is Storage -> content.getChildren().map {
buildContainer(it, this)
}.sortedWith(Comparator.comparing({ it.id }, AlphanumComparator)).asObservable()
is NumassSet -> content.points
@ -90,6 +100,35 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon =
.asObservable()
else -> null
}
/*
is NumassDataLoader -> {
val res = content.points.sortedBy { it.index }.map { buildContainer(it, this) }.toObservable()
watchJob = app.context.launch(Dispatchers.IO) {
val key: WatchKey = content.path.register(watcher!!, ENTRY_CREATE)
coroutineContext[Job]?.invokeOnCompletion {
key.cancel()
}
while (watcher != null && isActive) {
try {
key.pollEvents().forEach { event ->
if (event.kind() == ENTRY_CREATE) {
val path: Path = event.context() as Path
if (path.fileName.toString().startsWith(NumassDataLoader.POINT_FRAGMENT_NAME)) {
val envelope: Envelope = NumassEnvelopeType.infer(path)?.reader?.read(path)
?: kotlin.error("Can't read point file")
val point = NumassDataUtils.read(envelope)
res.add(buildContainer(point, this@Container))
}
}
}
} catch (x: Throwable) {
app.context.logger.error("Error during dynamic point read", x)
}
}
}
res
}
*/
val hasChildren: Boolean = (content is Storage) || (content is NumassSet)
@ -99,6 +138,9 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon =
override val root = splitpane {
treeview<Container> {
//isShowRoot = false
storageProperty.onChange { storage ->
clear()
if (storage == null) return@onChange
root = TreeItem(Container(storage.name, storage))
root.isExpanded = true
lazyPopulate(leafCheck = {
@ -106,6 +148,14 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon =
}) {
it.value.getChildren()
}
watcher?.close()
watcher = if (storage is FileStorage) {
storage.path.fileSystem.newWatchService()
} else {
null
}
}
cellFormat { value: Container ->
when (value.content) {
is Storage -> {

View File

@ -21,6 +21,7 @@ import javafx.beans.binding.DoubleBinding
import javafx.collections.FXCollections
import javafx.collections.ObservableMap
import javafx.scene.image.ImageView
import kotlinx.coroutines.Dispatchers
import tornadofx.*
class TimeView : View(title = "Numass time spectrum plot", icon = ImageView(dfIcon)) {
@ -103,7 +104,7 @@ class TimeView : View(title = "Numass time spectrum plot", icon = ImageView(dfIc
private fun invalidate() {
data.forEach { key, point ->
plots.getOrPut(key) {
runGoal<Plottable>("loadAmplitudeSpectrum_$key") {
runGoal<Plottable>(app.context, "loadAmplitudeSpectrum_$key", Dispatchers.IO) {
val initialEstimate = analyzer.analyze(point)
val cr = initialEstimate.getDouble("cr")