Migration to new Storage API
This commit is contained in:
parent
35cc334e10
commit
32732c9de6
@ -27,13 +27,11 @@ import hep.dataforge.control.ports.Port
|
||||
import hep.dataforge.control.ports.PortFactory
|
||||
import hep.dataforge.description.ValueDef
|
||||
import hep.dataforge.exceptions.ControlException
|
||||
import hep.dataforge.exceptions.StorageException
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.states.StateDef
|
||||
import hep.dataforge.states.valueState
|
||||
import hep.dataforge.storage.api.TableLoader
|
||||
import hep.dataforge.storage.commons.LoaderFactory
|
||||
import hep.dataforge.storage.commons.StorageConnection
|
||||
import hep.dataforge.storage.StorageConnection
|
||||
import hep.dataforge.storage.TableLoader
|
||||
import hep.dataforge.tables.TableFormat
|
||||
import hep.dataforge.tables.TableFormatBuilder
|
||||
import hep.dataforge.utils.DateTimeUtils
|
||||
|
@ -57,7 +57,7 @@ abstract class NumassControlApplication<in D : Device> : App() {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val d = deviceFactory.build(ctx, deviceConfig) as D
|
||||
d.init()
|
||||
connectStorage(d, config)
|
||||
d.connectStorage(config)
|
||||
|
||||
return d
|
||||
} catch (e: ControlException) {
|
||||
|
@ -2,16 +2,18 @@ package inr.numass.control
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.context.Global
|
||||
import hep.dataforge.context.launch
|
||||
import hep.dataforge.control.connections.Roles
|
||||
import hep.dataforge.control.devices.Device
|
||||
import hep.dataforge.exceptions.StorageException
|
||||
import hep.dataforge.fx.dfIcon
|
||||
import hep.dataforge.io.MetaFileReader
|
||||
import hep.dataforge.io.XMLMetaReader
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.nullable
|
||||
import hep.dataforge.storage.commons.StorageConnection
|
||||
import hep.dataforge.storage.commons.StorageManager
|
||||
import hep.dataforge.storage.MutableStorage
|
||||
import hep.dataforge.storage.StorageConnection
|
||||
import hep.dataforge.storage.StorageManager
|
||||
import hep.dataforge.storage.createShelf
|
||||
import javafx.application.Application
|
||||
import javafx.stage.Stage
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -31,7 +33,6 @@ fun getRunName(config: Meta): String {
|
||||
config.getString("numass.run")
|
||||
} else if (config.hasMeta("numass.server")) {
|
||||
TODO("Not implemented")
|
||||
|
||||
} else {
|
||||
""
|
||||
}
|
||||
@ -43,25 +44,26 @@ fun getRunName(config: Meta): String {
|
||||
* *
|
||||
* @param config
|
||||
*/
|
||||
fun connectStorage(device: Device, config: Meta) {
|
||||
fun Device.connectStorage(config: Meta) {
|
||||
//TODO add on reset listener
|
||||
if (config.hasMeta("storage") && device.acceptsRole(Roles.STORAGE_ROLE)) {
|
||||
if (config.hasMeta("storage") && acceptsRole(Roles.STORAGE_ROLE)) {
|
||||
val numassRun = getRunName(config)
|
||||
val manager = context.getOrLoad(StorageManager::class.java)
|
||||
|
||||
config.getMetaList("storage").forEach { node ->
|
||||
device.context.logger.info("Creating storage for device with getMeta: {}", node)
|
||||
logger.info("Creating storage for device with getMeta: {}", node)
|
||||
//building storage in a separate thread
|
||||
Thread {
|
||||
var storage = StorageManager.buildStorage(device.context, node)
|
||||
launch {
|
||||
var storage = manager.create(node) as MutableStorage
|
||||
if (!numassRun.isEmpty()) {
|
||||
try {
|
||||
storage = storage.buildShelf(numassRun, Meta.empty())
|
||||
} catch (e: StorageException) {
|
||||
device.context.logger.error("Failed to build shelf", e)
|
||||
storage = storage.createShelf(numassRun)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Failed to build shelf", e)
|
||||
}
|
||||
|
||||
}
|
||||
device.connect(StorageConnection(storage), Roles.STORAGE_ROLE)
|
||||
}.start()
|
||||
connect(StorageConnection { storage }, Roles.STORAGE_ROLE)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -105,7 +107,7 @@ fun findDeviceMeta(config: Meta, criterion: (Meta) -> Boolean): Meta? {
|
||||
|
||||
fun setupContext(meta: Meta): Context {
|
||||
val ctx = Global.getContext("NUMASS-CONTROL")
|
||||
ctx.pluginManager.load(StorageManager::class.java)
|
||||
ctx.plugins.load(StorageManager::class.java)
|
||||
return ctx
|
||||
}
|
||||
|
||||
|
@ -4,18 +4,21 @@ import hep.dataforge.control.connections.DeviceConnection
|
||||
import hep.dataforge.control.connections.Roles
|
||||
import hep.dataforge.control.devices.Device
|
||||
import hep.dataforge.nullable
|
||||
import hep.dataforge.storage.api.Storage
|
||||
import hep.dataforge.storage.api.TableLoader
|
||||
import hep.dataforge.storage.commons.LoaderFactory
|
||||
import hep.dataforge.storage.commons.StorageConnection
|
||||
import hep.dataforge.storage.MutableStorage
|
||||
import hep.dataforge.storage.MutableTableLoader
|
||||
import hep.dataforge.storage.Storage
|
||||
import hep.dataforge.storage.StorageConnection
|
||||
import hep.dataforge.storage.files.createTable
|
||||
import hep.dataforge.tables.TableFormat
|
||||
import hep.dataforge.tables.ValuesListener
|
||||
import hep.dataforge.utils.DateTimeUtils
|
||||
import hep.dataforge.values.Values
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import java.util.*
|
||||
|
||||
class NumassStorageConnection(private val loaderName: String? = null, private val formatBuilder: (Device) -> TableFormat) : DeviceConnection(), ValuesListener {
|
||||
private val loaderMap = HashMap<Storage, TableLoader>()
|
||||
private val loaderMap = HashMap<Storage, MutableTableLoader>()
|
||||
|
||||
|
||||
@Synchronized
|
||||
@ -26,11 +29,13 @@ class NumassStorageConnection(private val loaderName: String? = null, private va
|
||||
val loaderName = "${loaderName ?: device.name}_$suffix"
|
||||
device.forEachConnection(Roles.STORAGE_ROLE, StorageConnection::class.java) { connection ->
|
||||
try {
|
||||
//create a loader instance for each connected storage
|
||||
val pl = loaderMap.computeIfAbsent(connection.storage){storage ->
|
||||
LoaderFactory.buildPointLoader(storage, loaderName, "", "timestamp", format)
|
||||
connection.context.launch(Dispatchers.IO) {
|
||||
//create a loader instance for each connected storage
|
||||
val pl = loaderMap.getOrPut(connection.storage) {
|
||||
(connection.storage as MutableStorage).createTable(loaderName, format)
|
||||
}
|
||||
pl.append(point)
|
||||
}
|
||||
pl.push(point)
|
||||
} catch (ex: Exception) {
|
||||
device.logger.error("Push to loader failed", ex)
|
||||
}
|
||||
|
@ -2,9 +2,12 @@ package inr.numass.control
|
||||
|
||||
import hep.dataforge.control.devices.AbstractDevice
|
||||
import hep.dataforge.nullable
|
||||
import hep.dataforge.storage.api.TableLoader
|
||||
import hep.dataforge.storage.commons.StorageConnection
|
||||
import hep.dataforge.storage.StorageConnection
|
||||
import hep.dataforge.storage.TableLoader
|
||||
|
||||
import hep.dataforge.values.Values
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
@ -19,8 +22,10 @@ class StorageHelper(private val device: AbstractDevice, private val loaderFactor
|
||||
if (device.states.optBoolean("storing").nullable == true) {
|
||||
device.forEachConnection("storage", StorageConnection::class.java) { connection ->
|
||||
try {
|
||||
val pl = loaderMap.computeIfAbsent(connection, loaderFactory)
|
||||
pl.push(point)
|
||||
val pl = loaderMap.computeIfAbsent(connection, loaderFactory).mutable()
|
||||
device.context.launch(Dispatchers.IO) {
|
||||
pl.append(point)
|
||||
}
|
||||
} catch (ex: Exception) {
|
||||
device.logger.error("Push to loader failed", ex)
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import hep.dataforge.names.Name
|
||||
import hep.dataforge.storage.Storage
|
||||
import hep.dataforge.storage.StorageElement
|
||||
import inr.numass.data.api.NumassSet
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
/**
|
||||
* Created by darksnake on 03-Feb-17.
|
||||
@ -21,7 +22,7 @@ class NumassDataFactory : DataFactory<NumassSet>(NumassSet::class.java) {
|
||||
*/
|
||||
private fun Storage.sequence(prefix: Name = Name.empty()): Sequence<Pair<Name, StorageElement>> {
|
||||
return sequence {
|
||||
runBlocking { getChildren() }.forEach {
|
||||
runBlocking { children }.forEach {
|
||||
val newName = prefix + it.name
|
||||
yield(Pair(newName, it))
|
||||
if (it is Storage) {
|
||||
|
@ -24,6 +24,7 @@ import hep.dataforge.storage.StorageElement
|
||||
import hep.dataforge.storage.files.FileStorage
|
||||
import hep.dataforge.storage.files.FileStorageElement
|
||||
import inr.numass.NumassEnvelopeType
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
|
||||
|
@ -12,8 +12,8 @@ import inr.numass.models.sterile.SterileNeutrinoSpectrum
|
||||
import static hep.dataforge.grind.Grind.buildMeta
|
||||
|
||||
Context ctx = Global.instance()
|
||||
ctx.getPluginManager().load(FXPlotManager)
|
||||
ctx.getPluginManager().load(NumassPlugin.class)
|
||||
ctx.getPlugins().load(FXPlotManager)
|
||||
ctx.getPlugins().load(NumassPlugin.class)
|
||||
|
||||
GrindShell shell = new GrindShell(ctx)
|
||||
|
||||
|
@ -23,7 +23,7 @@ import inr.numass.utils.DataModelUtils
|
||||
|
||||
|
||||
Context ctx = Global.instance()
|
||||
ctx.getPluginManager().load(NumassPlugin)
|
||||
ctx.getPlugins().load(NumassPlugin)
|
||||
|
||||
new GrindShell(ctx).eval {
|
||||
PlotHelper ph = plots
|
||||
|
@ -15,9 +15,9 @@ import inr.numass.data.NumassDataUtils
|
||||
import static hep.dataforge.grind.Grind.buildMeta
|
||||
|
||||
Context ctx = Global.instance()
|
||||
ctx.getPluginManager().load(FXPlotManager)
|
||||
ctx.getPluginManager().load(NumassPlugin.class)
|
||||
ctx.getPluginManager().load(CachePlugin.class)
|
||||
ctx.getPlugins().load(FXPlotManager)
|
||||
ctx.getPlugins().load(NumassPlugin.class)
|
||||
ctx.getPlugins().load(CachePlugin.class)
|
||||
|
||||
Meta meta = buildMeta {
|
||||
data(dir: "D:\\Work\\Numass\\data\\2017_05\\Fill_2", mask: "set_.{1,3}")
|
||||
|
@ -28,8 +28,8 @@ import static inr.numass.data.analyzers.NumassAnalyzer.CHANNEL_KEY
|
||||
import static inr.numass.data.analyzers.NumassAnalyzer.COUNT_RATE_KEY
|
||||
|
||||
Context ctx = Global.instance()
|
||||
ctx.getPluginManager().load(NumassPlugin)
|
||||
ctx.getPluginManager().load(CachePlugin)
|
||||
ctx.getPlugins().load(NumassPlugin)
|
||||
ctx.getPlugins().load(CachePlugin)
|
||||
|
||||
Meta meta = buildMeta(t0: 3e4) {
|
||||
data(dir: "D:\\Work\\Numass\\data\\2017_11\\Fill_2", mask: "set_3.")
|
||||
|
@ -53,7 +53,7 @@ class NumassPlugin : BasicPlugin() {
|
||||
// StorageManager.buildFrom(context);
|
||||
super.attach(context)
|
||||
//TODO Replace by local providers
|
||||
loadModels(context[ModelLibrary::class.java])
|
||||
loadModels(context.getOrLoad(ModelLibrary::class.java))
|
||||
loadMath(FunctionLibrary.buildFrom(context))
|
||||
}
|
||||
|
||||
@ -307,6 +307,6 @@ fun displayChart(title: String, context: Context = Global, width: Double = 800.0
|
||||
val frame = JFreeChartFrame()
|
||||
frame.configure(meta)
|
||||
frame.configureValue("title", title)
|
||||
context.pluginManager.load<FXPlugin>().display(PlotContainer(frame), width, height)
|
||||
context.plugins.load<FXPlugin>().display(PlotContainer(frame), width, height)
|
||||
return frame
|
||||
}
|
||||
|
@ -37,9 +37,12 @@ import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.api.NumassSet
|
||||
import inr.numass.models.FSS
|
||||
import inr.numass.utils.ExpressionUtils
|
||||
import javafx.application.Platform
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.apache.commons.math3.analysis.UnivariateFunction
|
||||
import org.jfree.chart.plot.IntervalMarker
|
||||
import org.jfree.chart.ui.RectangleInsets
|
||||
import org.slf4j.Logger
|
||||
import java.awt.Color
|
||||
import java.awt.Font
|
||||
import java.io.IOException
|
||||
@ -215,7 +218,7 @@ fun JFreeChartFrame.addSetMarkers(sets: Collection<NumassSet>) {
|
||||
marker.label = set.name
|
||||
marker.labelFont = Font("Verdana", Font.BOLD, 20);
|
||||
marker.labelOffset = RectangleInsets(30.0, 30.0, 30.0, 30.0)
|
||||
runLater { jfcPlot.addDomainMarker(marker) }
|
||||
Platform.runLater { jfcPlot.addDomainMarker(marker) }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ import hep.dataforge.maths.chain.Chain
|
||||
import inr.numass.data.api.NumassBlock
|
||||
import inr.numass.data.api.OrphanNumassEvent
|
||||
import inr.numass.data.api.SimpleBlock
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.apache.commons.math3.random.RandomGenerator
|
||||
import java.lang.Math.max
|
||||
import java.time.Duration
|
||||
|
@ -21,9 +21,11 @@ import hep.dataforge.plots.PlotFrame
|
||||
import hep.dataforge.plots.data.XYFunctionPlot
|
||||
import hep.dataforge.utils.Misc
|
||||
import hep.dataforge.values.Values
|
||||
import kotlinx.coroutines.*
|
||||
import org.apache.commons.math3.analysis.BivariateFunction
|
||||
import org.apache.commons.math3.analysis.UnivariateFunction
|
||||
import org.apache.commons.math3.exception.OutOfRangeException
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.lang.Math.exp
|
||||
import java.util.*
|
||||
|
||||
|
@ -13,6 +13,7 @@ import hep.dataforge.values.Values
|
||||
import inr.numass.models.misc.LossCalculator
|
||||
import inr.numass.utils.ExpressionUtils
|
||||
import org.apache.commons.math3.analysis.BivariateFunction
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
|
@ -5,6 +5,10 @@ import inr.numass.actions.TimeAnalyzerAction
|
||||
import inr.numass.data.NumassGenerator
|
||||
import inr.numass.data.api.SimpleNumassPoint
|
||||
import inr.numass.data.generateBlock
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.channels.produce
|
||||
import kotlinx.coroutines.channels.toList
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.time.Instant
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
|
@ -38,6 +38,8 @@ import inr.numass.data.SpectrumAdapter
|
||||
import inr.numass.data.SpectrumGenerator
|
||||
import inr.numass.models.NBkgSpectrum
|
||||
import inr.numass.models.sterile.SterileNeutrinoSpectrum
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import java.io.PrintWriter
|
||||
import kotlin.math.sqrt
|
||||
|
||||
@ -89,7 +91,7 @@ fun main(args: Array<String>) {
|
||||
}
|
||||
|
||||
val adapter = SpectrumAdapter(Meta.empty())
|
||||
val fm = context.get<FitManager>()
|
||||
val fm = context.getOrLoad(FitManager::class.java)
|
||||
|
||||
fun plotFitResidual(name: String, vararg override: Pair<String, Double>): Plot {
|
||||
val paramsMod = params.update(*override)
|
||||
|
@ -4,6 +4,7 @@ import inr.numass.data.channel
|
||||
import inr.numass.data.plotAmplitudeSpectrum
|
||||
import inr.numass.data.storage.ProtoNumassPoint
|
||||
import inr.numass.data.transformChain
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.io.File
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
|
@ -10,6 +10,7 @@ import hep.dataforge.storage.Storage
|
||||
import hep.dataforge.useValue
|
||||
import inr.numass.data.storage.NumassDataLoader
|
||||
import inr.numass.data.storage.NumassDirectory
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.io.File
|
||||
|
||||
private suspend fun createSummaryNode(storage: Storage): MetaBuilder {
|
||||
@ -19,7 +20,7 @@ private suspend fun createSummaryNode(storage: Storage): MetaBuilder {
|
||||
.setValue("name", storage.name)
|
||||
.setValue("path", storage.fullName)
|
||||
|
||||
storage.getChildren().forEach { element ->
|
||||
storage.children.forEach { element ->
|
||||
if(element is Storage && element.name.startsWith("Fill")){
|
||||
builder.putNode(createSummaryNode(element))
|
||||
} else if(element is NumassDataLoader){
|
||||
|
@ -22,6 +22,7 @@ import inr.numass.data.api.NumassSet
|
||||
import inr.numass.data.api.SimpleNumassPoint
|
||||
import inr.numass.data.storage.NumassDataLoader
|
||||
import inr.numass.data.storage.NumassDirectory
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.apache.commons.math3.analysis.ParametricUnivariateFunction
|
||||
import org.apache.commons.math3.exception.DimensionMismatchException
|
||||
import org.apache.commons.math3.fitting.SimpleCurveFitter
|
||||
@ -39,7 +40,7 @@ object Threshold {
|
||||
fun Storage.loaders(): Sequence<NumassDataLoader>{
|
||||
return sequence<NumassDataLoader> {
|
||||
print("Reading ${this@loaders.fullName}")
|
||||
runBlocking { this@loaders.getChildren()}.forEach {
|
||||
runBlocking { this@loaders.children }.forEach {
|
||||
if(it is NumassDataLoader){
|
||||
yield(it)
|
||||
} else if (it is Storage){
|
||||
|
@ -22,6 +22,10 @@ import inr.numass.data.analyzers.SimpleAnalyzer
|
||||
import inr.numass.data.api.NumassBlock
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.api.NumassSet
|
||||
import kotlinx.coroutines.CoroutineStart
|
||||
import kotlinx.coroutines.Deferred
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.async
|
||||
|
||||
private val analyzer = SimpleAnalyzer()
|
||||
|
||||
|
@ -3,6 +3,9 @@ package inr.numass.viewer
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.context.Global
|
||||
import hep.dataforge.fx.dfIconView
|
||||
import hep.dataforge.fx.except
|
||||
import hep.dataforge.fx.runGoal
|
||||
import hep.dataforge.fx.ui
|
||||
import hep.dataforge.storage.Storage
|
||||
import inr.numass.NumassProperties
|
||||
import inr.numass.data.api.NumassPoint
|
||||
@ -16,6 +19,10 @@ import javafx.scene.layout.Priority
|
||||
import javafx.scene.text.Font
|
||||
import javafx.stage.DirectoryChooser
|
||||
import javafx.stage.FileChooser
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.launch
|
||||
import org.controlsfx.control.StatusBar
|
||||
import tornadofx.*
|
||||
import java.io.File
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
|
@ -3,6 +3,12 @@ package inr.numass.viewer
|
||||
import hep.dataforge.fx.meta.MetaViewer
|
||||
import inr.numass.data.analyzers.NumassAnalyzer
|
||||
import javafx.beans.property.SimpleIntegerProperty
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.launch
|
||||
import org.controlsfx.glyphfont.FontAwesome
|
||||
import tornadofx.*
|
||||
import tornadofx.controlsfx.borders
|
||||
import tornadofx.controlsfx.toGlyph
|
||||
|
||||
class PointInfoView(val point: CachedPoint) : MetaViewer(point.meta) {
|
||||
|
||||
|
@ -14,6 +14,8 @@ import inr.numass.data.storage.NumassDataLoader
|
||||
import javafx.beans.property.SimpleBooleanProperty
|
||||
import javafx.scene.control.ContextMenu
|
||||
import javafx.scene.control.TreeItem
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import tornadofx.*
|
||||
|
||||
class StorageView(val storage: Storage) : View(title = "Numass storage", icon = dfIconView) {
|
||||
|
||||
@ -77,7 +79,7 @@ class StorageView(val storage: Storage) : View(title = "Numass storage", icon =
|
||||
|
||||
val children: List<Container>? by lazy {
|
||||
when (content) {
|
||||
is Storage -> runBlocking { content.getChildren() }.map { buildContainer(it, this) }.sortedWith(
|
||||
is Storage -> runBlocking { content.children }.map { buildContainer(it, this) }.sortedWith(
|
||||
object : Comparator<Container> {
|
||||
private val alphanumComparator = AlphanumComparator()
|
||||
override fun compare(o1: Container, o2: Container): Int = alphanumComparator.compare(o1.id, o2.id)
|
||||
|
@ -10,6 +10,9 @@ import inr.numass.data.storage.NumassDirectory
|
||||
import inr.numass.viewer.*
|
||||
import javafx.application.Application
|
||||
import javafx.scene.image.ImageView
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.launch
|
||||
import tornadofx.*
|
||||
import java.io.File
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user