Continuously consuming a WebSocket with Akka Streams

I'm fairly new to Scala and Akka Streams, and I'm trying to read JSON string messages from a WebSocket and push them to a Kafka topic.

For now I am only working on the "get messages from a WS" part.

Messages coming from the WebSocket look like this:

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

I want to split this JSON message into multiple messages:

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

And then push each of these messages to a Kafka topic.

Here's what I have so far:

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

It's working and I'm getting the expected JSON output, but I was wondering whether I could write this producer in a more "Akka-ish" style, for example using the GraphDSL. So I have a few questions:

  • Is it possible to continuously consume a WebSocket using the GraphDSL? If so, could you show me an example?
  • Is it a good idea to consume the WS using the GraphDSL?
  • Should I decompose the received JSON message as I'm doing before sending it to Kafka, or is it better to send it as is for lower latency?
  • After producing the messages to Kafka, I am planning to consume them using Apache Storm. Is that a good idea, or should I stick with Akka?

Thanks for reading me, Regards, Arès

akka akka-stream apache-kafka scala
2021-11-20 14:01:02

That code is plenty Akka-ish: scaladsl is just as Akka as the GraphDSL or implementing a custom GraphStage. The only reason, IMO/E, to go to the GraphDSL is if the actual shape of the graph isn't readily expressible in the scaladsl.
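To make that concrete, here is a small sketch of one linear pipeline written twice, once with the scaladsl and once with the GraphDSL. It assumes Akka 2.6; the object name, the stage names (trim, nonEmpty) and the String element type are placeholders of mine rather than anything from the question's code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object DslComparison {
  // Linear pipeline written with the plain scaladsl.
  val viaScaladsl: Flow[String, String, NotUsed] =
    Flow[String].map(_.trim).filter(_.nonEmpty)

  // The same pipeline wired by hand with the GraphDSL.
  val viaGraphDsl: Flow[String, String, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val trim     = builder.add(Flow[String].map(_.trim))
      val nonEmpty = builder.add(Flow[String].filter(_.nonEmpty))

      // Connect the outlet of the first stage to the inlet of the second.
      trim.out ~> nonEmpty.in

      // Expose the remaining open ports as a Flow-shaped graph.
      FlowShape(trim.in, nonEmpty.out)
    })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("dsl-comparison")
    val input = List(" bitcoin ", "  ", "monero")
    try {
      val a = Await.result(Source(input).via(viaScaladsl).runWith(Sink.seq), 5.seconds)
      val b = Await.result(Source(input).via(viaGraphDsl).runWith(Sink.seq), 5.seconds)
      println(a == b) // both pipelines yield the same elements
    } finally system.terminate()
  }
}
```

For a straight-line graph like this, the GraphDSL version only adds wiring; it starts to pay off once there are fan-in or fan-out junctions to connect.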

I would personally go the route of defining a CoinPrice class to make the model explicit:

case class CoinPrice(coin: String, price: BigDecimal)

And then have a Flow[Message, CoinPrice, NotUsed] which parses each incoming message into zero or more CoinPrices. Something (using Play JSON here) like:

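import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.Flow
import play.api.libs.json.{JsNumber, JsObject, JsString, Json}
import scala.util.Try

val toCoinPrices =
  Flow[Message]
    // msg.toString would include the "TextMessage.Strict(...)" wrapper, so take the text payload.
    // Streamed (non-strict) frames are skipped here; a complete version would also consume them.
    .collect { case TextMessage.Strict(text) => text }
    .mapConcat { text =>
      // Json.parse throws on malformed input; treat that as "no prices" instead of failing the stream
      Try(Json.parse(text)).toOption
        .flatMap(_.asOpt[JsObject])
        .toList
        .flatMap { json =>
          // fields is the public accessor for the object's key/value pairs
          json.fields.flatMap {
            case (coin, JsString(priceStr)) =>
              Try(BigDecimal(priceStr)).toOption
                .map(p => CoinPrice(coin, p))

            case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
            case _ => None
          }
        }
    }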

Depending on how large the JSON payloads in the messages are, you might want to break that into separate stream stages to allow for an async boundary between the JSON parse and the extraction into CoinPrices. For example,

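import scala.util.Try

Flow[Message]
  // take the text payload; msg.toString would include the "TextMessage.Strict(...)" wrapper
  .collect { case TextMessage.Strict(text) => text }
  .mapConcat { text =>
    // stage 1: parse; malformed input yields no element instead of failing the stream
    Try(Json.parse(text)).toOption.flatMap(_.asOpt[JsObject]).toList
  }
  .async
  .mapConcat { json =>
    // stage 2: extract one CoinPrice per usable key/value pair
    json.fields.flatMap {
      case (coin, JsString(priceStr)) =>
        Try(BigDecimal(priceStr)).toOption
          .map(p => CoinPrice(coin, p))

      case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
      case _ => None
    }.toList
  }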

In the above, the stages on either side of the async boundary will execute in separate actors and thus possibly concurrently (if there are enough CPU cores available, etc.), at the cost of extra overhead for the actors to coordinate and exchange messages. That extra coordination/communication overhead (cf. Gunther's Universal Scalability Law) is only going to be worth it if the JSON objects are sufficiently large and arrive sufficiently quickly (that is, each one consistently arrives before the previous one has finished processing).

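If your intention is to consume the websocket until the program stops, you might find it clearer to use Source.never[Message] in place of Source.maybe[Message]: it never emits and never completes, so there is no Promise to keep track of.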
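As a rough sketch of that wiring, assuming Akka 2.6 with Akka HTTP 10.2 and reusing the URL from the question: the object name and the names printSink and clientFlow are mine, and the printing sink is only a stand-in for the real parsing and Kafka stages.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

object ConsumeForever {
  // Placeholder sink: prints each incoming frame as-is.
  val printSink: Sink[Message, Future[Done]] = Sink.foreach[Message](println)

  // Source.never emits nothing and never completes, so the client side
  // never sends anything and never initiates closing the connection.
  val clientFlow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(printSink, Source.never[Message])(Keep.left)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ws-consumer")
    import system.dispatcher

    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(
        WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
        clientFlow)

    // `closed` completes once the incoming side ends (server close or failure).
    closed.onComplete { result =>
      println(s"WebSocket stream ended: $result")
      system.terminate()
    }
  }
}
```

Keeping the sink's Future[Done] as the materialized value (Keep.left) gives you something to watch for the connection ending, which the Promise from Source.maybe did not provide.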

2021-11-21 12:42:30

Thanks for your answer, it's very clear. I just have one question, though: how can I break my response into different stream stages? Could you show me a small example, or point me to the relevant part of the documentation?
Arès
