import io.ktor.client.HttpClient
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.plugins.websocket.webSocket
import io.ktor.websocket.Frame
import io.ktor.websocket.readText
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import java.time.Instant
import io.ktor.client.engine.cio.CIO as ClientCIO

/**
 * Opens a WebSocket connection to [url], reads the first three text frames and parses
 * each payload with [Instant.parse].
 *
 * The connection runs in a coroutine launched in the receiver [CoroutineScope], so a
 * failure of the connection is still reported to that scope. The same failure (or
 * cancellation) is also forwarded to the caller, so this function never suspends
 * forever waiting for a result that cannot arrive.
 *
 * Non-text frames are skipped. The HTTP client created for the call is always closed
 * once the connection coroutine finishes.
 *
 * @param url address of the WebSocket endpoint to connect to.
 * @return the parsed instants in arrival order; fewer than three when the incoming
 *   channel is closed before three text frames were received.
 * @throws java.time.format.DateTimeParseException if a text frame cannot be parsed by
 *   [Instant.parse].
 * @throws kotlinx.coroutines.CancellationException if the connection coroutine is cancelled.
 */
suspend fun CoroutineScope.aggregateFromService(url: String): List<Instant> {
    val result = CompletableDeferred<List<Instant>>()

    val connection = launch {
        // Created inside the coroutine so it is only allocated when the coroutine
        // actually runs, and is released on every exit path.
        val client = HttpClient(ClientCIO) {
            install(WebSockets)
        }
        try {
            client.webSocket(url) {
                val instants = incoming.receiveAsFlow()
                    .filterIsInstance<Frame.Text>()
                    .take(3)
                    .map { frame -> Instant.parse(frame.readText()) }
                    .toList()

                // Hand the result over before the session is torn down, so the caller
                // does not have to wait for the socket to close.
                result.complete(instants)
            }
        } finally {
            client.close()
        }
    }

    // Forward any failure or cancellation (including cancellation before the coroutine
    // ever started) to the awaiting caller. This is a no-op when `result` is already
    // completed with a value.
    connection.invokeOnCompletion { cause ->
        if (cause != null) {
            result.completeExceptionally(cause)
        }
    }

    return result.await()
}

// Commented-out alternative variant, retained as found:
//
//suspend fun aggregateFromServiceAsync(url: String): Deferred<List<Instant>> {
//    val client = HttpClient(ClientCIO) {
//        install(WebSockets)
//    }
//
//    val result = CompletableDeferred<List<Instant>>()
//
//    client.webSocket(url) {
//        val res = incoming.consumeAsFlow()
//            .filterIsInstance<Frame.Text>()
//            .take(3)
//            .map { Instant.parse(it.readText()) }
//            .toList()
//
//        result.complete(res)
//    }
//
//    return result
//}