2021년 1월 3일 일요일

Apache Nifi - Kafka to HDFS

 

Kafka에서 데이터를 받아 HDFS에 json, csv형태로 입력받는다.

저장 후 Hive, Spark 등으로 데이터를 처리할 수 있으며 실시간 데이터 인풋을 Nifi를 통해 

확인할 수 있는 장점이 있다.



1. [ ConsumeKafka ] 

컨슈머로 연결하여 브로커에서 보내는 메세지를 소비


Properties : 

Kafka Brokers : 172.31.11.11:9092, 172.31.22.22:9092, 172.31.33.33:9092

Topic Name(s) : your-topic-name

Group ID : your-kafka-group-id




2. [ UpdateAttribute ]

들어오는 카프카데이터에 createdAt를 추가하기 위한 프로퍼티

filename은 HDFS저장시 파일명을 시간으로 지정하기 위함


Property : 

createdAt : ${now():format("yyyy-MM-dd:HH:mm:ss")}

filename : st_${now():format("yyyyMMddHHmmssSS")}_${random():mod(99):plus(1000)}




3. [ ReplaceText ]

2.에서 지정한 createdAt값을 Json으로 입력받는 Kafka데이터 맨뒤에 추가한다.


Property :

Search Value : (?s:(^.*)}$)

Replacement Value : $1,"createdAt":"${createdAt}"}

Replacement Strategy : Regex Replace




4. [ PutHDFS ]

해당되는 파일의 Path를 입력한다.


Property :

Hadoop Configuration Resources : /etc/hadoop/conf.cloudera.hdfs/core-site.xml,

/etc/hadoop/conf.cloudera.hdfs/hdfs-site.xml

Directory : /user/nifi/yourpath





PutEmail을 추가하여 Nifi에러시 Email을 받게 설정할 수도 있다.



댓글 없음:

댓글 쓰기