Finish migration to kotlinx-io

This commit is contained in:
Alexander Nozik 2023-12-13 20:20:03 +03:00
parent 606c2cf5b1
commit a12cf440e8
3 changed files with 14 additions and 10 deletions

View File

@ -13,7 +13,7 @@ val xodusVersion by extra("2.0.1")
allprojects { allprojects {
group = "space.kscience" group = "space.kscience"
version = "0.3.0-dev-3" version = "0.3.0-dev-4"
repositories{ repositories{
maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev") maven("https://maven.pkg.jetbrains.space/spc/p/sci/dev")
} }

View File

@ -51,7 +51,7 @@ public class ChannelPort(
} }
if (num < 0) cancel("The input channel is exhausted") if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) { } catch (ex: Exception) {
if(ex is AsynchronousCloseException){ if (ex is AsynchronousCloseException) {
logger.info { "Channel $channel closed" } logger.info { "Channel $channel closed" }
} else { } else {
logger.error(ex) { "Channel read error, retrying in 1 second" } logger.error(ex) { "Channel read error, retrying in 1 second" }
@ -108,7 +108,7 @@ public object UdpPort : PortFactory {
/** /**
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages. * Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
*/ */
public fun open( public fun openChannel(
context: Context, context: Context,
remoteHost: String, remoteHost: String,
remotePort: Int, remotePort: Int,
@ -130,6 +130,6 @@ public object UdpPort : PortFactory {
val remotePort by meta.number { error("Remote port is not specified") } val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string() val localHost: String? by meta.string()
val localPort: Int? by meta.int() val localPort: Int? by meta.int()
return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost") return openChannel(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
} }
} }

View File

@ -1,6 +1,10 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import kotlinx.coroutines.flow.* import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -25,19 +29,19 @@ internal class PortIOTest {
@Test @Test
fun testUdpCommunication() = runTest { fun testUdpCommunication() = runTest {
val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812) val receiver = UdpPort.openChannel(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811) val sender = UdpPort.openChannel(Global, "localhost", 8812, localPort = 8811)
delay(30)
repeat(10) { repeat(10) {
sender.send("Line number $it\n") sender.send("Line number $it\n")
} }
val res = receiver val res = receiver
.receiving() .receiving()
.onEach { println("ARRAY: ${it.decodeToString()}") }
.withStringDelimiter("\n") .withStringDelimiter("\n")
.onEach { println("LINE: $it") } .take(10)
.take(10).toList() .toList()
assertEquals("Line number 3", res[3].trim()) assertEquals("Line number 3", res[3].trim())
receiver.close() receiver.close()