4월, 2023의 게시물 표시

ZIO - 카프카 컨슈머에서 ZSink.collectAllN 무한 대기

 이슈 zio-kafka 의 컨슈머에서 컨슘하는 레코드들을 N 개씩 Chunk 로 묶어 일괄 처리하고 싶었다. 그래서 ZSink.collectAllN 함수를 사용했는데, N 개씩 Chunk 로 잘 묶다가 마지막 레코드들이 N 개를 채우지 못하면 무한히 대기하는 현상이 발생했다. ex) n = 3, 레코드를 5개 컨슘한다고 했을 때, 3 개는 Chunk 로 잘 묶어 처리하지만 나머지 2 개는 처리하지 않고 대기했다. ZSink.collectAllN 함수는 ZStream 을 N 개씩 Chunk 로 묶어주는 함수이다. N 개를 채우지 못해도 Chunk로 묶는다. (이 부분 때문에 헷갈렸다.) 코드 예시 ZStream ( 1 , 2 , 3 , 4 , 5 ). run ( ZSink . collectAllN ( 3 ) ) // Output: Chunk(1,2,3), Chunk(4,5) 공식 문서 :  https://zio.dev/reference/stream/zsink/creating-sinks/ Why? 생각해보니 답은 간단했다. 위 코드 예시에선 ZStream 의 끝을 알 수 있어 (1, 2, 3), (4, 5) 로 나눠주지만, 컨슈머는 끝이 없다.  무한히 컨슘하기 때문에 ZSink.collectAllN  함수 입장에선 N 개가 확정된 순간에만 Chunk 로 묶을 수 있다. 결국 ZSink.collectAllN  함수말고 다른 함수를 사용했지만, 어떤 방식으로 컨슘을 하길래 collect 하지 못하고 기다리게 되는지가 궁금했다. 그래서 먼저 ZSink.collectAllN 함수를 봤는데 def collectAllN [ In ]( n : => Int )( implicit trace : Trace ) : ZSink [ Any , Nothing , In , In , Chunk [ In ]] = fromZIO ( ZIO . succeed ( ChunkBuilder . make [ In ]( n )))

Cake Pattern(케이크 패턴) - Scala