implemented file object storage

This commit is contained in:
Alexander Nozik 2016-04-16 22:41:21 +03:00
parent 9a9a0beca4
commit d67a74ccd0
11 changed files with 371 additions and 51 deletions

View File

@ -26,8 +26,6 @@ import inr.numass.client.NumassClient
new StorageManager().startGlobal(); new StorageManager().startGlobal();
MetaStreamWriter parser = new JSONMetaWriter(); MetaStreamWriter parser = new JSONMetaWriter();
println "Starting Numass test client..." println "Starting Numass test client..."
@ -38,7 +36,7 @@ BufferedReader br = new BufferedReader(new InputStreamReader(System.in))
while(line == null || !line.startsWith("exit")){ while(line == null || !line.startsWith("exit")){
// print ">" // print ">"
line = br.readLine(); line = br.readLine();
if(!line.startsWith("exit")){ if(line!= null && !line.startsWith("exit")){
NumassClient.runComand("127.0.0.1", 8335, line.split(" ")); NumassClient.runComand("127.0.0.1", 8335, line.split(" "));
} }
} }

View File

@ -0,0 +1,20 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package inr.numass.scripts
import hep.dataforge.io.MetaStreamReader
import hep.dataforge.io.MetaStreamWriter
import hep.dataforge.meta.Meta
import hep.dataforge.storage.commons.JSONMetaWriter
import hep.dataforge.storage.commons.StorageManager
import inr.numass.client.NumassClient
new StorageManager().startGlobal();
MetaStreamWriter parser = new JSONMetaWriter();
NumassClient.runComand("127.0.0.1", 8336, "addNote", "This is my note with <strong>html</strong>");

View File

@ -36,6 +36,7 @@ import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -203,6 +204,23 @@ public class NumassClient extends MessageFactory implements Closeable {
return sendAndRecieve(env.build()).meta(); return sendAndRecieve(env.build()).meta();
} }
public Meta addNote(String text, Instant time) {
EnvelopeBuilder env = requestActionBase("numass.notes", "push");
env.putMetaValue("note.text", text);
if (time != null) {
env.putMetaValue("note.time", time);
}
return sendAndRecieve(env.build()).meta();
}
public Meta getNotes(int limit) {
EnvelopeBuilder env = requestActionBase("numass.notes", "pull");
if (limit > 0) {
env.putMetaValue("limit", limit);
}
return sendAndRecieve(env.build()).meta();
}
public static void main(String[] args) { public static void main(String[] args) {
new StorageManager().startGlobal(); new StorageManager().startGlobal();
@ -224,8 +242,8 @@ public class NumassClient extends MessageFactory implements Closeable {
} }
public static void runComand(String ip, int port, String[] args) { public static void runComand(String ip, int port, String... args) {
checkArgLength(args, 1); checkArgLength(1, args);
try (NumassClient client = new NumassClient(ip, port)) { try (NumassClient client = new NumassClient(ip, port)) {
switch (args[0]) { switch (args[0]) {
case "getRun": case "getRun":
@ -237,7 +255,7 @@ public class NumassClient extends MessageFactory implements Closeable {
} }
return; return;
case "setRun": case "setRun":
checkArgLength(args, 2); checkArgLength(2, args);
Meta setRun = client.startRun(args[1]); Meta setRun = client.startRun(args[1]);
if (setRun.getBoolean("success", true)) { if (setRun.getBoolean("success", true)) {
System.out.println(setRun.getString("run.path")); System.out.println(setRun.getString("run.path"));
@ -246,7 +264,7 @@ public class NumassClient extends MessageFactory implements Closeable {
} }
return; return;
case "getState": case "getState":
checkArgLength(args, 2); checkArgLength(2, args);
String stateName = args[1]; String stateName = args[1];
Map<String, Value> states = client.getStates(stateName); Map<String, Value> states = client.getStates(stateName);
if (states != null) { if (states != null) {
@ -256,7 +274,7 @@ public class NumassClient extends MessageFactory implements Closeable {
} }
return; return;
case "setState": case "setState":
checkArgLength(args, 3); checkArgLength(3, args);
String setStateName = args[1]; String setStateName = args[1];
String setStateValue = args[2]; String setStateValue = args[2];
Meta setStateMeta = client.setState(setStateName, setStateValue); Meta setStateMeta = client.setState(setStateName, setStateValue);
@ -267,7 +285,7 @@ public class NumassClient extends MessageFactory implements Closeable {
} }
return; return;
case "pushPoint": case "pushPoint":
checkArgLength(args, 2); checkArgLength(2, args);
String path; String path;
String fileName; String fileName;
if (args.length == 2) { if (args.length == 2) {
@ -285,6 +303,16 @@ public class NumassClient extends MessageFactory implements Closeable {
} else { } else {
System.out.println("Error: operaton failed"); System.out.println("Error: operaton failed");
} }
return;
case "addNote":
checkArgLength(2, args);
String note = args[1];
Meta addNote = client.addNote(note, null);
if (addNote.getBoolean("success", true)) {
System.out.println("OK");
} else {
System.out.println("Error: operaton failed");
}
} }
} catch (IOException ex) { } catch (IOException ex) {
@ -294,7 +322,7 @@ public class NumassClient extends MessageFactory implements Closeable {
} }
} }
private static void checkArgLength(String[] args, int length) { private static void checkArgLength(int length, String... args) {
if (args.length < length) { if (args.length < length) {
LoggerFactory.getLogger("NumassClient").error("Command line to short"); LoggerFactory.getLogger("NumassClient").error("Command line to short");
System.exit(1); System.exit(1);

View File

@ -18,7 +18,7 @@ package inr.numass.scripts
import hep.dataforge.storage.filestorage.FileStorage import hep.dataforge.storage.filestorage.FileStorage
import inr.numass.server.NumassServer import inr.numass.server.NumassServer
String path = "D:\\temp\\test\\numass\\" String path = "D:\\temp\\test\\numass-server\\"
FileStorage storage = FileStorage.in(new File(path), null); FileStorage storage = FileStorage.in(new File(path), null);

View File

@ -0,0 +1,77 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package inr.numass.server;
import hep.dataforge.meta.Meta;
import hep.dataforge.meta.MetaBuilder;
import java.io.Serializable;
import java.time.Instant;
/**
* A progress note for numass run
*
* @author Alexander Nozik
*/
public class NumassNote implements Serializable {
public static NumassNote buildFrom(Meta meta) {
String text = meta.getString("text", "");
if (meta.hasValue("time")) {
Instant time = meta.getValue("time").timeValue();
return new NumassNote(text, time);
} else {
return new NumassNote(text);
}
}
private final String content;
private final String ref;
private final Instant time;
public NumassNote(String content, Instant time) {
this.content = content;
this.time = time;
this.ref = "#" + time.hashCode();
}
public NumassNote(String content) {
this.content = content;
this.time = Instant.now();
this.ref = "#" + time.hashCode();
}
/**
* Text content
*
* @return
*/
public String content() {
return content;
}
/**
*
* @return
*/
public Instant time() {
return time;
}
/**
* Unique note name for references
*
* @return
*/
public String ref() {
return ref;
}
public Meta toMeta() {
return new MetaBuilder("note").putValue("time", time)
.putValue("ref", ref)
.putValue("text", content);
}
}

View File

@ -18,6 +18,7 @@ package inr.numass.server;
import hep.dataforge.data.binary.Binary; import hep.dataforge.data.binary.Binary;
import hep.dataforge.exceptions.StorageException; import hep.dataforge.exceptions.StorageException;
import hep.dataforge.io.envelopes.Envelope; import hep.dataforge.io.envelopes.Envelope;
import hep.dataforge.io.envelopes.EnvelopeBuilder;
import hep.dataforge.io.envelopes.Responder; import hep.dataforge.io.envelopes.Responder;
import hep.dataforge.meta.Annotated; import hep.dataforge.meta.Annotated;
import hep.dataforge.meta.Meta; import hep.dataforge.meta.Meta;
@ -25,8 +26,15 @@ import hep.dataforge.storage.api.StateLoader;
import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.storage.commons.MessageFactory; import hep.dataforge.storage.commons.MessageFactory;
import hep.dataforge.values.Value; import hep.dataforge.values.Value;
import inr.numass.storage.NumassStorage;
import java.io.IOException; import java.io.IOException;
import java.time.Instant;
import java.util.stream.Stream;
import hep.dataforge.storage.api.ObjectLoader;
import inr.numass.storage.NumassStorage;
import java.util.Comparator;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This object governs remote access to numass storage and performs reading and * This object governs remote access to numass storage and performs reading and
@ -37,6 +45,7 @@ import java.io.IOException;
public class NumassRun implements Annotated, Responder { public class NumassRun implements Annotated, Responder {
public static final String RUN_STATE = "@run"; public static final String RUN_STATE = "@run";
public static final String RUN_NOTES = "@notes";
private final String runPath; private final String runPath;
/** /**
@ -48,13 +57,23 @@ public class NumassRun implements Annotated, Responder {
* Default state loader for this run * Default state loader for this run
*/ */
private final StateLoader states; private final StateLoader states;
private final ObjectLoader noteLoader;
private final MessageFactory factory; private final MessageFactory factory;
private final Logger logger;
// /**
// * A set with inverted order of elements (last note first)
// */
// private final Set<NumassNote> notes = new TreeSet<>((NumassNote o1, NumassNote o2) -> -o1.time().compareTo(o2.time()));
public NumassRun(String path, NumassStorage workStorage, MessageFactory factory) throws StorageException { public NumassRun(String path, NumassStorage workStorage, MessageFactory factory) throws StorageException {
this.storage = workStorage; this.storage = workStorage;
this.states = LoaderFactory.buildStateLoder(storage, RUN_STATE, null); this.states = LoaderFactory.buildStateLoder(storage, RUN_STATE, null);
this.noteLoader = LoaderFactory.buildObjectLoder(storage, RUN_NOTES, null);
this.factory = factory; this.factory = factory;
this.runPath = path; this.runPath = path;
logger = LoggerFactory.getLogger("CURRENT_RUN");
} }
public Value getState(String name) { public Value getState(String name) {
@ -88,12 +107,85 @@ public class NumassRun implements Annotated, Responder {
default: default:
throw new UnknownNumassActionException(action, UnknownNumassActionException.Cause.NOT_SUPPORTED); throw new UnknownNumassActionException(action, UnknownNumassActionException.Cause.NOT_SUPPORTED);
} }
case "numass.notes":
switch (action) {
case "push":
return pushNote(message);
case "pull":
return pullNotes(message);
default:
throw new UnknownNumassActionException(action, UnknownNumassActionException.Cause.NOT_SUPPORTED);
}
default: default:
throw new RuntimeException("Wrong message type"); throw new RuntimeException("Wrong message type");
} }
} }
public Envelope pushNumassPoint(Envelope message) { public synchronized void addNote(String text, Instant time) throws StorageException {
NumassNote note = new NumassNote(text, time);
addNote(note);
}
@SuppressWarnings("unchecked")
public synchronized void addNote(NumassNote note) throws StorageException {
noteLoader.push(note.ref(), note);
}
/**
* Stream of notes in the last to first order
*
* @return
*/
@SuppressWarnings("unchecked")
public Stream<NumassNote> getNotes() {
return noteLoader.fragmentNames().stream().<NumassNote>map(new Function<String, NumassNote>() {
@Override
public NumassNote apply(String name) {
try {
return (NumassNote) noteLoader.pull(name);
} catch (StorageException ex) {
return (NumassNote) null;
}
}
}).sorted(new Comparator<NumassNote>() {
@Override
public int compare(NumassNote o1, NumassNote o2) {
return -o1.time().compareTo(o2.time());
}
});
}
private synchronized Envelope pushNote(Envelope message) {
try {
if (message.meta().hasNode("note")) {
for (Meta node : message.meta().getNodes("note")) {
addNote(NumassNote.buildFrom(node));
}
} else {
addNote(NumassNote.buildFrom(message.meta()));
}
return factory.okResponseBase(message, false, false).build();
} catch (Exception ex) {
logger.error("Failed to push note", ex);
return factory.errorResponseBase(message, ex).build();
}
}
private Envelope pullNotes(Envelope message) {
EnvelopeBuilder envelope = factory.okResponseBase(message, true, false);
int limit = message.meta().getInt("limit", -1);
//TODO add time window and search conditions here
Stream<NumassNote> stream = getNotes();
if (limit > 0) {
stream = stream.limit(limit);
}
stream.forEach((NumassNote note) -> envelope.putMetaNode(note.toMeta()));
return envelope.build();
}
private Envelope pushNumassPoint(Envelope message) {
try { try {
String filePath = message.meta().getString("path", ""); String filePath = message.meta().getString("path", "");
String fileName = message.meta().getString("name") String fileName = message.meta().getString("name")
@ -102,6 +194,7 @@ public class NumassRun implements Annotated, Responder {
//TODO add checksum here //TODO add checksum here
return factory.okResponseBase("numass.data.push.response", false, false).build(); return factory.okResponseBase("numass.data.push.response", false, false).build();
} catch (StorageException | IOException ex) { } catch (StorageException | IOException ex) {
logger.error("Failed to push point", ex);
return factory.errorResponseBase("numass.data.push.response", ex).build(); return factory.errorResponseBase("numass.data.push.response", ex).build();
} }
} }

View File

@ -24,7 +24,7 @@ import hep.dataforge.storage.commons.AbstractNetworkListener;
import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.storage.commons.StorageManager; import hep.dataforge.storage.commons.StorageManager;
import hep.dataforge.storage.filestorage.FileStorage; import hep.dataforge.storage.filestorage.FileStorage;
import hep.dataforge.storage.servlet.SorageRatpackHandler; import hep.dataforge.storage.servlet.StorageRatpackHandler;
import inr.numass.storage.NumassStorage; import inr.numass.storage.NumassStorage;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -84,7 +84,7 @@ public class NumassServer extends AbstractNetworkListener {
.serverConfig((ServerConfigBuilder config) -> config.port(port)) .serverConfig((ServerConfigBuilder config) -> config.port(port))
.handlers((Chain chain) -> chain .handlers((Chain chain) -> chain
.get(new NumassRootHandler(this)) .get(new NumassRootHandler(this))
.get("storage", new SorageRatpackHandler(root)) .get("storage", new NumassStorageHandler(root))
) )
); );
} }
@ -114,6 +114,7 @@ public class NumassServer extends AbstractNetworkListener {
case "numass.state": case "numass.state":
return getRootState().respond(message); return getRootState().respond(message);
case "numass.data": case "numass.data":
case "numass.notes":
case "numass.run.state": case "numass.run.state":
return getRun().respond(message); return getRun().respond(message);
case "numass.control": case "numass.control":
@ -282,13 +283,13 @@ public class NumassServer extends AbstractNetworkListener {
// } // }
// b.append("<div class=\"shifted\">\n"); // b.append("<div class=\"shifted\">\n");
// for (Loader loader : storage.loaders().values()) { // for (Loader loader : storage.loaders().values()) {
// renderLoader(ctx, b, loader); // defaultRenderLoader(ctx, b, loader);
// } // }
// b.append("</div>\n"); // b.append("</div>\n");
// b.append("</div>\n"); // b.append("</div>\n");
// } // }
// //
// private void renderLoader(Context ctx, StringBuilder b, Loader loader) { // private void defaultRenderLoader(Context ctx, StringBuilder b, Loader loader) {
// String href = "/storage?path="+loader.getFullPath(); // String href = "/storage?path="+loader.getFullPath();
// b.append(String.format("<p><a href=\"%s\">%s</a> (%s)</p>", href, loader.getName(), loader.getType())); // b.append(String.format("<p><a href=\"%s\">%s</a> (%s)</p>", href, loader.getName(), loader.getType()));
// } // }

View File

@ -0,0 +1,89 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package inr.numass.server;
import freemarker.template.Template;
import hep.dataforge.exceptions.StorageException;
import hep.dataforge.storage.api.ObjectLoader;
import hep.dataforge.storage.api.Storage;
import hep.dataforge.storage.servlet.StorageRatpackHandler;
import hep.dataforge.storage.servlet.Utils;
import java.io.StringWriter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.LoggerFactory;
import ratpack.handling.Context;
/**
*
* @author Alexander Nozik
*/
public class NumassStorageHandler extends StorageRatpackHandler {
public NumassStorageHandler(Storage root) {
super(root);
}
@Override
@SuppressWarnings("unchecked")
protected void renderObjects(Context ctx, ObjectLoader loader) {
if (NumassRun.RUN_NOTES.equals(loader.getName())) {
try {
ctx.getResponse().contentType("text/html");
Template template = Utils.freemarkerConfig().getTemplate("NoteLoader.ftl");
List<String> notes = getNotes(loader).limit(100).map(note->render(note)).collect(Collectors.toList());
Map data = new HashMap(2);
data.put("notes", notes);
StringWriter writer = new StringWriter();
template.process(data, writer);
ctx.render(writer.toString());
} catch (Exception ex) {
LoggerFactory.getLogger(getClass()).error("Failed to render template", ex);
ctx.render(ex.toString());
}
} else {
super.renderObjects(ctx, loader);
}
}
private String render(NumassNote note){
return String.format("%s: <strong>%s</strong> %s", note.ref(), note.time(), note.content());
}
/**
* Stream of notes in the last to first order
*
* @return
*/
@SuppressWarnings("unchecked")
private Stream<NumassNote> getNotes(ObjectLoader noteLoader) {
return noteLoader.fragmentNames().stream().<NumassNote>map(new Function<String, NumassNote>() {
@Override
public NumassNote apply(String name) {
try {
return (NumassNote) noteLoader.pull(name);
} catch (StorageException ex) {
return (NumassNote) null;
}
}
}).sorted(new Comparator<NumassNote>() {
@Override
public int compare(NumassNote o1, NumassNote o2) {
return -o1.time().compareTo(o2.time());
}
});
}
}

View File

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html>
<head>
<title>Numass run notes</title>
<meta charset="UTF-8">
<meta http-equiv="refresh" content="30">
</head>
<body>
<h1> Numass experiment run notes:</h1>
<#list notes as note>
<p>${note};</p>
</#list>
</body>
</html>

View File

@ -21,7 +21,6 @@ import hep.dataforge.io.envelopes.DefaultEnvelopeReader;
import hep.dataforge.io.envelopes.Envelope; import hep.dataforge.io.envelopes.Envelope;
import hep.dataforge.meta.Meta; import hep.dataforge.meta.Meta;
import hep.dataforge.meta.MetaBuilder; import hep.dataforge.meta.MetaBuilder;
import hep.dataforge.storage.api.BinaryLoader;
import hep.dataforge.storage.api.Storage; import hep.dataforge.storage.api.Storage;
import hep.dataforge.storage.loaders.AbstractLoader; import hep.dataforge.storage.loaders.AbstractLoader;
import inr.numass.data.NMEvent; import inr.numass.data.NMEvent;
@ -54,13 +53,14 @@ import static org.apache.commons.vfs2.FileType.FOLDER;
import org.apache.commons.vfs2.VFS; import org.apache.commons.vfs2.VFS;
import org.apache.commons.vfs2.provider.local.DefaultLocalFileProvider; import org.apache.commons.vfs2.provider.local.DefaultLocalFileProvider;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import hep.dataforge.storage.api.ObjectLoader;
/** /**
* The reader for numass main detector data directory or zip format; * The reader for numass main detector data directory or zip format;
* *
* @author darksnake * @author darksnake
*/ */
public class NumassDataLoader extends AbstractLoader implements BinaryLoader<Envelope>, NumassData { public class NumassDataLoader extends AbstractLoader implements ObjectLoader<Envelope>, NumassData {
//FIXME administer resource release //FIXME administer resource release
/** /**