2021년 1월 3일 일요일

Apache Nifi - Kafka to HDFS

 

Kafka에서 데이터를 받아 HDFS에 json, csv형태로 입력받는다.

저장 후 Hive, Spark 등으로 데이터를 처리할 수 있으며 실시간 데이터 인풋을 Nifi를 통해 

확인할 수 있는 장점이 있다.



1. [ ConsumeKafka ] 

컨슈머로 연결하여 브로커에서 보내는 메세지를 소비


Properties : 

Kafka Brokers : 172.31.11.11:9092, 172.31.22.22:9092, 172.31.33.33:9092

Topic Name(s) : your-topic-name

Group ID : your-kafka-group-id




2. [ UpdateAttribute ]

들어오는 카프카데이터에 createdAt를 추가하기 위한 프로퍼티

filename은 HDFS저장시 파일명을 시간으로 지정하기 위함


Property : 

createdAt : ${now():format("yyyy-MM-dd:HH:mm:ss")}

filename : st_${now():format("yyyyMMddHHmmssSS")}_${random():mod(99):plus(1000)}




3. [ ReplaceText ]

2.에서 지정한 createdAt값을 Json으로 입력받는 Kafka데이터 맨뒤에 추가한다.


Property :

Search Value : (?s:(^.*)}$)

Replacement Value : $1,"createdAt":"${createdAt}"}

Replacement Strategy : Regex Replace




4. [ PutHDFS ]

해당되는 파일의 Path를 입력한다.


Property :

Hadoop Configuration Resources : /etc/hadoop/conf.cloudera.hdfs/core-site.xml,

/etc/hadoop/conf.cloudera.hdfs/hdfs-site.xml

Directory : /user/nifi/yourpath





PutEmail을 추가하여 Nifi에러시 Email을 받게 설정할 수도 있다.



Apache Kafka - Simple테스트 Python Code

 

[ Ubuntu ]

* 사전설치 Python3


sudo apt-get upgrade


# Pip설치

sudo apt-get install python3-pip


#Kafka Lib 설치

sudo pip3 install kafka-python




[ 테스트 코드 ]

from kafka import KafkaProducer

import time


producer = KafkaProducer(bootstrap_servers=[ '172.31.11.11:9092', '172.31.22.222:9092', '172.31.33.33:9092' ])


for i in range(1,50):

    producer.send( 'Topic-Name', str.encode('kafka:-%d' % i) )

    time.sleep(0.5)




2021년 1월 2일 토요일

[ Zookeeper ] CLI명령어와 ACL


[ CLI 명령어 ]


zookeeper/bin/zkCli.sh에서 CLI 접속


create /znode "first-app"

ls /znode

get /znode

set /znode "date-updated"

rmr /znode  




[ ACL (Action Control List) ]


znode접근시 Authentication is not valid : /znode-ex 라는 메세지가 뜬다면

인증권한을 확인할 필요가 있다.


ACL은 znode에 액세스 하기위한 인증제어이며

5가지의 종류로 구분된다.

  • CREATE - 생성
  • READ - 읽기 
  • WRITE - 쓰기
  • DELETE - 삭제
  • ADMIN - 위4가지 권한 부여


ls -l 명령어로 확인할 수 있는 리눅스 파일권한 스키마(user:group:others)와는 달리 아래 4가지로 권한을 부여한다.

  • WORLD - 모든 요청 
  • AUTH - 인증된 세션
  • DIGEST - username/password로 인증된 요청
  • IP - 해당 ip요청



[ 조작 명령어 ]

getAcl /znode 결과-> world/anyone/crdwa (c : create, r :read ... )


DIGEST로 인증설정

addauth digest [username]:[password]

setAcl /znode auth:[username]:[password]:crdwa


호스트네임으로 인증설정

setAcl /znode host:[hostname]:crdwa


IP주소로 인증설정

setAcl /znoe ip:[IpAdressIPv4]:crdwa




[ Cloudera Manager ACL ]

znode인증설정을 풀어서 작업을 해야 할 경우 일일히 바꿀수 없기에 

CDH에서는 ACL을 모두 무시하고 접근가능하게 설정 할 수 있다.  


CM -> ZooKeeper -> Configuration 페이지 -> Java Configuration Options for Zookeeper Server에서 아래텍스트를 입력 후 Restart.

-Dzookeeper.skipACL=true


또는 

znode의 모든 권한을 갖는 superDigest 계정을 설정하는 방법도 있다.




참조 :

https://www.mynotes.kr/zookeeper-aclaction-control-list-%EC%84%A4%EC%A0%95/

[Algorithm] 요일구하기


문제 ] 

a와 b 정수 값을 입력 받아 2016년 a월 b일의 요일을 리턴한다.



코드1 ]

import java.util.*;
import java.text.*;

class Solution {
    public String solution(int a, int b) {

        String answer = "";     
        String aa = String.valueOf(a);
        String bb = String.valueOf(b);
        String[] week = {"SUN","MON","TUE","WED","THU","FRI","SAT"};

        if(aa.length() < 2){
            aa = '0' + aa;
        }
        if(bb.length() < 2){
            bb = '0' + bb;
        }
        
        String input_date = "2016"+aa+bb;
        DateFormat date_format = new SimpleDateFormat("yyyyMMdd");
        Date get_date = new Date();
        
        try{
            get_date = date_format.parse(input_date);
        }catch(ParseException e){
            e.printStackTrace();
        }
        
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(get_date);
        
        int idx = calendar.get(Calendar.DAY_OF_WEEK)-1;
        answer = week[idx];

        return answer;
    }
}



코드2]

자바 8에서는 Calendar클래스의 단점들을 보완하는 LocalDate클래스가 등장한다.

아래 코드는 코드1과 같은 결과다.


import java.time.*;

class Solution {
    public String solution(int a, int b) {

        return LocalDate.of(2016, a, b).getDayOfWeek().toString().substring(0,3);

    }
}



LocalDate 클래스 참조 :

https://jeong-pro.tistory.com/163


2020년 12월 28일 월요일

Cloudera Manager [2] - Kerberos 보안설정


CDH는 보안프로그램으로 커버로스를 제공한다. 유저가 많아지고 각 유저에 대한 권한이 세분화되어야 함에 따라 그 필요성이 중시된다. 서버환경은 EC2-CentOs7, CDH5.15으로 진행된다.


[ 커버로스 개념 ]

Protocol의 동작순서는 아래와 같다.











이미지 출처 : 

https://www.letmecompile.com/kerberos-protocol


간단히 이해하자면 커버로스 프로세스는 영화를 예매하는 것에 비유할 수 있다. 인터넷(AS)으로 영화를 예매한다. 예매코드를 가지고 영화관에서 티켓(TGS)를 뽑는다. 입장시 해당 티켓를 점원(SS)에게 제출한다. 3단계 보안프로세스를 통해 유효한 티켓이있는 유저만 접속제한하여 편리하게 관리하는 것이 가능하다.



[ 1.사전 설치 ]

JDK 1.8.0_161이상 버전을 설치하도록 한다.

커버로드 프로토콜에서 AES-256암호화를 이전버전에선 사용하지 못하기에 따로 셋팅을 해줘야하는 번거로움이 있기 때문이다.



[ 2.커버로스 설치 ]

Cloudera manager server가 설치된 EC2

$ yum install openldap-clients

$ yum install krb5-server 

 

모든서버 

$ yum install krb5-workstation

$ yum install krb5-libs


* SSH를 통해 패키지 설치

pem키를 cloudera manager가 설치된 EC2로 옮긴 후,

$ sudo chmod 400 my-key-pair.pem 

설정안할시 Load key "my.pem": bad permissions, Permission denied에러


이후 아래명령어로 접속하여 패키지 설치

$ ssh -i /path/my-key-pair.pem my-instance-user-name@my-instance-public-dns-name


서버가 수십대일시, 쉘스크립트를 짜서 실행



[ 3.커버로스 설정 ]

$ hostname -f    //해당호스트네임 보기


EXAMPLE.COM = 현재 호스트네임,

cent1.example.com = MIT KDC서버 호스트, Cloudera Manager host가 설치된 호스트네임으로 바꿔서 기입


$ vi /etc/krb5.conf

[logging]

default = FILE:/var/log/krb5libs.log

kdc = FILE:/var/log/krb5kdc.log

admin_server = FILE:/var/log/kadmind.log 


[libdefaults]

dns_lookup_realm = false

ticket_lifetime = 24h

renew_lifetime = 7d

forwardable = true

rdns = false

default_realm = EXAMPLE.COM

default_ccache_name = KEYRING:persistent:%{uid} 


[realms]

EXAMPLE.COM = {

kdc = cent1.example.com

admin_server = cent1.example.com

}

 

[domain_realm]

 .example.com = EXAMPLE.COM

 example.com = EXAMPLE.COM



$ vi /var/kerberos/krb5kdc/kdc.conf

해당 설정파일에 

max_life = 1d

max_renewable_life = 7d

설정값을 추가 한다.


[kdcdefaults]

kdc_ports = 88

kdc_tcp_ports = 88

[realms]

EXAMPLE.COM = {

#master_key_type = aes256-cts

acl_file = /var/kerberos/krb5kdc/kadm5.acl

dict_file = /usr/share/dict/words

admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab

supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal


max_life = 1d

max_renewable_life = 7d

}


admin 권한을 위해 

/var/kerberos/krb5kdc/kadm5.acl 파일을 수정한다.

EXAMPLE.COM에 해당하는 부분 cloudera manager가 설치된 hostname으로 변경.


*/admin@EXAMPLE.COM     *




[ 4.커버로스 데이터베이스 생성 ]

DB와 비밀번호를 생성한다.


$ kdb5_util create -s

Enter KDC database master key: 비밀번호

Re-enter KDC database master key to verify: 비밀번호



[ 5.서비스 등록 ]

KDC서버를 재시작하고 EC2종료 후에도 실행되도록 한다.

$ sudo service krb5kdc restart

$ sudo service kadmin restart


$ sudo systemctl enable krb5kdc

$ sudo systemctl enable kadmin




[ 6.서비스 실행 ]

예시에선 해당계정을 cloudera-scm로 생성했다.

 

$ sudo kadmin.local

Authenticating as principal root/admin@ip-172-11-11-199.ap-northeast-1.compute.internal with password.

kadmin.local:  addprinc -pw 비밀번호 cloudera-scm/admin@ip-ip-172-11-11-199.ap-northeast-1.compute.internal

WARNING: no policy specified for cloudera-scm/admin@ip-ip-172-11-11-199.ap-northeast-1.compute.internal; defaulting to no policy

Principal "cloudera-scm/admin@ip-ip-172-11-11-199.ap-northeast-1.compute.internal" created.

kadmin.local:  modprinc -maxrenewlife 1week cloudera-scm/admin@ip-ip-172-11-11-199.ap-northeast-1.compute.internal


$ kinit cloudera-scm/admin@ip-172-11-11-199.ap-northeast-1.compute.internal

Password for cloudera-scm/admin@ip-172-11-11-199.ap-northeast-1.compute.internal: 비밀번호


$ klist -e

Ticket cache: KEYRING:persistent:1000:1000

Default principal: cloudera-scm/admin@ip-172-11-11-199.ap-northeast-1.compute.internal

Valid starting       Expires              Service principal

12/23/2020 16:07:04  12/24/2020 16:07:04  krbtgt/ip-172-11-11-199.ap-northeast-1.compute.internal@ip-172-11-11-199.ap-northeast-1.compute.internal

Etype (skey, tkt): aes256-cts-hmac-sha1-96, aes256-cts-hmac-sha1-96




[ 7.CDH에서 커버로스 연동 ]

최상단메뉴의 관리->보안 클릭 후 'Kerberos 설정' 버튼 클릭











시작에서 모두 체크 후 다음,

KDC정보에서 Kerberos 암호화 유형을 아래 aes256-cts-hmac-sha1-96로 설정,

나머지 정보는 cloudera manager설치 호스트네임 입력






나머지 설정은 아래와 같이 Default





















설정 완료 후 커버로스 설정이 됬음을 확인한다.




참조 : https://plenium.wordpress.com/2018/07/17/kerberos-setup-in-cloudera-hadoop


* CDH에서 커버로스 연동을 해제하고 싶다면 

https://www.programmersought.com/article/81434721806




2020년 12월 23일 수요일

[AWS] ELB 운영시 503에러



[ ELB 운영시 만날 수 있는 이슈 ]

















  • HTTP 502 Bad Gateway : 백엔드 인스턴스로부터 온 응답을 ELB가 못 받는 경우
  • HTTP 503 Service Unavailable : 인스턴스 등록이 되어있지 않은 경우 / 모든 인스턴스가 Unhealthy 상태인 경우
  • HTTP 504 Gateway Timeout : 인스턴스 CPU 점유율이 높거나 / 프로그램이 데이터베이스나 외부 API Dependency로 인해 응답을 못 받는 경우
  • Instance Out of Service : 인스턴스가 등록이 되지 않거나 / Unhealthy 상태인 경우
  • Health Check Failure : ELB 생성시 대상그룹을 통해 인스턴스에 200코드 값을 요청하는데, 이외의 코드를 반환할 경우 Health check fail 발생





[ 에러 화면 ] 

ELB 연결 작업시 503코드를 리턴 받아 그 원인을 짚어봄









1. EC2 문제인지 ELB문제인지 확인

프로그램은 php로 작성했기에 EC2 퍼블릭 아이피에 테스트 코드(info.php())를 찍어 

아래와 같은 화면이 나오는지 확인





















2. ELB생성시 대상등록 체크

위에서 언급했듯이 HTTP 503 Service Unavailable의 에러원인은 인스턴스 등록이 되어있지 않은 경우가 있다.

로드밸런싱 -> 대상그룹 -> 해당대상그룹클릭 -> Targets
에서 등록된 EC2를 확인 할 수 있다.

ELB 생성시 5단계 : 대상등록에서 해당 EC2를 체크하고 "등록된 항목에 추가" 버튼을 눌러 위의 등록된 대상 항목이 바뀌는 것을 보고 단계를 넘어가야한다.






















3. 대상그룹의 Status 확인 (Health Check)

대상그룹의 Status가 Unheathly가 된 경우에도 503에러를 띄울 수 있다. 즉 health check가 않았다. ELB를 연결한 해당 프로그램을 API용도로만 만들어서 http://example.com 도메인으로 접속시 어떤 페이지도 나타내지 않는다. 

따라서 도메인의 루트디렉토리(ex. /var/www/html )에 간단히 작성한 index.html를 넣어주면 200코드를 리턴하므로 수정 후 다시 테스트.

주의할 점이 ELB생성시 4단계 : 라우팅구성의 상태검사 경로를 다른 곳으로 지정했다면 200코드를 반환하는 파일을 지정한 곳에 위치시켜야한다. 










































[ 테스트 ]

대상그룹 Targets Health status가 healthy가 되어있는지 확인후
포스트맨으로 아래에 해당하는 URL에 접속해 기능테스트를 한다. 
  • EC2 Public IP
  • ELB DNS 이름(A레코드)





참조 :

https://www.slideshare.net/awskorea/3-operating-issue-solution-for-aws-customers

2020년 12월 15일 화요일

[Algorithm] 깊이/너비 우선탐색(DFS/BFS) [타겟넘버]

 

문제 ]

n개의 음이 아닌 정수가 있습니다.

이 수를 적절히 더하거나 빼서 타겟 넘버를 만들려고 합니다.

예를 들어 [1, 1, 1, 1, 1]로 숫자 3을 만들려면 다음 다섯 방법을 쓸 수 있습니다.


-1+1+1+1+1 = 3

+1-1+1+1+1 = 3

+1+1-1+1+1 = 3

+1+1+1-1+1 = 3

+1+1+1+1-1 = 3 


사용할 수 있는 숫자가 담긴 배열 numbers, 타겟 넘버 target이 매개변수로 주어질 때 숫자를 적절히 더하고 빼서 타겟 넘버를 만드는 방법의 수를 return 하도록 solution 함수를 작성해주세요.


[제한사항]

주어지는 숫자의 개수는 2개 이상 20개 이하입니다.

각 숫자는 1 이상 50 이하인 자연수입니다.

타겟 넘버는 1 이상 1000 이하인 자연수입니다.



DFS와 BFS ]


[ DFS ] 

깊이 우선 탐색(depth-first search)은 맹목적 탐색방법의 하나로, 노드가 트리일시 좌측노드를 기점으로 깊이 제한에 도달할 때 까지 목표노드를 찾는다. 제한 깊이까지 목표노드가 발견되지 않으면 최근노드의 부모노드로 되돌아가서 거치지 않은 다른 자식노드로 탐색을 시작한다. 여기서 부모노드로 되돌아오는 과정을 백트래킹(backtracking)이라고 한다.

















- 장점

  • 현 경로상의 노드들만을 기억하면 되므로 저장공간의 수요가 비교적 적다.
  • 목표노드가 깊은 단계에 있을 경우 해를 빨리 구할 수 있다.
- 단점
  • 해가 없는 경로에 깊이 빠질 가능성이 있다.
  • 얻어진 해가 최단 경로가 아닌 노드 전체를 탐색해야하는 경우가 있을 수 있다. 즉 최적의 방법으로 구한 해는 아닐 수 있다.




[ BFS ]

너비 우선 탐색(Breadth-first search)은 맹목적 탐색방법의 하나로, 루트 노드를 방문한 후 인접한 모든 노드들을 우선적으로 검색하는 방법이다. 더 이상 방문하지 않은 정점이 없을 때 까지 검색을 하는데 트리의 너비를 기준으로 검색한다.















- 장점

  • 출발 노드에서 목표노드까지의 최단 길이 경로를 보장한다.

- 단점

  • 너비경로가 길 경우 탐색가지가 증가함에 따라 보다 많은 기억 공간이 필요하다.



제출 DFS ] 

class Solution {
    public int solution(int[] numbers, int target) {
        int current = numbers[0];
        int answer = 0; 
        answer += dfs(current, 1, numbers, target);         
        answer += dfs(-current, 1, numbers, target); 
        
        return answer;
    }
    
    public int dfs(int prev, int index, int[] numbers, int target){
        
        if(index >= numbers.length){
            if(target==prev) return 1;
            return 0;
        }
        
        int cur1 = prev + numbers[index];
        int cur2 = prev - numbers[index];
        
        int ans = 0;
        ans += dfs(cur1, index+1, numbers, target);
        ans += dfs(cur2, index+1, numbers, target);
        return ans;
    }
}


풀이 DFS ]

재귀를 이용하여 DFS를 활용한다.





















제출 BFS ]

import java.util.Queue;
import java.util.LinkedList;

class Solution {
    
    class Pair{
        int cur;
        int index;
        
        Pair(int cur, int index){
            this.cur = cur;
            this.index = index;
        }
    }
    
    public int solution(int[] numbers, int target) {
        int answer = 0;
        Queue<Pair> queue = new LinkedList<Pair>();
        queue.offer(new Pair(numbers[0],0));
        queue.offer(new Pair(-numbers[0],0));
        
        while(!queue.isEmpty()){
            Pair p = queue.poll();
            if(p.index == numbers.length-1){
                if(p.cur == target){
                    answer += 1;
                }
                continue;
            }
            int c1 = p.cur + numbers[p.index+1];
            int c2 = p.cur - numbers[p.index+1];
            
            queue.add(new Pair(c1, p.index+1));
            queue.add(new Pair(c2, p.index+1));
        }
        return answer;
    }
    
}



풀이 ]


Queue는 인터페이스 형태로 LinkedList를 통해 생성한다. FIFO(Firtst in first out)이 큰 특징이다. 
Class Pair를 원소로 하는 Queue를 이용하여 너비우선탐색을 실행한다. Pair클래스는 깊이를 나타내는 index와 해당 값 (1 +1 +1 -1 ...)을 나타내는 cur를 변수로 가진다.
Index가 numbers길이와 같다면 cur를 target값과 비교해 일치하는지 확인하여 answer에 값을 +할지 안할지 결정한다.