2020년 11월 30일 월요일

Apache Nifi - 카프카 데이터 분산처리 ( Distribute Kafka Data)


1. Nifi Processor MergeContent


MergeContent프로세서를 이용해 Input processor(ex.Kafka, File)에서 들어오는 데이터를

적절한 크기로 Merge한 다음에 다음 단계로 넘겨주는 Flow를 기대했다.

그러나 MergeContent는 이미 받은 데이터를 다시 병합하여 데이터를 복제하는 형식으로 기대했던 결과를 가져오지는 못했다.

예를 들어 100의 데이터가 들어온다면 100개의 데이터를 차곡차곡 쌓았다가 하나의 데이터로 병합하여 결국 1개의 데이터가 들어와야 하는것을 결과로 예상했지만, MergeContent는 100개의 데이터를 일단 Nifi 저장 후, 그 데이터들을 병합한 데이터셋 복제복을 만들어 다음 프로세스로 넘겨준다. 결국 병합된 1개의 데이터셋을 전달하지만, 이중 저장을 하기 때문에 비효율적이다.






2. 하나의 클래스터에서 에서 3개의 nifi processor로 분산처리




해당 그림처럼 관련 processor를 여러개 만들어 분산처리가 될 때 

확인해야 할 사항이 

1. 데이터가 중복이 되는가?

2. 데이터가 누락이 되는가?

3. 처리속도가 늘어나는가?

정도가 있을 것 같다.


테스트를 위해 카프카데이터를 받아오는 Flow 3개를 셋팅하고, 

인위적으로 프로듀서에 데이터를 넣는 코드를 작성한다. 

테스트 토픽은 partition 3개로 설정하여 테스트하였다.


[ Nifi Flow ]



[ Python 코드 ]
from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers=['192.111.111.111:9092','192.222.222.222:9092','192.3333.333.333:9092'])

for i in range(1,5000):
    producer.send('test',str.encode('kafka:-%d' % i))
    time.sleep(0.5)



Hive테이블에 저장 후 데이터들을 보니 중복,누락 없이 3개의 프로세서가 데이터를 받아온다.

( 파일명 : 1_날짜_tuuid / 2_날짜_tuuid / 3_날짜_tuuid )





3. 3개의 NiFi 클래스터로 분산처리


2.에서는 하나의 클래스터에서 3개의 프로세스로 처리했다면, 클러스터 단계에서도 여러 서버를 두어 트래픽을 분산시킬 수 있다.


카프카 partition과 나이파이 kafka-consumer는 N:1로 매칭된다. 즉 partition 3개 : kafka-consumer 3개로 설정하면 1:1로 매칭되어 최적의 효율을 보여준다.

아래그림을 보면 이해가 편하다. Concurrent Task를 2.에서 작성한 kafka-consumer processor라고 보면 될 것이다.



반면 Partition 2개 : kafka-consumer 3개로 설정하면 kafka-consumer 1개는 데이터를 소비하지 않으므로 비효율적이다.



Partition 4개 : kafka-consumer 2개 로 설정하면 2:1로 매칭되어 아래와 같은 구성이 된다. 



2.와 3.의 방법을 둘다 적용하는 3개의 클러스터에서 2개의 nifi processor로 분산처리하는 방법도 있다.



NiFi에서 특정 파티션으로 연결할 수 있는 기능은 없으며 자동적을 NiFi cluster와 concurrent Task 수에 따라 지정된다. 따라서 시스템을 구성하기 전에 파티션수와 컨슈머 수, 트래픽량을 먼저 고려해야한다.





[ 참조 ]

MergeContent : 

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html


Remote Process Group 을 이용 : 

https://pierrevillard.com/2017/02/23/listfetch-pattern-and-remote-process-group-in-apache-nifi/


Nifi Distribute : 

https://community.cloudera.com/t5/Community-Articles/Integrating-Apache-NiFi-and-Apache-Kafka/ta-p/247433




댓글 없음:

댓글 쓰기