ZIO - 카프카 컨슈머에서 ZSink.collectAllN 무한 대기
이슈
zio-kafka 의 컨슈머에서 컨슘하는 레코드들을 N 개씩 Chunk 로 묶어 일괄 처리하고 싶었다.
그래서 ZSink.collectAllN 함수를 사용했는데, N 개씩 Chunk 로 잘 묶다가 마지막 레코드들이 N 개를 채우지 못하면 무한히 대기하는 현상이 발생했다.
ex) n = 3, 레코드를 5개 컨슘한다고 했을 때, 3 개는 Chunk 로 잘 묶어 처리하지만 나머지 2 개는 처리하지 않고 대기했다.
ZSink.collectAllN 함수는 ZStream 을 N 개씩 Chunk 로 묶어주는 함수이다. N 개를 채우지 못해도 Chunk로 묶는다. (이 부분 때문에 헷갈렸다.)
코드 예시
ZStream(1, 2, 3, 4, 5).run( ZSink.collectAllN(3) ) // Output: Chunk(1,2,3), Chunk(4,5)
Why?
생각해보니 답은 간단했다.
위 코드 예시에선 ZStream 의 끝을 알 수 있어 (1, 2, 3), (4, 5) 로 나눠주지만, 컨슈머는 끝이 없다.
무한히 컨슘하기 때문에 ZSink.collectAllN 함수 입장에선 N 개가 확정된 순간에만 Chunk 로 묶을 수 있다.
결국 ZSink.collectAllN 함수말고 다른 함수를 사용했지만, 어떤 방식으로 컨슘을 하길래 collect 하지 못하고 기다리게 되는지가 궁금했다.
그래서 먼저 ZSink.collectAllN 함수를 봤는데
def collectAllN[In](n: => Int)(implicit trace: Trace): ZSink[Any, Nothing, In, In, Chunk[In]] = fromZIO(ZIO.succeed(ChunkBuilder.make[In](n))) .flatMap(cb => foldUntil[In, ChunkBuilder[In]](cb, n.toLong)(_ += _)) .map(_.result())
아무리 살펴보아도 이 함수에서 뭔가를 기다리거나 하지는 않는 듯 했다.
단순히 n 개씩 돌며 Chunk 를 만들어준다.
다음으로 Consumer 코드를 찾아봤다. 아래는 실제 consume 이 일어나는 코드이다.
override def partitionedAssignmentStream[R, K, V]( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = { /.../ ZStream.unwrapScoped { for { stream <- ZStream.fromHubScoped(partitionAssignments) // Hub 에서 stream 을 가져옴. _ <- extendSubscriptions.withFinalizer(_ => reduceSubscriptions.orDie) } yield stream .map(_.exit) .flattenExitOption .flattenChunks .map { _.collect { case (tp, partition) if Subscription.subscriptionMatches(subscription, tp) => val partitionStream = if (settings.perPartitionChunkPrefetch <= 0) partition else partition.bufferChunks(settings.perPartitionChunkPrefetch) val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = if (onlyByteArraySerdes) partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) tp -> stream } } } }
다른 코드는 볼 필요 없고
stream <- ZStream.fromHubScoped(partitionAssignments)
요 부분이 중요하다. Hub 에서 Stream 을 가져온다.
Hub 는 또 뭐지..? 하고 더 찾아봤다. 아래는 internal Hub 이다.
private[zio] abstract class Hub[A] extends Serializable { /.../ def subscribe(): Hub.Subscription[A] /.../ } private[zio] abstract class Subscription[A] extends Serializable { /.../ def poll(default: A): A /.../ }
Hub 는 구독(subscribe) 메서드가 있고 이 메서드는 구독 객체를 반환한다. Hub 를 구독한 객체(Subscription)는 poll 메서드로 구독 값을 땡겨올 수 있다.
(Hub 가 zio-kafka의 본체? 라는 생각이 들었다.)
위 Hub 는 internal 이고, 외부에서 사용되는 Hub 의 subscribe 메서드는 Dequeue 를 반환한다.
abstract class Hub[A] extends Enqueue[A] { /.../ def subscribe(implicit trace: Trace): ZIO[Scope, Nothing, Dequeue[A]] /.../ }
외부에서 Hub 를 사용한다면 hub.subscribe() 로 구독하고, 구독 대상의 Dequeue 를 받아 Dequeue 의 메서드로 구독 값들을 관리할 수 있을 거다.
Hub 에서 반환하는 Dequeue 메서드의 구현체에서 take 메서드는 다음과 같다.
def take(implicit trace: Trace): UIO[A] = ZIO.fiberIdWith { fiberId => if (shutdownFlag.get) ZIO.interrupt else { val empty = null.asInstanceOf[A] val message = if (pollers.isEmpty()) subscription.poll(empty) else empty // poll 메서드로 message 를 가져옴. message match { case null => val promise = Promise.unsafe.make[Nothing, A](fiberId)(Unsafe.unsafe) ZIO.suspendSucceed { pollers.offer(promise) subscribers.add(subscription -> pollers) strategy.unsafeCompletePollers(hub, subscribers, subscription, pollers) if (shutdownFlag.get) ZIO.interrupt else promise.await // 값이 없다면 wait... }.onInterrupt(ZIO.succeed(unsafeRemove(pollers, promise))) case a => strategy.unsafeOnHubEmptySpace(hub, subscribers) ZIO.succeed(a) } } }
dequeue.take() 로 외부에서 구독값을 가져올터라 Dequeue 구현체에서 위 메서드만 살펴보았다.
ZSink.collectAllN 함수에서 계속 대기하는 원인의 코드가 위에 있었다.
주석으로 달아 놓았듯이, 구독 대상을 poll() 했을 때 empty 라면 promise.await 으로 구독 값(A) 를 가져올 때까지 기다린다.
구독 값(A)를 가져오는 Promise 를 pollers.offer(promise) 코드에서 queue 에 집어넣는데, 이를 unsafeCompletePollers 함수에서 처리한다.
unsafeCompletePollers 함수는 다음과 같다.
final def unsafeCompletePollers( hub: internal.Hub[A], subscribers: Set[(internal.Hub.Subscription[A], MutableConcurrentQueue[Promise[Nothing, A]])], subscription: internal.Hub.Subscription[A], pollers: MutableConcurrentQueue[Promise[Nothing, A]] ): Unit = { var keepPolling = true val nullPoller = null.asInstanceOf[Promise[Nothing, A]] val empty = null.asInstanceOf[A] while (keepPolling && !subscription.isEmpty()) { val poller = pollers.poll(nullPoller) // 1. Promise poll() if (poller eq nullPoller) { subscribers.remove(subscription -> pollers) if (pollers.isEmpty()) keepPolling = false else subscribers.add(subscription -> pollers) } else { subscription.poll(empty) match { // 2. Subscription.poll() case null => unsafeOfferAll(pollers, poller +: unsafePollAll(pollers)) // 3. 다시 pollers 에 offer case a => unsafeCompletePromise(poller, a) unsafeOnHubEmptySpace(hub, subscribers) } } } }
간단히 요약하면
1. 구독 값(A)을 가져오는 Promise 를 poll 한다.
2. 구독 값(A)을 구독 객체(Subscription)에서 poll 해본다.
3. null, 즉 비어있다면 다시 Promise 를 pollers 에 집어넣는다. (1 번부터 반복)
위 동작으로 구독 값(A)가 구독 객체(Subscription)에 쌓일 때까지 계속 poll 을 하며 기다리게 된다.
ZStream, ZSink 에서 출발한 의문이었지만 카프카 컨슈밍의 동작 원리에 대해서 좀 더 이해를 할 수 있게 된 것 같다!
댓글
댓글 쓰기