Compare commits

..

No commits in common. "eb126a60905ced9e30745f4a4e2b3d4f177ee548" and "60a693b1b3eaebc0ca9a2e43dbd2b7cbd096a418" have entirely different histories.

8 changed files with 132 additions and 275 deletions

View File

@ -73,7 +73,7 @@ public data class PropertySetMessage(
public val property: String,
public val value: Meta,
override val sourceDevice: Name? = null,
override val targetDevice: Name?,
override val targetDevice: Name,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {

View File

@ -68,7 +68,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
/**
* Process incoming [DeviceMessage], using hub naming to find target.
* If the `targetDevice` is `null`, then the message is sent to each device in this hub
* If the `targetDevice` is `null`, then message is sent to each device in this hub
*/
public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<DeviceMessage> {
return try {

View File

@ -21,7 +21,7 @@ public interface PeerConnection {
address: String,
contentId: String,
requestMeta: Meta = Meta.EMPTY,
): Envelope?
): Envelope
/**
* Send an [envelope] to a device on a given [address]

View File

@ -23,11 +23,7 @@ class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() {
var deviceId by string(deviceId)
var description by string()
var reportInterval by int(500)
var radioFrequency by string(default = DEFAULT_FREQUENCY)
companion object {
const val DEFAULT_FREQUENCY = "169 MHz"
}
var radioFrequency by string(default = "169 MHz")
}
typealias CollectiveDeviceRoster = Map<CollectiveDeviceId, CollectiveDeviceConfiguration>
@ -85,12 +81,12 @@ class CollectiveDeviceConstructor(
val position = registerAsProperty(
CollectiveDevice.position,
position.debounce(configuration.reportInterval.milliseconds)
position.sample(configuration.reportInterval.milliseconds)
)
val velocity = registerAsProperty(
CollectiveDevice.velocity,
velocity.debounce(configuration.reportInterval.milliseconds)
velocity.sample(configuration.reportInterval.milliseconds)
)
private val _visibleNeighbors: MutableDeviceState<Collection<CollectiveDeviceId>> = stateOf(emptyList())
@ -115,4 +111,4 @@ class CollectiveDeviceConstructor(
}
override suspend fun listVisible(): Collection<CollectiveDeviceId> = observation.invoke().keys
}
}

View File

@ -1,53 +1,27 @@
package space.kscience.controls.demo.collective
import kotlinx.coroutines.*
import kotlinx.io.writeString
import kotlinx.serialization.json.Json
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.api.PropertySetMessage
import space.kscience.controls.client.DeviceClient
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.client.write
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.ModelConstructor
import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.onTimer
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.manager.respondMessage
import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.name
import space.kscience.controls.spec.write
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.toByteArray
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.parseAsName
import space.kscience.kmath.geometry.degrees
import space.kscience.kmath.geometry.radians
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.startMagixServer
import space.kscience.maps.coordinates.*
import kotlin.math.PI
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
private val deviceVelocity = 0.1.kilometers
private val center = Gmc.ofDegrees(55.925, 37.514)
private val radius = 0.01.degrees
private val json = Json {
ignoreUnknownKeys = true
prettyPrint = true
}
internal data class CollectiveDeviceState(
val id: CollectiveDeviceId,
val configuration: CollectiveDeviceConfiguration,
@ -55,7 +29,7 @@ internal data class CollectiveDeviceState(
val velocity: MutableDeviceState<GmcVelocity>,
)
internal fun CollectiveDeviceState(
internal fun VirtualDeviceState(
id: CollectiveDeviceId,
position: Gmc,
configuration: CollectiveDeviceConfiguration.() -> Unit = {},
@ -66,12 +40,13 @@ internal fun CollectiveDeviceState(
MutableDeviceState(GmcVelocity.zero)
)
internal class DeviceCollectiveModel(
context: Context,
val deviceStates: Collection<CollectiveDeviceState>,
val visibilityRange: Distance = 0.5.kilometers,
val radioRange: Distance = 1.kilometers,
) : ModelConstructor(context) {
val radioRange: Distance = 5.kilometers,
) : ModelConstructor(context), PeerConnection {
/**
* Propagate movement
@ -95,88 +70,33 @@ internal class DeviceCollectiveModel(
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
}
inner class RadioPeerConnectionModel(private val position: DeviceState<Gmc>) : PeerConnection {
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope? = null
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
devices.values.filter { it.configuration.radioFrequency == address }.filter {
GeoEllipsoid.WGS84.curveBetween(position.value, it.position.value).distance < radioRange
}.forEach { target ->
check(envelope.data != null) { "Envelope data is empty" }
val message = json.decodeFromString(
DeviceMessage.serializer(),
envelope.data?.toByteArray()?.decodeToString() ?: ""
)
target.respondMessage(target.configuration.deviceId.parseAsName(), message)
}
}
}
val devices = deviceStates.associate { state ->
val devices = deviceStates.associate {
val device = CollectiveDeviceConstructor(
context = context,
configuration = state.configuration,
position = state.position,
velocity = state.velocity,
peerConnection = RadioPeerConnectionModel(state.position),
configuration = it.configuration,
position = it.position,
velocity = it.velocity,
peerConnection = this,
) {
locateVisible(state.id)
locateVisible(it.id)
}
state.id to device
}
internal fun createTrawler(position: Gmc, id: CollectiveDeviceId = "trawler"): CollectiveDeviceConstructor {
val state = CollectiveDeviceState(
id = id,
configuration = CollectiveDeviceConfiguration(id),
position = MutableDeviceState(position),
velocity = MutableDeviceState(GmcVelocity.zero)
)
val result = CollectiveDeviceConstructor(
context = context,
configuration = state.configuration,
position = state.position,
velocity = state.velocity,
peerConnection = RadioPeerConnectionModel(state.position),
) {
locateVisible(state.id)
}
// TODO move to CollectiveDeviceState
onTimer { prev, next ->
val delta = (next - prev)
state.position.value = state.position.value.moveWith(state.velocity.value, delta)
}
result.onTimer(1.seconds) { _, _ ->
val envelope = Envelope {
data {
writeString(
json.encodeToString(
DeviceMessage.serializer(),
PropertySetMessage(
property = CollectiveDevice.velocity.name,
value = gmcVelocityMetaConverter.convert(state.velocity.value),
targetDevice = null
)
)
)
}
}
result.peerConnection.send(
CollectiveDeviceConfiguration.DEFAULT_FREQUENCY,
envelope
)
}
return result
//start movement program
device.moveInCircles()
it.id to device
}
val roster = deviceStates.associate { it.id to it.configuration }
}
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope {
TODO("Not yet implemented")
}
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
// devices.values.filter { it.configuration.radioFrequency == address }.forEach { device ->
// ```
// }
}
}
internal fun CoroutineScope.launchCollectiveMagixServer(
collectiveModel: DeviceCollectiveModel,
@ -198,58 +118,4 @@ internal fun CoroutineScope.launchCollectiveMagixServer(
deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id)
}
}
internal fun generateModel(
context: Context,
size: Int = 50,
reportInterval: Duration = 500.milliseconds,
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
): DeviceCollectiveModel {
val devices: List<CollectiveDeviceState> = List(size) { index ->
val id = "device[$index]"
CollectiveDeviceState(
id = id,
Gmc(
center.latitude + radius * Random.nextDouble(),
center.longitude + radius * Random.nextDouble()
)
) {
deviceId = id
description = "Virtual remote device $id"
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
additionalConfiguration()
}
}
val model = DeviceCollectiveModel(context, devices)
return model
}
fun DeviceClient.moveInCircles(scope: CoroutineScope = this): Job = scope.launch {
var bearing = Random.nextDouble(-PI, PI).radians
write(CollectiveDevice.velocity, GmcVelocity(bearing, deviceVelocity))
while (isActive) {
delay(500)
bearing += 5.degrees
write(CollectiveDevice.velocity, GmcVelocity(bearing, deviceVelocity))
}
}
internal fun CollectiveDeviceConstructor.moveTo(
targetPosition: Gmc,
speedLimit: Distance = deviceVelocity,
scope: CoroutineScope = this,
): Job = scope.launch {
do {
val curve = GeoEllipsoid.WGS84.curveBetween(position.value, targetPosition)
write(CollectiveDevice.velocity, GmcVelocity(curve.forward.bearing, speedLimit))
delay(1.seconds)
} while (curve.distance > 0.1.kilometers)
write(CollectiveDevice.velocity, GmcVelocity.zero)
}

View File

@ -2,28 +2,27 @@ package space.kscience.controls.demo.collective
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.sample
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.MutableDeviceState
import kotlin.time.Duration
@OptIn(FlowPreview::class)
class DebounceDeviceState<T>(
class SampleDeviceState<T>(
val origin: DeviceState<T>,
val interval: Duration,
) : DeviceState<T> {
override val value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.debounce(interval)
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> DeviceState<T>.debounce(interval: Duration) = DebounceDeviceState(this, interval)
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
@OptIn(FlowPreview::class)
class MutableDebounceDeviceState<T>(
class MutableSampleDeviceState<T>(
val origin: MutableDeviceState<T>,
val interval: Duration,
) : MutableDeviceState<T> {
@ -33,4 +32,4 @@ class MutableDebounceDeviceState<T>(
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> MutableDeviceState<T>.debounce(interval: Duration) = MutableDebounceDeviceState(this, interval)
fun <T> MutableDeviceState<T>.sample(interval: Duration) = MutableSampleDeviceState(this, interval)

View File

@ -0,0 +1,60 @@
package space.kscience.controls.demo.collective
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.controls.spec.write
import space.kscience.dataforge.context.Context
import space.kscience.kmath.geometry.degrees
import space.kscience.kmath.geometry.radians
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.kilometers
import kotlin.math.PI
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
private val deviceVelocity = 0.1.kilometers
private val center = Gmc.ofDegrees(55.925, 37.514)
private val radius = 0.01.degrees
internal fun generateModel(
context: Context,
size: Int = 50,
reportInterval: Duration = 500.milliseconds,
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
): DeviceCollectiveModel {
val devices: List<CollectiveDeviceState> = List(size) { index ->
val id = "device[$index]"
VirtualDeviceState(
id = id,
Gmc(
center.latitude + radius * Random.nextDouble(),
center.longitude + radius * Random.nextDouble()
)
) {
deviceId = id
description = "Virtual remote device $id"
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
additionalConfiguration()
}
}
val model = DeviceCollectiveModel(context, devices)
return model
}
fun CollectiveDevice.moveInCircles(): Job = launch {
var bearing = Random.nextDouble(-PI, PI).radians
write(CollectiveDevice.velocity, GmcVelocity(bearing, deviceVelocity))
while (isActive) {
delay(500)
bearing += 5.degrees
write(CollectiveDevice.velocity, GmcVelocity(bearing, deviceVelocity))
}
}

View File

@ -7,26 +7,30 @@ import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.material.*
import androidx.compose.material.Card
import androidx.compose.material.Checkbox
import androidx.compose.material.Text
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.MaterialTheme
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.input.pointer.isSecondaryPressed
import androidx.compose.ui.res.painterResource
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.unit.DpSize
import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import kotlinx.coroutines.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane
import org.jetbrains.compose.splitpane.rememberSplitPaneState
@ -47,7 +51,6 @@ import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.meters
import space.kscience.maps.features.*
import java.nio.file.Path
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
@ -56,8 +59,7 @@ fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}
Context(name, contextBuilder)
}
internal val gmcMetaConverter = MetaConverter.serializable<Gmc>()
internal val gmcVelocityMetaConverter = MetaConverter.serializable<GmcVelocity>()
private val gmcMetaConverter = MetaConverter.serializable<Gmc>()
@Composable
fun App() {
@ -79,6 +81,7 @@ fun App() {
val devices = remember { mutableStateMapOf<CollectiveDeviceId, DeviceClient>() }
LaunchedEffect(collectiveModel) {
launchCollectiveMagixServer(collectiveModel)
@ -89,8 +92,7 @@ fun App() {
collectiveModel.roster.forEach { (id, config) ->
scope.launch {
val deviceClient = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
devices[id] = deviceClient
devices[id] = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
}
}
}
@ -109,29 +111,10 @@ fun App() {
var showOnlyVisible by remember { mutableStateOf(false) }
var movementProgram: Job? by remember { mutableStateOf(null) }
val trawler: CollectiveDeviceConstructor = remember {
collectiveModel.createTrawler(Gmc.ofDegrees(55.925, 37.50))
}
HorizontalSplitPane(
splitPaneState = rememberSplitPaneState(0.9f)
) {
first(400.dp) {
var clickPoint by remember { mutableStateOf<Gmc?>(null) }
CursorDropdownMenu(clickPoint != null, { clickPoint = null }) {
clickPoint?.let { point ->
TextButton({
trawler.moveTo(point)
clickPoint = null
}) {
Text("Move trawler here")
}
}
}
MapView(
mapTileProvider = remember {
OpenStreetMapTileProvider(
@ -139,64 +122,45 @@ fun App() {
cacheDirectory = Path.of("mapCache")
)
},
config = ViewConfig(
onClick = { event, point ->
if (event.buttons.isSecondaryPressed) {
clickPoint = point.focus
}
}
)
config = ViewConfig()
) {
//draw real positions
collectiveModel.deviceStates.forEach { device ->
circle(device.position.value, id = device.id + ".position").color(Color.Red)
device.position.valueFlow.sample(50.milliseconds).onEach {
val activeDevice = selectedDeviceId?.let { devices[it] }
val color = if (selectedDeviceId == device.id) {
Color.Magenta
} else if (
showOnlyVisible &&
activeDevice != null &&
device.id in activeDevice.request(CollectiveDevice.visibleNeighbors)
) {
Color.Cyan
} else {
Color.Red
}
circle(
device.position.value,
id = device.id + ".position",
size = if (selectedDeviceId == device.id) 6.dp else 3.dp
)
.color(color)
.modifyAttribute(ZAttribute, 10f)
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == device.id) 1f else 0.5f)
device.position.valueFlow.onEach {
circle(device.position.value, id = device.id + ".position", size = 3.dp)
.color(Color.Red)
.modifyAttribute(AlphaAttribute, 0.5f) // does not work right now
}.launchIn(scope)
}
//draw received data
scope.launch {
client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") {
val id = magixMessage.sourceEndpoint
val position = gmcMetaConverter.read(deviceMessage.value)
val activeDevice = selectedDeviceId?.let { devices[it] }
rectangle(
position,
id = id,
).color(Color.Blue).onClick { selectedDeviceId = id }
if (
activeDevice == null ||
id == selectedDeviceId ||
!showOnlyVisible ||
id in activeDevice.request(CollectiveDevice.visibleNeighbors)
) {
rectangle(
position,
id = id,
size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp)
).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue)
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f)
.onClick { selectedDeviceId = id }
} else {
removeFeature(id)
}
}
}.launchIn(scope)
}
// draw trawler
trawler.position.valueFlow.onEach {
circle(it, id = "trawler").color(Color.Black)
}.launchIn(scope)
}
}
second(200.dp) {
@ -204,34 +168,6 @@ fun App() {
Column(
modifier = Modifier.verticalScroll(rememberScrollState())
) {
Button(
onClick = {
if (movementProgram == null) {
//start movement program
movementProgram = parentContext.launch {
devices.values.forEach { device ->
device.moveInCircles(this)
}
}
} else {
movementProgram?.cancel()
parentContext.launch {
devices.values.forEach { device ->
device.write(CollectiveDevice.velocity, GmcVelocity.zero)
}
}
movementProgram = null
}
},
modifier = Modifier.fillMaxWidth()
) {
if (movementProgram == null) {
Text("Move")
} else {
Text("Stop")
}
}
collectiveModel.roster.forEach { (id, _) ->
Card(
elevation = 16.dp,