I'm fairly new to Scala and Akka Streams, and I'm trying to read JSON string messages from a WebSocket and push them to a Kafka topic.
For now I am only working on the "get messages from a WebSocket" part.
Messages coming from the WebSocket look like this:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
I want to split this JSON message into multiple messages:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
And then push each of these messages to a Kafka topic.
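To make the intended split concrete, here is a minimal sketch in plain Scala, independent of Akka. The names `SplitPrices` and `split` are hypothetical, and the regex assumes a flat JSON object whose keys and values are all strings, as in the sample above. A real JSON library would probably be more robust.

```scala
object SplitPrices {
  // Matches one "key":"value" pair in a flat JSON object of string values.
  private val Pair = "\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"".r

  // Returns one small JSON string per coin found in the input message.
  def split(json: String): List[String] =
    Pair.findAllMatchIn(json).map { m =>
      s"""{"coin": "${m.group(1)}", "price": "${m.group(2)}"}"""
    }.toList
}
```

Inside a stream, a function like this could be passed to `mapConcat`, since it returns an immutable `List`.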
Here's what I have achieved so far:
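import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Future, Promise}

// Assumes an implicit ActorSystem (and a materializer on Akka 2.5) is in scope.
// Split one price message into one JSON string per coin.
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message]
  // Use the text payload rather than Message.toString; streamed messages are skipped here.
  .collect { case TextMessage.Strict(text) => text }
  .mapConcat(text => text.replaceAll("[{}]", "").split(",").toList)
  .map { pair =>
    val splitted = pair.split(":")
    // The key and value keep their original double quotes.
    s"""{"coin": ${splitted(0).trim}, "price": ${splitted(1).trim}}"""
  }
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
// Source.maybe keeps the connection open without sending anything to the server.
val flow: Flow[Message, Message, Promise[Option[Message]]] =
  Flow.fromSinkAndSourceMat(
    message_decomposition.to(sink),
    Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
  WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
  flow)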
It works and I get the expected JSON output, but I was wondering whether I could write this producer in a more "Akka-ish" style, for example using GraphDSL. So I have a few questions:
- Is it possible to continuously consume a WebSocket using GraphDSL? If yes, can you show me an example, please?
- Is it a good idea to consume the WebSocket using GraphDSL?
- Should I decompose the received JSON message as I'm doing now before sending it to Kafka, or is it better to send it as is for lower latency?
- After producing the messages to Kafka, I am planning to consume them using Apache Storm. Is that a good idea, or should I stick with Akka?
Thanks for reading, Regards, Arès