All tests running
This commit is contained in:
parent
ad2f5681b6
commit
ab47d7723e
@ -1,7 +1,10 @@
|
|||||||
package hep.dataforge.io
|
package hep.dataforge.io
|
||||||
|
|
||||||
import hep.dataforge.meta.*
|
import hep.dataforge.meta.*
|
||||||
import kotlinx.io.*
|
import kotlinx.io.ArrayBinary
|
||||||
|
import kotlinx.io.Binary
|
||||||
|
import kotlinx.io.ExperimentalIoApi
|
||||||
|
import kotlinx.io.Output
|
||||||
|
|
||||||
class EnvelopeBuilder {
|
class EnvelopeBuilder {
|
||||||
private val metaBuilder = MetaBuilder()
|
private val metaBuilder = MetaBuilder()
|
||||||
@ -26,9 +29,7 @@ class EnvelopeBuilder {
|
|||||||
*/
|
*/
|
||||||
@ExperimentalIoApi
|
@ExperimentalIoApi
|
||||||
fun data(block: Output.() -> Unit) {
|
fun data(block: Output.() -> Unit) {
|
||||||
val bytes = buildBytes(builder = block)
|
data = ArrayBinary.write(builder = block)
|
||||||
data = bytes.toByteArray().asBinary() //save data to byte array so
|
|
||||||
bytes.close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
internal fun build() = SimpleEnvelope(metaBuilder.seal(), data)
|
internal fun build() = SimpleEnvelope(metaBuilder.seal(), data)
|
||||||
|
@ -21,7 +21,7 @@ object EnvelopeParts {
|
|||||||
val FORMAT_NAME_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "format"
|
val FORMAT_NAME_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "format"
|
||||||
val FORMAT_META_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "meta"
|
val FORMAT_META_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "meta"
|
||||||
|
|
||||||
const val MULTIPART_DATA_SEPARATOR = "#~PART~#\r\n"
|
const val MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n"
|
||||||
|
|
||||||
const val MULTIPART_DATA_TYPE = "envelope.multipart"
|
const val MULTIPART_DATA_TYPE = "envelope.multipart"
|
||||||
}
|
}
|
||||||
@ -113,7 +113,7 @@ fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Enve
|
|||||||
sequence {
|
sequence {
|
||||||
repeat(size) {
|
repeat(size) {
|
||||||
val separator = readRawString(MULTIPART_DATA_SEPARATOR.length)
|
val separator = readRawString(MULTIPART_DATA_SEPARATOR.length)
|
||||||
if(separator!= MULTIPART_DATA_SEPARATOR) error("Separator is expected")
|
if(separator!= MULTIPART_DATA_SEPARATOR) error("Separator is expected, but $separator found")
|
||||||
yield(readObject())
|
yield(readObject())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,9 +106,9 @@ class TaglessEnvelopeFormat(
|
|||||||
readArray(bytes)
|
readArray(bytes)
|
||||||
bytes.asBinary()
|
bytes.asBinary()
|
||||||
} else {
|
} else {
|
||||||
buildBytes {
|
ArrayBinary.write {
|
||||||
writeInput(this@readObject)
|
writeInput(this@readObject)
|
||||||
}.toByteArray().asBinary()
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return SimpleEnvelope(meta, data)
|
return SimpleEnvelope(meta, data)
|
||||||
|
@ -3,6 +3,7 @@ package hep.dataforge.io
|
|||||||
import hep.dataforge.meta.DFExperimental
|
import hep.dataforge.meta.DFExperimental
|
||||||
import hep.dataforge.meta.get
|
import hep.dataforge.meta.get
|
||||||
import hep.dataforge.meta.int
|
import hep.dataforge.meta.int
|
||||||
|
import kotlinx.io.text.writeRawString
|
||||||
import kotlinx.io.text.writeUtf8String
|
import kotlinx.io.text.writeUtf8String
|
||||||
|
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
@ -18,9 +19,9 @@ class MultipartTest {
|
|||||||
}
|
}
|
||||||
data {
|
data {
|
||||||
writeUtf8String("Hello World $it")
|
writeUtf8String("Hello World $it")
|
||||||
// repeat(2000) {
|
repeat(300) {
|
||||||
// writeInt(it)
|
writeRawString("$it ")
|
||||||
// }
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ class EnvelopeClient(
|
|||||||
suspend fun close() {
|
suspend fun close() {
|
||||||
try {
|
try {
|
||||||
respond(
|
respond(
|
||||||
Envelope.invoke {
|
Envelope {
|
||||||
type = EnvelopeServer.SHUTDOWN_ENVELOPE_TYPE
|
type = EnvelopeServer.SHUTDOWN_ENVELOPE_TYPE
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -78,6 +78,9 @@ class EnvelopeServer(
|
|||||||
logger.debug { "Accepted request with type ${request.type} from ${socket.remoteSocketAddress}" }
|
logger.debug { "Accepted request with type ${request.type} from ${socket.remoteSocketAddress}" }
|
||||||
if (request.type == SHUTDOWN_ENVELOPE_TYPE) {
|
if (request.type == SHUTDOWN_ENVELOPE_TYPE) {
|
||||||
//Echo shutdown command
|
//Echo shutdown command
|
||||||
|
outputStream.write{
|
||||||
|
writeObject(request)
|
||||||
|
}
|
||||||
logger.info { "Accepted graceful shutdown signal from ${socket.inetAddress}" }
|
logger.info { "Accepted graceful shutdown signal from ${socket.inetAddress}" }
|
||||||
socket.close()
|
socket.close()
|
||||||
return@thread
|
return@thread
|
||||||
|
Loading…
Reference in New Issue
Block a user