Kafka에서 데이터를 받아 HDFS에 json, csv형태로 입력받는다.
저장 후 Hive, Spark 등으로 데이터를 처리할 수 있으며 실시간 데이터 인풋을 Nifi를 통해
확인할 수 있는 장점이 있다.
1. [ ConsumeKafka ]
컨슈머로 연결하여 브로커에서 보내는 메세지를 소비
Properties :
Kafka Brokers : 172.31.11.11:9092, 172.31.22.22:9092, 172.31.33.33:9092
Topic Name(s) : your-topic-name
Group ID : your-kafka-group-id
2. [ UpdateAttribute ]
들어오는 카프카데이터에 createdAt를 추가하기 위한 프로퍼티
filename은 HDFS저장시 파일명을 시간으로 지정하기 위함
Property :
createdAt : ${now():format("yyyy-MM-dd:HH:mm:ss")}
filename : st_${now():format("yyyyMMddHHmmssSS")}_${random():mod(99):plus(1000)}
3. [ ReplaceText ]
2.에서 지정한 createdAt값을 Json으로 입력받는 Kafka데이터 맨뒤에 추가한다.
Property :
Search Value : (?s:(^.*)}$)
Replacement Value : $1,"createdAt":"${createdAt}"}
Replacement Strategy : Regex Replace
4. [ PutHDFS ]
해당되는 파일의 Path를 입력한다.
Property :
Hadoop Configuration Resources : /etc/hadoop/conf.cloudera.hdfs/core-site.xml,
/etc/hadoop/conf.cloudera.hdfs/hdfs-site.xml
Directory : /user/nifi/yourpath
댓글 없음:
댓글 쓰기