Computer Science
Algorithm
Data Processing
Digital Life
Distributed System
Distributed System Infrastructure
Machine Learning
Operating System
Android
Linux
Tizen
Windows
iOS
Programming Language
C++
Erlang
Go
Javascript
Scala
Compare Task Processing Approaches in Scala (2023)
Scheme
Type System
Software Engineering
Storage
Virtualization
Life
Life in Guangzhou (2013)
Recent Works (2013)
东京之旅 (2014)
My 2017 Year in Review (2018)
My 2020 in Review (2021)
十三年前被隔离的经历 (2022)
A Travel to Montreal (2022)
My 2022 in Review (2023)
Projects
Bard
Blog
RSS Brain
Scala2grpc
Comment Everywhere (2013)
Fetch Popular Erlang Modules by Coffee Script (2013)
Psychology
耶鲁大学心理学导论 (2012)
Thoughts
Chinese
English

Compare Task Processing Approaches in Scala

Posted on 27 Aug 2023, tagged Scalaconcurrentcatscats-effectfs2queuestream

All the source code mentioned in this blog can be found in my Github repo.

Task Processing

There is a common problem in computer science and I’ve met it again recently: how to generate and process tasks efficiently? Use my recent project RSS Brain as an example: it needs to find the RSS feeds that haven’t been updated for a while in a database, and fetch the newest data from network.

The easiest way to do it is producing and consuming the tasks in a sequence, for example:

val feeds = getPendingFeeds() // produce the tasks
feeds.foreach(fetchFromNetwork) // consume the dtasks

However, it is unnecessarily slow. Network request doesn’t take lots of CPU and we can send multiple requests at the same time. Even if fetchFromNetwork is a CPU bound task, it can be parallelized if there are multiple CPU cores on a machine.

In this article, we will explore ways to do it more efficiently with Cats Effect and FS2 in a functional programming fashion.

You may wonder why not using AKKA stream? Other than it’s using a different programming paradigm (not functional programming), it’s also because AKKA has changed its license with a ridiculous price.

Introducing Cats Effect and FS2

To make processTask async, there is Future in Scala’s standard library. However, the side effect will happen when you create a Future instance. For example:

def processTask(task: Task): Future[Unit] = Future(println(task))

val runTask1 = processTask(task1) // this will start the async task

I assume the readers have a basic understanding of functional programming, so I’ll not explain why we want to avoid side effects. While Scala is not a pure functional language, a popular Scala library Cats Effect provides convenient ways to wrap side effects. With the help of its IO type, we can define an async task like this:

def processTask(task: Task): IO[Unit] = IO(println(task))

// this will not start the task, so no side effect
val runTask1 = processTask(task1)

// out of pure functional world and starts the side effect
runTask1.unsafeRunSync()

Then there is fs2 that is a stream library that can be used with cats effect. It will be very handy when resolving our problem as we can see later.

Cat Effect has some big changes in version 3.x. In this article, we are using version 2.x. But I may upgrade the version in the future.

Testing Setup

In order to test which approach is the best under different scenarios, we need some basic setup. In TestRunner.scala, I defined some functions to generate tasks. Here are their signatures:

// Produce a sequence of tasks represented by `Int`
def produce(start: Int, end: Int): IO[Seq[Int]]

// Process a task
def consume(x: Int): IO[Unit]

// Produce tasks as a stream
def def produceStream(start: Int, end: Double): fs2.Stream[IO, Int]

produce simply produces tasks as int, and consume just print characters. In each of the functions, I use IO.sleep to create some delay to simulate the real world non-blocking IO. They also print characters P (produce) or C (consume) (based on the width of terminal, some of the C outputs may be skipped to fit the width) when being invoked, so that we can have an intuitive view of how quick tasks are produced and consumed.

Then there is TestConfig.scala for configuring the test:

trait TestConfig {
  val testName: String
  val produceDelay: FiniteDuration
  val minConsumeDelayMillis: Long
  val maxConsumeDelayMillis: Long
  val batchSize = 100  // consume batch size
  val totalSize = 1000 // how many tasks to generate
}

By setting up produce and consume delays, we can test scenarios when producer is slower, consumer is slower, or producer and consumer speed is almost the same. Here are the configurations we are going to use in Main.scala

val configs = Seq(
  new TestConfig {
    override val testName: String = "slow-producer"
    override val produceDelay: FiniteDuration = 1000.millis
    override val minConsumeDelayMillis: Long = 10
    override val maxConsumeDelayMillis: Long = 100
  },
  new TestConfig {
    override val testName: String = "balanced"
    override val produceDelay: FiniteDuration = 1005.millis
    override val minConsumeDelayMillis: Long = 10
    override val maxConsumeDelayMillis: Long = 2000
  },
  new TestConfig {
    override val testName: String = "slow-consumer"
    override val produceDelay: FiniteDuration = 10.millis
    override val minConsumeDelayMillis: Long = 10
    override val maxConsumeDelayMillis: Long = 1000
  }
)

Approach 1: Batch Consuming

The first approach is to make the consuming side parallel. We can consume a batch of tasks concurrently, like in BatchIOApp.scala.

def loop(start: Int): IO[Unit] = {
  if (start >= config.totalSize) {
    IO.unit
  } else {
    produce(start, start + config.batchSize)
      .flatMap{_.map(consume).parSequence}
      .flatMap(_ => loop(start + config.batchSize))
  }
}

However, this only makes a batch of tasks run in parallel. It needs to wait the whole batch to be finished in order to start next batch. This is very obvious when we run this approach and see the output of characters (download Github repo and run sbt "run -n BatchIOApp"). See how it paused after each batch even when consumer is slower than producer:

Approach 2: Use Blocking Queue to Buffer Tasks

We need a way to let producers not waiting for consumers, and also let consumers not wait for a batch to finish in order to start next batch. A very common solution is to use a queue between producers and consumers. Producers put tasks into the queue, and consumers get tasks for the queue. If the queue is thread safe, then both producers and consumers can work on their own without care about each other. In order to not let producer put unlimited tasks into the queue to blowup the memory, we need the queue to have a capacity. When the queue is full, the producer should be blocked. And when the queue is empty, the consumers should be blocked as well.

In Java, BlockingQueue meets our requirements. We can use an implementation LinkedBlockingQueue. However, BlockingQueue will block the whole thread instead of a single IO. Let’s not worry about it for now and see how to use a queue to implement producing and consuming in parallel. The implementation is in BlockingQueueApp.scala:


val queue = new LinkedBlockingQueue[Option[Int]](config.batchSize * 2)

override def work(): IO[Unit] = {
  Seq(
    (produceStream(0).map(Some(_)) ++ fs2.Stream.emit(None))
			.evalMap(x => IO(queue.put(x))).compile.drain,
    dequeueStream().unNoneTerminate.parEvalMap(config.batchSize)(consume).compile.drain,
  ).parSequence.map(_ => ())
}

private def dequeueStream(): fs2.Stream[IO, Option[Int]] = {
  fs2.Stream.eval(IO(queue.take())) ++ dequeueStream()
}

Here we have two IOs run in parallel with parSequence: the first one creates a task stream by produceStream, and append None at the end so that the consumer knows it should end processing. Another stream dequeueStream gets the tasks from the queue then consumes it in parallel with parEvalmap(config.batchSize)(consume).

When run it with sbt "run -n BlockingQueueApp", we can see it’s much faster when the consumer is faster or has the same speed as the producer. Especially when the consumer is slow, it prints multiple P at first, which means the producers doesn’t wait all the consumers to finish in order to produce tasks.

Back to the blocking the whole thread problem: it doesn’t seem to be a problem in this case, right? It’s only because we are lucky! In this setup, we are using two fixed threads as the thread pool of running IO in Main.scala:

private val executor = Executors.newFixedThreadPool(2, (r: Runnable) => {
  val back = new Thread(r)
  back.setDaemon(true)
  back
})

implicit override def executionContext: ExecutionContext = ExecutionContext.fromExecutor(executor)

implicit override def timer: Timer[IO] = IO.timer(executionContext)

implicit override def contextShift: ContextShift[IO] = IO.contextShift(executionContext)

If 2 consumers with empty queue happens to be scheduled on these 2 threads separately, it will block. If we change our BlockingQueueApp to the code in RealBlockingQueueApp:

override def work(): IO[Unit] = {
  Seq(
    dequeueStream().unNoneTerminate.parEvalMap(config.batchSize)(consume).compile.drain,
    dequeueStream().unNoneTerminate.parEvalMap(config.batchSize)(consume).compile.drain,
    (produceStream(0).map(Some(_)) ++ fs2.Stream.emit(None)).evalMap(x => IO(queue.put(x))).compile.drain,
  ).parSequence.map(_ => ())
}

Here we started two dequeue stream at first. Now the whole program will block when run it with sbt "run -b" .

The lesson learned here is that there is a big risk if any operation blocks the whole thread in cats effect. Even it doesn’t block the whole program, it may make a whole thread unavailable.

Actually in Cats Effect’s thread model, there is another thread pool for blocking tasks if we mark it explicitly. In AsyncConsole.scala, I use this exact block mode to run console output so that it won’t effect other non blocking IO operations:

def asyncPrintln(s: String)(
    implicit cs: ContextShift[IO], blocker: Blocker): IO[Unit] = blocker.blockOn(IO(println(s)))

However, if a thread is blocked in this pool, it will start another thread for the next operation. Based on the document, there is no limit on how many threads will be created. So if the producer is much slower than consumer, there will be more and more consume operations blocked on dequeue, so it will generate a large amount of threads, which is not ideal and eventually even will blow up the memory.

Approach 3: Use Cats Effect Friendly Queue

What if we have a queue that only block the dequeue IO when empty instead of blocking the whole thread? Luckily, FS2 provides such a queue. (Cats Effect 3.x also provides such a queue). The implementation is basically the same as above (code in StreamQueueApp.scala):

import fs2.concurrent.Queue

def work(): IO[Unit] = {
  for {
    queue <- Queue.bounded[IO, Option[Int]](config.batchSize * 2)
    _ <- Seq(
      (produceStream(0).map(Some(_)) ++ fs2.Stream.emit(None)).through(queue.enqueue).compile.drain,
      queue.dequeue.unNoneTerminate.parEvalMap(config.batchSize)(consume).compile.drain,
    ).parSequence
  } yield ()
}

Run sbt "run -n StreamAppQueue" to see how it performs.

Approach 4: Use FS2 Stream Directly

FS2 actually provides some advanced stream operations that makes it possible to combine the producing stream and consume stream, like the code in StreamApp.scala:

produceStream(0).parEvalMap(config.batchSize)(consume).compile.drain

Here we map consume in parallel on produce stream. However, if you try to run sbt "run -n StreamApp" vs sbt "run -n StreamQueueApp", you will find this is slower than before. This is because produceStream will give the next batch when the downstream asks. If we can prepare at least one batch before the downstream is free, we can save more time. Luckily, it’s very easy to do in fs2. As we can see in PrefetchStreamApp.scala, we can add prefetch after the produceStream:

produceStream(0).prefetch.parEvalMap(config.batchSize)(consume).compile.drain

It will prefetch a chunk of elements. Use prefetchN if you want to prefetch N chunks.

Then run this with sbt "run -n PrefetchStreamApp", you will find the performance is similar as the queued approach.

Actually if you check the source code of prefetch, you will find the implementation is almost the same as ours:

def prefetch[F2[x] >: F[x]: Concurrent]: Stream[F2, O] = prefetchN[F2](1)

def prefetchN[F2[x] >: F[x]: Concurrent](n: Int): Stream[F2, O] =
  Stream.eval(Queue.bounded[F2, Option[Chunk[O]]](n)).flatMap { queue =>
    queue.dequeue.unNoneTerminate
      .flatMap(Stream.chunk(_))
      .concurrently(chunks.noneTerminate.covary[F2].through(queue.enqueue))
  }

Approach 5: Make Producers Run in Parallel

We’ve made it runs in parallel between consumers, also between consumers and producers. But we haven’t made producers run in parallel yet. With the queue, its very easy to do, just start multiple IOs for produceStream.through(queue.enqueue). ConcurrentProduceQueueApp.scala is an example:

private val counter = new AtomicInteger(0)

override def work(): IO[Unit] = {
  for {
    queue <- Queue.bounded[IO, Int](config.batchSize * 2)
    _ <- Seq(
      produceStream(0, config.totalSize / 2).through(queue.enqueue).compile.drain,
      produceStream(config.totalSize / 2, config.totalSize).through(queue.enqueue).compile.drain,
      queue.dequeue.parEvalMap(config.batchSize) { x =>
        consume(x).map { _ =>
          if (counter.incrementAndGet() >= config.totalSize) {
            None
          } else {
            Some()
          }
        }
      }.unNoneTerminate.compile.drain,
    ).parSequence
  } yield ()
}

It has 2 concurrent producers but in theory you can create as many as you want, just be careful with the parameters of produceStream.

If you run this with sbt "run -n ConcurrentProduceQueueApp", you can find the performance is much better with slower producer. However, with the help of fs2 library, we can make the code cleaner without depends on any queue explicitly. Here is what I did in ConcurrentProducerApp.scala:

def work(): IO[Unit] = {
  fs2.Stream.emits(Range(0, produceParallelism))
    .map(batch => produceStream(
      batch * config.totalSize / produceParallelism,
      (batch + 1) * config.totalSize / produceParallelism.toDouble))
    .parJoin(produceParallelism)
    .prefetch
    .parEvalMap(config.batchSize)(consume).compile.drain
}

Here we use parJoin to join multiple producer stream at the same time.

More

All the approaches above other than the first one uses a queue either implicitly or explicitly. However, under high parallelism and load, every job operating on a single queue may makes this queue a bottleneck. In this case, there is a work stealing algorithm that each consumers can has its own queue, and whenever a consumer’s queue is empty, it steal some tasks from another one. But it’s a little bit complex and unnecessary if the load is not so high, so I will not cover it in this article.

Test Results

Now let’s run all the approaches and compare the performance with sbt "run -n". Here are the results:

  slow producer balanced slow consumer
BatchIO 11086.078637 ms 29912.377578 ms 10015.51878 ms
BlockingQueue 10190.038753 ms 14195.228189 ms 6495.333179 ms
StreamQueue 10138.016643 ms 14458.443122 ms 6418.078377 ms
Stream 10356.178562 ms 15655.697826 ms 6560.111697 ms
PrefetchStream 10141.110634 ms 14578.362136 ms 6376.628036 ms
ConcurrentProduceresQueue 5187.442452 ms 14395.321922 ms 6576.538821 ms
ConcurrentProducer 5198.723825 ms 14544.247312 ms 6418.078377 ms

We can see approaches that parallelize all the parts win the performance game.