Akka flow Input (`In`) as Output (`Out`)

I am trying to write a piece of code that does the following:

  1. Reads a large CSV file from a remote source such as S3.
  2. Processes the file record by record.
  3. Sends a notification to the user.
  4. Writes the output to a remote location.

Sample records in the input CSV:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

My input case class, which represents a record in the input CSV:

case class InputRecord(recordId: String, name: String, salary: Long)

Sample records in the output CSV (which needs to be written):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

My output case class, which represents a record in the output CSV:

case class OutputRecord(recordId: String, name: String, designation: String)

Reading the records as CSV with Akka Streams (the download uses Alpakka S3, https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
  S3.download(s3Object.bucket, s3Object.path)
    .runWith(Sink.head)
// This is then converted to CSV
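The conversion step itself is left out above. A minimal sketch of it, assuming the Alpakka CSV module is on the classpath and Akka 2.6 (where an implicit ActorSystem provides the materializer): CsvParsing.lineScanner() splits the bytes into fields, and CsvToMap.toMapAsStrings() uses the header line as keys. CsvToMap.toMap() would give Map[String, ByteString] instead, matching the readAsCSV signature. parseCsv is a hypothetical helper, and Source.single stands in for the byte source obtained from the S3 download.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object CsvReadSketch {
  // Hypothetical helper: turns raw CSV bytes into one Map per data row,
  // keyed by the column names found in the header line.
  def parseCsv(bytes: Source[ByteString, NotUsed]): Source[Map[String, String], NotUsed] =
    bytes
      .via(CsvParsing.lineScanner())  // byte chunks -> one List[ByteString] per CSV line
      .via(CsvToMap.toMapAsStrings()) // header line supplies the keys for every later line

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("csv-read-sketch")
    // In-memory stand-in for the byte stream downloaded from S3.
    val bytes = Source.single(ByteString("recordId,name,salary\n1,Aiden,20000\n2,Tom,18000\n"))
    val rows = Await.result(parseCsv(bytes).runWith(Sink.seq), 5.seconds)
    rows.foreach(println)
    system.terminate()
  }
}
```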

Now I have a function to process the records:

def process(input: InputRecord): OutputRecord =
  // if salary > avg(salary) then Manager
  // else Programmer
  ???
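Since the rule compares each salary with the average of all salaries, a single record is not enough to decide; the average has to be known before processing starts (for example, from a first pass over the file). A minimal sketch, assuming the average is passed in as an extra parameter (avgSalary and averageSalary are hypothetical additions, not part of the original signature) and following the two-branch rule in the comments:

```scala
final case class InputRecord(recordId: String, name: String, salary: Long)
final case class OutputRecord(recordId: String, name: String, designation: String)

object ProcessSketch {
  // Average salary over all records; must be computed before processing starts.
  def averageSalary(records: Seq[InputRecord]): Double =
    if (records.isEmpty) 0.0 else records.map(_.salary).sum.toDouble / records.size

  // Above-average salary -> Manager, otherwise Programmer.
  def process(avgSalary: Double)(input: InputRecord): OutputRecord = {
    val designation = if (input.salary > avgSalary) "Manager" else "Programmer"
    OutputRecord(input.recordId, input.name, designation)
  }
}
```

With the sample input the average is 21000, so only Jack becomes Manager; the two-branch rule never produces the "Web Developer" shown for Tom in the sample output.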

Function to write the OutputRecords as CSV:

def writeOutput: Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload(s3Object.bucket,
                     s3Object.path,
                     metaHeaders = MetaHeaders(Map()))
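Because writeOutput consumes ByteString, each OutputRecord has to be rendered as a CSV line before it reaches the sink. A minimal sketch, assuming Akka Streams 2.6: toCsvBytes and csvLine are hypothetical names, fields are quoted RFC 4180 style when they contain a comma, quote or line break, and the header line is emitted first via prepend. Alpakka CSV's CsvFormatting.format() is an alternative to hand-written formatting.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

final case class OutputRecord(recordId: String, name: String, designation: String)

object CsvWriteSketch {
  // Quotes a field only when it contains a delimiter, a quote or a line break.
  private def escape(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  // Joins the escaped fields into one newline-terminated CSV line.
  def csvLine(fields: String*): ByteString =
    ByteString(fields.map(escape).mkString(",") + "\n")

  // Renders each record as one CSV line; the header line is emitted before any record.
  val toCsvBytes: Flow[OutputRecord, ByteString, NotUsed] =
    Flow[OutputRecord]
      .map(r => csvLine(r.recordId, r.name, r.designation))
      .prepend(Source.single(csvLine("recordId", "name", "designation")))
}
```

A stream of OutputRecords could then be written with .via(CsvWriteSketch.toCsvBytes).runWith(writeOutput).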

Function to send email notification:

def notify: Flow[OutputRecord, PushResult, NotUsed]
// if the notification is sent successfully, PushResult has some additional info
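If notify has to stay a Flow[OutputRecord, PushResult, NotUsed], one option (not taken from the question or the answer) is to wrap it so that every PushResult is paired with the OutputRecord that produced it. A minimal sketch using GraphDSL with Broadcast and Zip; keepInput is a hypothetical helper name. It assumes the wrapped flow emits exactly one element per input, in order. Broadcast hands each record to both branches, but the pair, and therefore anything downstream such as writeOutput, is only emitted after the wrapped flow has produced its result.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

object KeepInputSketch {
  // Wraps a strictly one-to-one, order-preserving flow so that each output is
  // emitted together with the input element that produced it.
  // Not safe for flows that filter, reorder or emit several elements per input.
  def keepInput[In, Out](flow: Flow[In, Out, NotUsed]): Flow[In, (In, Out), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val fanOut = b.add(Broadcast[In](2))
      val pair   = b.add(Zip[In, Out]())
      fanOut.out(0) ~> pair.in0         // the original element
      fanOut.out(1) ~> flow ~> pair.in1 // the wrapped flow's result for that element
      FlowShape(fanOut.in, pair.out)
    })
}
```

With it, the stream could continue with .via(KeepInputSketch.keepInput(notify)).map { case (record, _) => ... } and build the ByteString for writeOutput from record.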

Stitching it all together:

readAsCSV.flatMap { recordSource =>
  recordSource.map { record =>
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

I get errors on Line 15 and Line 16. I can add either Line 15 or Line 16, but not both, because both notify and writeOutput need outputRecord. Once notify is called, I lose my outputRecord.

Is there a way to add both notify and writeOutput to the same graph?

I am not looking for parallel execution, because I want to call notify first and only then writeOutput. So this is not helpful: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

The use case seems very simple to me, but somehow I cannot find a clean solution.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54

The output of notify is a PushResult, but writeOutput expects a ByteString as input. Once you make these types match, it will compile. If you need a ByteString, build it from the OutputRecord.
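A minimal sketch of that idea, assuming the notification step is changed to emit the OutputRecord together with its PushResult, so the ByteString for the sink can still be built from the record. sendNotification, notifyUser and the simplified PushResult are hypothetical stand-ins. mapAsync(1) emits a pair only after its notification future has completed and keeps the original order, so each record reaches the sink only after it has been notified. In the test, Sink.fold stands in for the S3 multipart upload.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

final case class OutputRecord(recordId: String, name: String, designation: String)
final case class PushResult(delivered: Boolean) // simplified stand-in for the real type

object PipelineSketch {
  // Hypothetical notification call; a real one would send the email asynchronously.
  def sendNotification(record: OutputRecord): Future[PushResult] =
    Future.successful(PushResult(delivered = true))

  // Notification step that passes the OutputRecord on together with its PushResult.
  def notifyUser(implicit ec: ExecutionContext): Flow[OutputRecord, (OutputRecord, PushResult), NotUsed] =
    Flow[OutputRecord].mapAsync(parallelism = 1) { record =>
      sendNotification(record).map(result => (record, result))
    }

  // The ByteString for the sink is built from the OutputRecord, not from the PushResult.
  def toBytes(record: OutputRecord): ByteString =
    ByteString(s"${record.recordId},${record.name},${record.designation}\n")

  // Notify first, then write; M is the sink's materialized value.
  def run[M](records: Source[OutputRecord, NotUsed], writeOutput: Sink[ByteString, M])(
      implicit system: ActorSystem): M = {
    import system.dispatcher
    records
      .via(notifyUser)                             // 1. send the notification
      .map { case (record, _) => toBytes(record) } // 2. render the kept record
      .runWith(writeOutput)                        // 3. write it
  }
}
```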

By the way, the sample code you provided has a similar type mismatch between readAsCSV, which emits Map[String, ByteString] rows, and process, which expects an InputRecord.
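One way to bridge that gap is a conversion from a parsed row to InputRecord. A minimal sketch, assuming the column names from the sample CSV; toInputRecord is a hypothetical helper that returns None when a column is missing or the salary is not a whole number.

```scala
import akka.util.ByteString

import scala.util.Try

final case class InputRecord(recordId: String, name: String, salary: Long)

object RowConversionSketch {
  // Converts one parsed CSV row (column name -> raw bytes) into an InputRecord.
  def toInputRecord(row: Map[String, ByteString]): Option[InputRecord] =
    for {
      id     <- row.get("recordId").map(_.utf8String.trim)
      name   <- row.get("name").map(_.utf8String.trim)
      salary <- row.get("salary").flatMap(s => Try(s.utf8String.trim.toLong).toOption)
    } yield InputRecord(id, name, salary)
}
```

In the stream this could be used as recordSource.mapConcat(row => RowConversionSketch.toInputRecord(row).toList).map(process), which silently drops malformed rows; logging them or failing the stream may be preferable.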

2021-11-24 03:36:16
