ZIO - 카프카 컨슈머에서 ZSink.collectAllN 무한 대기

 이슈

zio-kafka 의 컨슈머에서 컨슘하는 레코드들을 N 개씩 Chunk 로 묶어 일괄 처리하고 싶었다.
그래서 ZSink.collectAllN 함수를 사용했는데, N 개씩 Chunk 로 잘 묶다가 마지막 레코드들이 N 개를 채우지 못하면 무한히 대기하는 현상이 발생했다.

ex) n = 3, 레코드를 5개 컨슘한다고 했을 때, 3 개는 Chunk 로 잘 묶어 처리하지만 나머지 2 개는 처리하지 않고 대기했다.

ZSink.collectAllN 함수는 ZStream 을 N 개씩 Chunk 로 묶어주는 함수이다. N 개를 채우지 못해도 Chunk로 묶는다. (이 부분 때문에 헷갈렸다.)

코드 예시
ZStream(1, 2, 3, 4, 5).run(
  ZSink.collectAllN(3)
)
// Output: Chunk(1,2,3), Chunk(4,5)

Why?

생각해보니 답은 간단했다.
위 코드 예시에선 ZStream 의 끝을 알 수 있어 (1, 2, 3), (4, 5) 로 나눠주지만, 컨슈머는 끝이 없다. 
무한히 컨슘하기 때문에 ZSink.collectAllN 함수 입장에선 N 개가 확정된 순간에만 Chunk 로 묶을 수 있다.


결국 ZSink.collectAllN 함수말고 다른 함수를 사용했지만, 어떤 방식으로 컨슘을 하길래 collect 하지 못하고 기다리게 되는지가 궁금했다.
그래서 먼저 ZSink.collectAllN 함수를 봤는데
def collectAllN[In](n: => Int)(implicit trace: Trace): ZSink[Any, Nothing, In, In, Chunk[In]] =
    fromZIO(ZIO.succeed(ChunkBuilder.make[In](n)))
      .flatMap(cb => foldUntil[In, ChunkBuilder[In]](cb, n.toLong)(_ += _))
      .map(_.result())
아무리 살펴보아도 이 함수에서 뭔가를 기다리거나 하지는 않는 듯 했다.
단순히 n 개씩 돌며 Chunk 를 만들어준다.

다음으로 Consumer 코드를 찾아봤다. 아래는 실제 consume 이 일어나는 코드이다.
override def partitionedAssignmentStream[R, K, V](
      subscription: Subscription,
      keyDeserializer: Deserializer[R, K],
      valueDeserializer: Deserializer[R, V]
    ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = {
			/.../
      ZStream.unwrapScoped {
        for {
          stream <- ZStream.fromHubScoped(partitionAssignments) // Hub 에서 stream 을 가져옴.
          _      <- extendSubscriptions.withFinalizer(_ => reduceSubscriptions.orDie)
        } yield stream
          .map(_.exit)
          .flattenExitOption
          .flattenChunks
          .map {
            _.collect {
              case (tp, partition) if Subscription.subscriptionMatches(subscription, tp) =>
                val partitionStream =
                  if (settings.perPartitionChunkPrefetch <= 0) partition
                  else partition.bufferChunks(settings.perPartitionChunkPrefetch)

                val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
                  if (onlyByteArraySerdes) partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
                  else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))

                tp -> stream
            }
          }
      }
    }
다른 코드는 볼 필요 없고

stream <- ZStream.fromHubScoped(partitionAssignments)

요 부분이 중요하다. Hub 에서 Stream 을 가져온다.
Hub 는 또 뭐지..? 하고 더 찾아봤다. 아래는 internal Hub 이다.
private[zio] abstract class Hub[A] extends Serializable {
		/.../
		def subscribe(): Hub.Subscription[A]
		/.../
}

private[zio] abstract class Subscription[A] extends Serializable {

    /.../
    def poll(default: A): A
		/.../
}
Hub 는 구독(subscribe) 메서드가 있고 이 메서드는 구독 객체를 반환한다. Hub 를 구독한 객체(Subscription)poll 메서드로 구독 값을 땡겨올 수 있다.
(Hub 가 zio-kafka의 본체? 라는 생각이 들었다.)

위 Hub 는 internal 이고, 외부에서 사용되는 Hub 의 subscribe 메서드는 Dequeue 를 반환한다.
abstract class Hub[A] extends Enqueue[A] {
		/.../
		def subscribe(implicit trace: Trace): ZIO[Scope, Nothing, Dequeue[A]]
		/.../
}

외부에서 Hub 를 사용한다면 hub.subscribe() 로 구독하고, 구독 대상의 Dequeue 를 받아 Dequeue 의 메서드로 구독 값들을 관리할 수 있을 거다.

Hub 에서 반환하는 Dequeue 메서드의 구현체에서 take 메서드는 다음과 같다.
def take(implicit trace: Trace): UIO[A] =
        ZIO.fiberIdWith { fiberId =>
          if (shutdownFlag.get) ZIO.interrupt
          else {
            val empty   = null.asInstanceOf[A]
            val message = if (pollers.isEmpty()) subscription.poll(empty) else empty // poll 메서드로 message 를 가져옴.
            message match {
              case null =>
                val promise = Promise.unsafe.make[Nothing, A](fiberId)(Unsafe.unsafe)
                ZIO.suspendSucceed {
                  pollers.offer(promise)
                  subscribers.add(subscription -> pollers)
                  strategy.unsafeCompletePollers(hub, subscribers, subscription, pollers)
                  if (shutdownFlag.get) ZIO.interrupt else promise.await // 값이 없다면 wait...
                }.onInterrupt(ZIO.succeed(unsafeRemove(pollers, promise)))
              case a =>
                strategy.unsafeOnHubEmptySpace(hub, subscribers)
                ZIO.succeed(a)
            }
          }
        }
dequeue.take() 로 외부에서 구독값을 가져올터라 Dequeue 구현체에서 위 메서드만 살펴보았다.
ZSink.collectAllN 함수에서 계속 대기하는 원인의 코드가 위에 있었다.
주석으로 달아 놓았듯이, 구독 대상을 poll() 했을 때 empty 라면 promise.await 으로 구독 값(A) 를 가져올 때까지 기다린다.
구독 값(A)를 가져오는 Promise 를 pollers.offer(promise) 코드에서 queue 에 집어넣는데, 이를 unsafeCompletePollers 함수에서 처리한다.

unsafeCompletePollers 함수는 다음과 같다.
final def unsafeCompletePollers(
      hub: internal.Hub[A],
      subscribers: Set[(internal.Hub.Subscription[A], MutableConcurrentQueue[Promise[Nothing, A]])],
      subscription: internal.Hub.Subscription[A],
      pollers: MutableConcurrentQueue[Promise[Nothing, A]]
    ): Unit = {
      var keepPolling = true
      val nullPoller  = null.asInstanceOf[Promise[Nothing, A]]
      val empty       = null.asInstanceOf[A]

      while (keepPolling && !subscription.isEmpty()) {
        val poller = pollers.poll(nullPoller)  // 1. Promise poll()
        if (poller eq nullPoller) {
          subscribers.remove(subscription -> pollers)
          if (pollers.isEmpty()) keepPolling = false
          else subscribers.add(subscription -> pollers)
        } else {
          subscription.poll(empty) match {  // 2. Subscription.poll()
            case null =>
              unsafeOfferAll(pollers, poller +: unsafePollAll(pollers))  // 3. 다시 pollers 에 offer
            case a =>
              unsafeCompletePromise(poller, a)
              unsafeOnHubEmptySpace(hub, subscribers)
          }
        }
      }
    }
간단히 요약하면
1. 구독 값(A)을 가져오는 Promise 를 poll 한다.
2. 구독 값(A)구독 객체(Subscription)에서 poll 해본다.
3. null, 즉 비어있다면 다시 Promise 를 pollers 에 집어넣는다. (1 번부터 반복)

위 동작으로 구독 값(A)구독 객체(Subscription)에 쌓일 때까지 계속 poll 을 하며 기다리게 된다.

ZStream, ZSink 에서 출발한 의문이었지만 카프카 컨슈밍의 동작 원리에 대해서 좀 더 이해를 할 수 있게 된 것 같다!

댓글

이 블로그의 인기 게시물

HTML - input file 버튼 꾸미기

HTML - 이미지 미리보기(jQuery 없이)

BOJ - DNA 유사도(2612)