Strarted work on messaging

This commit is contained in:
Alexander Nozik 2018-05-26 21:27:31 +03:00
parent 2a6c7b75a5
commit 3e95e0c248
7 changed files with 199 additions and 28 deletions

View File

@ -8,6 +8,7 @@ mainClassName = mainClass
dependencies { dependencies {
compile project(':numass-core') compile project(':numass-core')
compile 'commons-cli:commons-cli:1.3.1' compile 'commons-cli:commons-cli:1.4'
compile 'org.zeroturnaround:zt-zip:1.9' compile "hep.dataforge:dataforge-messages"
compile 'org.zeroturnaround:zt-zip:1.13'
} }

View File

@ -19,7 +19,7 @@ import hep.dataforge.io.envelopes.DefaultEnvelopeReader;
import hep.dataforge.io.envelopes.DefaultEnvelopeType; import hep.dataforge.io.envelopes.DefaultEnvelopeType;
import hep.dataforge.io.envelopes.Envelope; import hep.dataforge.io.envelopes.Envelope;
import hep.dataforge.io.envelopes.EnvelopeBuilder; import hep.dataforge.io.envelopes.EnvelopeBuilder;
import hep.dataforge.io.messages.Responder; import hep.dataforge.messages.Responder;
import hep.dataforge.meta.Meta; import hep.dataforge.meta.Meta;
import hep.dataforge.meta.MetaBuilder; import hep.dataforge.meta.MetaBuilder;
import hep.dataforge.values.Value; import hep.dataforge.values.Value;
@ -39,9 +39,8 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static hep.dataforge.io.messages.MessagesKt.*; import static hep.dataforge.messages.MessagesKt.*;
/** /**
* @author darksnake * @author darksnake
@ -68,8 +67,9 @@ public class NumassClient implements AutoCloseable, Responder {
socket.close(); socket.close();
} }
@NotNull
@Override @Override
public Envelope respond(Envelope message) { public Envelope respond(@NotNull Envelope message) {
try { try {
write(message, socket.getOutputStream()); write(message, socket.getOutputStream());
return read(socket.getInputStream()); return read(socket.getInputStream());
@ -255,10 +255,4 @@ public class NumassClient implements AutoCloseable, Responder {
public Envelope sendDataPoints(String shelf, String loaderName, Collection<Values> points) { public Envelope sendDataPoints(String shelf, String loaderName, Collection<Values> points) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@NotNull
@Override
public CompletableFuture<Envelope> respondInFuture(@NotNull Envelope message) {
return CompletableFuture.supplyAsync(() -> respond(message));
}
} }

View File

@ -113,10 +113,6 @@ class NumassDataLoader(
TODO() TODO()
} }
override fun respond(message: Envelope): Envelope {
throw TODO("Not supported yet.")
}
override val startTime: Instant override val startTime: Instant
get() = meta.optValue("start_time").map<Instant> { it.time }.orElseGet { super.startTime } get() = meta.optValue("start_time").map<Instant> { it.time }.orElseGet { super.startTime }

View File

@ -19,6 +19,7 @@ mainClassName = "inr.numass.server.ServerRunner"
dependencies { dependencies {
compile project(':numass-core') compile project(':numass-core')
compile "hep.dataforge:storage-server" // project(':dataforge-storage:storage-servlet') compile "hep.dataforge:storage-server" // project(':dataforge-storage:storage-servlet')
compile "hep.dataforge:dataforge-messages"
compile 'commons-daemon:commons-daemon:1.+' compile 'commons-daemon:commons-daemon:1.+'
} }

View File

@ -0,0 +1,180 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.server
import hep.dataforge.io.envelopes.DefaultEnvelopeReader
import hep.dataforge.io.envelopes.DefaultEnvelopeType
import hep.dataforge.io.envelopes.Envelope
import hep.dataforge.messages.errorResponseBase
import hep.dataforge.messages.isTerminator
import hep.dataforge.messages.terminator
import hep.dataforge.meta.Meta
import hep.dataforge.meta.Metoid
import org.slf4j.LoggerFactory
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.ServerSocket
import java.net.Socket
/**
* Abstract network listener for envelopes
*
* @author Darksnake
*/
abstract class AbstractNetworkListener(listnerConfig: Meta?) : Metoid, AutoCloseable {
@Volatile
private var finishflag = false
private var serverSocket: ServerSocket? = null
final override val meta: Meta
private val port: Int
get() = meta.getInt("port", 8335)
init {
if (listnerConfig == null) {
this.meta = Meta.buildEmpty("listener")
} else {
this.meta = listnerConfig
}
}
@Throws(IOException::class, InterruptedException::class)
override fun close() {
finishflag = true
if (serverSocket != null) {
serverSocket!!.close()
}
logger.info("Closing listner...")
}
abstract fun respond(message: Envelope): Envelope
@Throws(Exception::class)
open fun open() {
val clientGroup = ThreadGroup("clients")
Thread({
try {
ServerSocket(port).use { ss ->
serverSocket = ss
logger.info("Starting to listning to the port {}", port)
while (!finishflag) {
//FIXME add timeout
val s = ss.accept()
logger.info("Client accepted from {}", s.remoteSocketAddress.toString())
// new SocketProcessor(s).run();
val socketProcessor = SocketProcessor(s)
Thread(clientGroup, socketProcessor).start()
}
}
} catch (ex: IOException) {
if (!finishflag) {
logger.error("Connection exception", ex)
}
}
logger.info("Listener closed")
serverSocket = null
}, "listner").start()
}
/**
* Decide to accept envelope
*
* @param envelope
* @return
*/
protected fun accept(envelope: Envelope): Boolean {
return true
}
private inner class SocketProcessor (private val socket: Socket) : Runnable {
private val inputStream: InputStream = socket.getInputStream()
private val outputStream: OutputStream = socket.getOutputStream()
override fun run() {
logger.info("Starting client processing from {}", socket.remoteSocketAddress.toString())
while (!finishflag) {
if (socket.isClosed) {
finishflag = true
logger.debug("Socket {} closed by client", socket.remoteSocketAddress.toString())
break
}
try {
val request = read()
//Breaking connection on terminator
if (isTerminator(request)) {
logger.info("Recieved terminator message from {}", socket.remoteSocketAddress.toString())
break
}
if (accept(request)) {
var response: Envelope?
try {
response = respond(request)
} catch (ex: Exception) {
logger.error("Uncatched exception during response evaluation", ex)
response = errorResponseBase("", ex).build()
}
//Null respnses are ignored
if (response != null) {
write(response)
}
}
} catch (ex: IOException) {
logger.error("IO exception during envelope evaluation", ex)
finishflag = true
}
}
logger.info("Client processing finished for {}", socket.remoteSocketAddress.toString())
if (!socket.isClosed) {
try {
write(terminator)//Sending additional terminator to notify client that server is closing connection
} catch (ex: IOException) {
logger.error("Terminator send failed", ex)
}
try {
socket.close()
} catch (ex: IOException) {
logger.error("Can't close the socket", ex)
}
}
}
@Throws(IOException::class)
private fun read(): Envelope {
return DefaultEnvelopeReader().readWithData(inputStream)
}
@Throws(IOException::class)
private fun write(envelope: Envelope) {
DefaultEnvelopeType.INSTANCE.writer.write(outputStream, envelope)
outputStream.flush()
}
}
companion object {
private val logger = LoggerFactory.getLogger("LISTENER")
}
}

View File

@ -18,7 +18,7 @@ package inr.numass.server;
import hep.dataforge.exceptions.StorageException; import hep.dataforge.exceptions.StorageException;
import hep.dataforge.io.envelopes.Envelope; import hep.dataforge.io.envelopes.Envelope;
import hep.dataforge.io.envelopes.EnvelopeBuilder; import hep.dataforge.io.envelopes.EnvelopeBuilder;
import hep.dataforge.io.messages.Responder; import hep.dataforge.messages.Responder;
import hep.dataforge.meta.Meta; import hep.dataforge.meta.Meta;
import hep.dataforge.meta.Metoid; import hep.dataforge.meta.Metoid;
import hep.dataforge.storage.api.ObjectLoader; import hep.dataforge.storage.api.ObjectLoader;
@ -36,8 +36,8 @@ import java.time.Instant;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream; import java.util.stream.Stream;
import static hep.dataforge.io.messages.MessagesKt.errorResponseBase; import static hep.dataforge.messages.MessagesKt.errorResponseBase;
import static hep.dataforge.io.messages.MessagesKt.okResponseBase; import static hep.dataforge.messages.MessagesKt.okResponseBase;
import static inr.numass.server.NumassServerUtils.getNotes; import static inr.numass.server.NumassServerUtils.getNotes;
/** /**
@ -100,8 +100,8 @@ public class NumassRun implements Metoid, Responder {
String type = meta.getString("type", "numass.run.state"); String type = meta.getString("type", "numass.run.state");
String action = meta.getString("action"); String action = meta.getString("action");
switch (type) { switch (type) {
case "numass.run.state": // case "numass.run.state":
return states.respond(message); // return states.respond(message);
case "numass.data": case "numass.data":
switch (action) { switch (action) {
case "push": case "push":

View File

@ -23,7 +23,6 @@ import hep.dataforge.meta.Meta;
import hep.dataforge.meta.MetaBuilder; import hep.dataforge.meta.MetaBuilder;
import hep.dataforge.storage.api.StateLoader; import hep.dataforge.storage.api.StateLoader;
import hep.dataforge.storage.api.Storage; import hep.dataforge.storage.api.Storage;
import hep.dataforge.storage.commons.AbstractNetworkListener;
import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.storage.commons.StorageManager; import hep.dataforge.storage.commons.StorageManager;
import hep.dataforge.storage.commons.StorageUtils; import hep.dataforge.storage.commons.StorageUtils;
@ -35,8 +34,8 @@ import ratpack.server.RatpackServer;
import java.io.IOException; import java.io.IOException;
import static hep.dataforge.io.messages.MessagesKt.errorResponseBase; import static hep.dataforge.messages.MessagesKt.errorResponseBase;
import static hep.dataforge.io.messages.MessagesKt.responseBase; import static hep.dataforge.messages.MessagesKt.responseBase;
/** /**
* @author darksnake * @author darksnake
@ -120,10 +119,10 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
//switch message type //switch message type
String type = meta.getString("type", "numass.state"); String type = meta.getString("type", "numass.state");
switch (type) { switch (type) {
case "numass.storage": // case "numass.storage":
return getRun().getStorage().respond(message); // return getRun().getStorage().respond(message);
case "numass.state": // case "numass.state":
return getRootState().respond(message); // return getRootState().respond(message);
case "numass.data": case "numass.data":
case "numass.notes": case "numass.notes":
case "numass.run.state": case "numass.run.state":