2021년 2월 21일 일요일

Apache HBase - Kafka to HBase with Nifi

 

[ 아키텍처 ]

Kafka -> Nifi -> HBase -> Python으로 데이터 Fetch 


[ Kafka ]

카프카의 토픽명과 컨슈머그룹명을 정해 데이터를 받는다.


[ Nifi ]

아래와 같은 프로세스 플로우로 구성할 수 있다.

























  • ConsumerKafka에 토픽명과 그룹명을 설정값으로 입력한다
  • UpdateAttribue와 ReplaceText는 해당 데이터가 나이파이에 적제되는 시간을 입력하 위함이므로 생략가능하다.
  • PutHBaseJson에 HBase TableName, Row Identifier, Column Family값을 입력한다. 네임스페이스가 있다면 NS:TableName과 값이 입력한다. Row Identifier값은 로우키가 되므로 유니크한 값으로 설정한다. 예시에선 $(createdAt}_${uuid}값으로 설정하였다. 



















HBase Client Service는 새로 생성한다.

예제는 클라우데라 매니저를 사용하므로 

/opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml,

/opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml

값을 입력한다.



















[ HBase ]

Kafka데이터를 받을 Hbase테이블을 생성한다

Ex) 

$ bin/hbase shell

shell) create 'NS:TableName', 'ColumFamily'



[ HBase 설정 ]

아래 두 설정 값은 False로 두자












[ Python ]

파이썬에서 HBase API를 제공해주는 라이브러리를 설치한다. 

$ pip install happybase


간단히 특정날짜의 HBase데이터를 가지고 와, List로 return하는 예제를 작성하였다.


import happybase

# connect
def connOpen() :
conn = happybase.Connection(host='12.345.67.88', port=9090,
table_prefix='NS',
table_prefix_separator=b':',
timeout=None,
autoconnect=False,
transport='framed',
protocol='compact'
)
return conn

def connClose(conn) :
conn.close()
# fetch
def fetchTwoDay(conn, startDate, endDate) :
conn.open()
table = conn.table('Table_Name')
data = table.scan(row_start=startDate, row_stop=endDate)
return list(data)

# USE CASE
#conn = connOpen()
#data = fetchTwoDay(conn, '2021-02-19', '2021-02-21')
#connClose(conn)
#print(data)


- 해당 라이브러리는 Hbase Thrift 서버와 연결하기 때문에 

기본 9000, Thrift2는 9090포트값을 설정 값으로 입력한다.





참고 URL :

https://my-bigdata-blog.blogspot.com/2017/04/nifi-to-hbase.html

happybase API URL : 

https://happybase.readthedocs.io/en/latest/



댓글 없음:

댓글 쓰기