2020년 11월 30일 월요일

Apache Nifi - 카프카 데이터 분산처리 ( Distribute Kafka Data)


1. Nifi Processor MergeContent


MergeContent프로세서를 이용해 Input processor(ex.Kafka, File)에서 들어오는 데이터를

적절한 크기로 Merge한 다음에 다음 단계로 넘겨주는 Flow를 기대했다.

그러나 MergeContent는 이미 받은 데이터를 다시 병합하여 데이터를 복제하는 형식으로 기대했던 결과를 가져오지는 못했다.

예를 들어 100의 데이터가 들어온다면 100개의 데이터를 차곡차곡 쌓았다가 하나의 데이터로 병합하여 결국 1개의 데이터가 들어와야 하는것을 결과로 예상했지만, MergeContent는 100개의 데이터를 일단 Nifi 저장 후, 그 데이터들을 병합한 데이터셋 복제복을 만들어 다음 프로세스로 넘겨준다. 결국 병합된 1개의 데이터셋을 전달하지만, 이중 저장을 하기 때문에 비효율적이다.






2. 하나의 클래스터에서 에서 3개의 nifi processor로 분산처리




해당 그림처럼 관련 processor를 여러개 만들어 분산처리가 될 때 

확인해야 할 사항이 

1. 데이터가 중복이 되는가?

2. 데이터가 누락이 되는가?

3. 처리속도가 늘어나는가?

정도가 있을 것 같다.


테스트를 위해 카프카데이터를 받아오는 Flow 3개를 셋팅하고, 

인위적으로 프로듀서에 데이터를 넣는 코드를 작성한다. 

테스트 토픽은 partition 3개로 설정하여 테스트하였다.


[ Nifi Flow ]



[ Python 코드 ]
from kafka import KafkaProducer
import time

producer = KafkaProducer(bootstrap_servers=['192.111.111.111:9092','192.222.222.222:9092','192.3333.333.333:9092'])

for i in range(1,5000):
    producer.send('test',str.encode('kafka:-%d' % i))
    time.sleep(0.5)



Hive테이블에 저장 후 데이터들을 보니 중복,누락 없이 3개의 프로세서가 데이터를 받아온다.

( 파일명 : 1_날짜_tuuid / 2_날짜_tuuid / 3_날짜_tuuid )





3. 3개의 NiFi 클래스터로 분산처리


2.에서는 하나의 클래스터에서 3개의 프로세스로 처리했다면, 클러스터 단계에서도 여러 서버를 두어 트래픽을 분산시킬 수 있다.


카프카 partition과 나이파이 kafka-consumer는 N:1로 매칭된다. 즉 partition 3개 : kafka-consumer 3개로 설정하면 1:1로 매칭되어 최적의 효율을 보여준다.

아래그림을 보면 이해가 편하다. Concurrent Task를 2.에서 작성한 kafka-consumer processor라고 보면 될 것이다.



반면 Partition 2개 : kafka-consumer 3개로 설정하면 kafka-consumer 1개는 데이터를 소비하지 않으므로 비효율적이다.



Partition 4개 : kafka-consumer 2개 로 설정하면 2:1로 매칭되어 아래와 같은 구성이 된다. 



2.와 3.의 방법을 둘다 적용하는 3개의 클러스터에서 2개의 nifi processor로 분산처리하는 방법도 있다.



NiFi에서 특정 파티션으로 연결할 수 있는 기능은 없으며 자동적을 NiFi cluster와 concurrent Task 수에 따라 지정된다. 따라서 시스템을 구성하기 전에 파티션수와 컨슈머 수, 트래픽량을 먼저 고려해야한다.





[ 참조 ]

MergeContent : 

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.MergeRecord/additionalDetails.html


Remote Process Group 을 이용 : 

https://pierrevillard.com/2017/02/23/listfetch-pattern-and-remote-process-group-in-apache-nifi/


Nifi Distribute : 

https://community.cloudera.com/t5/Community-Articles/Integrating-Apache-NiFi-and-Apache-Kafka/ta-p/247433




2020년 11월 27일 금요일

[Algorithm] 정렬 (K번째 수)


문제 ]

배열 array의 i번째 숫자부터 j번째 숫자까지 자르고 정렬했을 때, k번째에 있는 수를 구하려 합니다.

예를 들어 array가 [1, 5, 2, 6, 3, 7, 4], i = 2, j = 5, k = 3이라면

  1. array의 2번째부터 5번째까지 자르면 [5, 2, 6, 3]입니다.
  2. 1에서 나온 배열을 정렬하면 [2, 3, 5, 6]입니다.
  3. 2에서 나온 배열의 3번째 숫자는 5입니다.

배열 array, [i, j, k]를 원소로 가진 2차원 배열 commands가 매개변수로 주어질 때, commands의 모든 원소에 대해 앞서 설명한 연산을 적용했을 때 나온 결과를 배열에 담아 return 하도록 solution 함수를 작성해주세요.

제한사항
  • array의 길이는 1 이상 100 이하입니다.
  • array의 각 원소는 1 이상 100 이하입니다.
  • commands의 길이는 1 이상 50 이하입니다.
  • commands의 각 원소는 길이가 3입니다.
입출력 예
arraycommandsreturn
[1, 5, 2, 6, 3, 7, 4][[2, 5, 3], [4, 4, 1], [1, 7, 3]][5, 6, 3]
입출력 예 설명

[1, 5, 2, 6, 3, 7, 4]를 2번째부터 5번째까지 자른 후 정렬합니다. [2, 3, 5, 6]의 세 번째 숫자는 5입니다.
[1, 5, 2, 6, 3, 7, 4]를 4번째부터 4번째까지 자른 후 정렬합니다. [6]의 첫 번째 숫자는 6입니다.
[1, 5, 2, 6, 3, 7, 4]를 1번째부터 7번째까지 자릅니다. [1, 2, 3, 4, 5, 6, 7]의 세 번째 숫자는 3입니다.



제출 ] 

import java.util.Arrays;

class Solution {


    public int[] solution(int[] array, int[][] commands) {

        int[] answer = new int[commands.length];

        int[] temp = {};


        for(int i=0; i <commands.length; i++){

            int idx = commands[i][2]-1;

            temp = Arrays.copyOfRange(array, commands[i][0]-1, commands[i][1]);

            Arrays.sort(temp);

            answer[i] = temp[commands[i][2]-1];

          }


        return answer;

    }

}



풀이 ]

1. Out Of Index 에러를 막기위해 answer INT배열을 새로운 길이로 재선언한다. 

2. Arrays.copyOfRange함수를 이용해 배열을 자른다.

Arrays.copyOfRange(자를 배열, 시작인덱스, 끝인덱스)를 통해 배열을 잘라도 

원 배열의 값은 변경되지 않는다.





2020년 11월 25일 수요일

Hive [7] - Hive to Phoenix Table


[ 사전셋팅 (Prerequisites) ]

phoenix-version-hive.jar 파일을 찾고, 

해당 하이브 설정파일에 value를 추가하여 하이브 맵리듀스 잡이 jar파일 사용하게 한다.

1) hive-env.sh : HIVE_AUX_JARS_PATH=<path to jar>

2) hive-site.xml

<property>

    <name>hive.aux.jars.path</name>

    <value>file://<path></value>

</property>




[ 테이블 생성 ]

jar파일에 있는 storage Handler는 internal과 external 하이브테이블 생성을 지원한다.


1) Create Internal Table

Hive에서 테이블생성시 Phoenix에도 자동으로 테이블 생성되며, Hive나 Hue에서 데이터 조회와 같은 쿼리가 가능하다.

Internal Phoenix테이블은 Hive테이블 lifecycle을 따른다. 즉 Hive테이블에서 데이터 또는 테이블이 삭제되면 Phoenix테이블 또한 동일하게 영향을 받는다.

create table phoenix_table (
	  s1 string,
	  i1 int,
	  f1 float,
	  d1 double
	)
	STORED BY 'org.apache.phoenix.hive.PhoenixStorageHandler'
	TBLPROPERTIES (
	  "phoenix.table.name" = "phoenix_table",
	  "phoenix.zookeeper.quorum" = "localhost",
	  "phoenix.zookeeper.znode.parent" = "/hbase",
	  "phoenix.zookeeper.client.port" = "2181",
	  "phoenix.rowkeys" = "s1, i1",
	  "phoenix.column.mapping" = "s1:s1, i1:i1, f1:f1, d1:d1",
	  "phoenix.table.options" = "SALT_BUCKETS=10, DATA_BLOCK_ENCODING='DIFF'"
	);



2) Create External Table

Hive에서 테이블생성시 Phoenix에 맵핑되는 테이블이 없으면 생성이 불가능하다. 테이블과 데이터를 따로 metadata로 매니징하기에 Hive나 Hue에서 데이터 조회 등의 쿼리를 실행 할 수 없다. 

External Phoenix테이블은 Hive테이블에 큰 영향을 받지않는다. Hive테이블에서 데이터 또는 테이블이 삭제되어도 Phoenix테이블 또한 동일하게 영향을 받는다.

create external table ext_table (
  i1 int,
  s1 string,
  f1 float,
  d1 decimal
)
STORED BY 'org.apache.phoenix.hive.PhoenixStorageHandler'
TBLPROPERTIES (
  "phoenix.table.name" = "ext_table",
  "phoenix.zookeeper.quorum" = "localhost",
  "phoenix.zookeeper.znode.parent" = "/hbase",
  "phoenix.zookeeper.client.port" = "2181",
  "phoenix.rowkeys" = "i1",
  "phoenix.column.mapping" = "i1:i1, s1:s1, f1:f1, d1:d1"
);



[ 생성 Properties ]

  1. phoenix.table.name
    • Specifies the Phoenix table name
    • Default: the same as the Hive table
  2. phoenix.zookeeper.quorum
    • Specifies the ZooKeeper quorum for HBase
    • Default: localhost
  3. phoenix.zookeeper.znode.parent
    • Specifies the ZooKeeper parent node for HBase
    • Default: /hbase
  4. phoenix.zookeeper.client.port
    • Specifies the ZooKeeper port
    • Default: 2181
  5. phoenix.rowkeys
    • The list of columns to be the primary key in a Phoenix table
    • Required
  6. phoenix.column.mapping
    • Mappings between column names for Hive and Phoenix. See Limitations for details.



[ Meta데이터위치 ]

Zookeeper znode를 확인하면 /hbase/archive/data 에 hive와 연동된 phoenix메타데이터가 담긴다. 

/hbase/data에 테이블이 정의되어 있더라도 해당 파일을 지우면 phoenix에서 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 에러를 띄우며 트래킹할 수 없다는 메세지를 보낸다.




참조 : https://phoenix.apache.org/hive_storage_handler.html



 

[AWS] Php를 이용한 S3 이미지 업로드와 CloudFront에 추가

[ 순서 ]

Preparation : php설치, AWS CloudFront 사용 중

  • S3 사용을 위해 AWS-SDK-PHP 설치

  • 보안자격증명 key, secret 얻기

  • S3이미지 업로드 Php코드작성

  • S3버킷만들기

  • CloudFront에 S3 연동




[ AWS-SDK-PHP 설치 ]

1 2 3 4 5 6 7 8 9 10 11 php composer 설치 #php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');" #php -r "if (hash_file('sha384', 'composer-setup.php') === '756890a4488ce9024fc62c56153228907f1545c228516cbf63f885e036d37e9a59d27d63f46af1d4d07ee0f76181c7d3') { echo 'Installer verified'; } else { echo 'Installer corrupt'; unlink('composer-setup.php'); } echo PHP_EOL;" #php composer-setup.php #php -r "unlink('composer-setup.php');" composer 전역설정 # sudo cp composer.phar /usr/bin/composer aws-sdk-php설치 #sudo composer require aws/aws-sdk-php

 


[ 보안자격증명 key, secret 얻기 ]

우측상단 계정클릭 → 내보안자격증명 → 액세스키 생성하여 key, secret 포함된 CSV파일 다운












[ S3버킷 설정 ]

1. AWS S3탭에 들어가 버킷생성

2.버킷을 퍼블릭으로 설정

3.해당버킷대쉬보드 → 권한 → 버킷정책

{
"Version": "2020-12-04",
"Statement": [
{
"Sid": "AddPerm",
"Effect": "Allow",
"Principal": "",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::yourBucketName/"
}
]
}


[ S3이미지 업로드 PHP code ]

Post로 File업로드시 필요한 파람터 값을 받는다. S3에 대한 리전, key, secret, 버킷이름 등의 정보를 입력하고, 앞서 다운받았던 aws-sdk-php API를 이용해 업로드 코드를 작성한다.

예제 )

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #Php to aws S3 require '/PATHtoAWSSDK/aws-sdk/vendor/autoload.php'; use Aws\S3\S3Client; use Aws\Exception\AwsException; use Aws\Credentials\CredentialProvider; #Bucket 연결 객체 $s3Client = new S3Client([ 'region' => 'ap-northeast-1', 'version' => 'latest', # 자격증명 'credentials' => [ 'key' => 'your_key', 'secret' => 'your_secret', ], ]); #File Upload to S3 $fp = fopen($upload_file, r); $result = $s3Client->putObject([ 'Bucket'=>'upload-image-file', 'Key' => $_FILES['file1']['name'], 'Body' => $fp ]); fclose($fp);

참고 : https://docs.aws.amazon.com/ko_kr/sdk-for-php/v3/developer-guide/getting-started_basic-usage.html



[ CloudFront에 S3추가 ]

AWS CloudFront → Create Distribution → Web섹션 Get Started → Origin Domain Name에 만든 S3 Name으로 입력한다.














나머지 설정은 읽어보면서 입력하고 Distribution Settings에 Price Class입력란을 보면 Class 마다 제공되는 Location과 가격이 다르다.














예를 들어 내 서비스가 아프리카국가의 유저들에게는 지원을 하지 않는다면 요금계층200을 선택해 요금을 더 지불하지 않도록 한다.


또한 모든 Location에 CloudFront를 지원하지 않기에 아래 그림을 참조하여 가장 가까운 Edge Location을 찾아 시스템라인을 구성해야 할 것이다.

예를 들어 중국하얼빈 유저들에게 내 웹/앱 서비스를 할 경우 CloudFront Location은 거리상으로 베이징보다 서울이 가깝다고 해보자. 그럼 중국이 포함된 Price Class를 사용하는게 아닌 한국이 포함된 Class를 써야 가장 최적의 네트워크 속도를 가질 것이다.



마지막으로 SSL Certificate된 사용할 도메인을 입력한다.



[ CloudFront Behaviors ]

cloudfront → 변경할 Distributions → Behaviors 탭 → Create Behavior 버튼 → 생성후 저장 Pattern은 * 로 작성




CloudFront 참조 : https://docs.aws.amazon.com/ko_kr/AmazonCloudFront/latest/DeveloperGuide/Introduction.html

https://www.slideshare.net/lacryma1/53-aws-summit-seoul-2015

https://docs.aws.amazon.com/ko_kr/AmazonCloudFront/latest/DeveloperGuide/distribution-web-values-specify.html#DownloadDistValuesPathPattern



RedHat7&Centos7 APM(apache/Php/mysql) 설치

[ 버전정보 ]

  • Httpd 2.4
  • Php 7.3
  • mariaDB 5.5


[ EC2에 APM설치 ]

  • httpd2.4 설치

#sudo yum install httpd

#httpd -v
Server version: Apache/2.4.6 (Red Hat Enterprise Linux)

 

  • Apache httpd vhost 설정

#sudo vi /etc/httpd/conf/httpd.conf

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Listen 8080 (추가) ..... (VirtualHost 추가) <VirtualHost *:80> ServerName servername:80 ServerAdmin root@example.com DocumentRoot "/var/www/html/server1" CustomLog "logs/access_log" combined ErrorLog "logs/error_log" <Directory "/var/www/html/server1"> Options Indexes FollowSymLinks AllowOverride None Require all granted </Directory> </VirtualHost> <VirtualHost *:8080> ServerName servername:8080 ServerAdmin root@example.com DocumentRoot "/var/www/html/server2" CustomLog "logs/access_log" combined ErrorLog "logs/error_log" <Directory "/var/www/html/server2"> Options Indexes FollowSymLinks AllowOverride None Require all granted </Directory> </VirtualHost> .....

#sudo systemctl restart httpd

 

  • php73 설치

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 remi repository를 yum 에 추가 한다. # wget https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm # rpm -Uvh epel-release-latest-7.noarch.rpm # wget http://rpms.remirepo.net/enterprise/remi-release-7.rpm # rpm -Uvh remi-release-7.rpm # yum install -y yum-utils # yum-config-manager --enable remi-php73 기존의 설치된 PHP 패키지를 확인하여 잘못된 패키지가 삭제되지 않도록 한다. # yum list installed | cut -d " " -f 1 | grep php 기존 설치된 PHP를 제거. (php5) #yum remove -y `yum list installed | cut -d " " -f 1 | grep php` php 패키지 설치. php-common 외의 패키지는 자신의 상황에 맞게 조정해서 설치한다. # yum install -y php-common php-fpm php-cli \ php-process \ php-opcache php-pecl-apcu \ php-mysqlnd php-pdo \ php-gd \ php-mbstring php-xml \ php-pecl-zip \ php-bcmath #php -v PHP 7.3.34

 

  • mariadb 설치

#yum install mariadb mariadb-server 

#mysql --version
mysql Ver 15.1 Distrib 5.5.68-MariaDB, for Linux (x86_64) using readline 5.1

 

  • APM설정파일 경로

/etc/php.ini

/var/lib/php/

/etc/httpd/conf.d/php.conf

/etc/httpd/conf/httpd.conf

 

  • 명령어

-실행

sudo systemctl start httpd

sudo systemctl start mariadb

-확인

sudo systemctl status httpd

sudo systemctl status mariadb