2020년 11월 5일 목요일

Apache Kafka [4] - 명령어

 

* 카프카 PATH로 cd하여 실행함을 가정한다.


실행중지

[ 카프카 실행 ]

./bin/kafka-server-start.sh ./config/server.properties --daemon


[ 카프카 중지 ]

./bin/kafka-server-stop.sh



토픽

[ 토픽생성 ]

./bin/kafka-topics.sh --create --zookeeper 172.123.123.111:2181, 172.123.123.222:2181, 172.123.123.333:2181 --replication-factor 3 --partitions 3 --topic kafka-topic-name

: 카프카 서버3대 (broker)에 복제를 3번하는 토픽을 만든다.


[ 토픽확인 ]

./bin/kafka-topics.sh --bootstrap-server kafkahost_IP:9092 --list

or

./bin/kafka-topics --zookeeper 172.123.123.111:2181 --topic kafka-topic-name --describe

or

./bin/kafka-topics.sh --list --zookeeper 172.123.123.111:2181


[ 지노드확인 ]

zookeeperPath/bin/zkCli.sh 

: 주키퍼접속

zk] ls /hadoop-kafka/brokers/ids 

: znode를 통해 확인



프로듀서/컨슈머

[ Producer messages 보내기 ]

./bin/kafka-console-producer.sh --broker-list 172.123.123.111:9092 --topic kafka-topic-name


[ Producer에 파일로 message 보내기 ]

./bin/kafka-console-producer.sh --broker-list 172.123.123.111:9092 --topic kafka-topic-name < /home/ubuntu/sample.json


[ Consume messages 받기 ]

./bin/kafka-console-consumer.sh --bootstrap-server 172.123.123.111:9092 --topic kafka-topic-name



컨슈머그룹

[ Consumer group 생성 ]

./bin/kafka-console-consumer.sh --bootstrap-server 172.123.123.111:9092 --topic kafka-topic-name --consumer-property group.id=kafka-group-id


[ Comsumer groupID 확인 ]

./bin/kafka-consumer-groups.sh --bootstrap-server 172.123.123.111:9092 --list


[ Comsumer group 삭제 ]

./bin/kafka-consumer-groups.sh --zookeeper 172.123.123.111:2181 --delete --group kafka-group-id


[ Consumer group상태 및 offset 확인 ]

./bin/kafka-consumer-groups.sh --bootstrap-server 172.123.123.111:9020 --group kafka-group-id --describe



설정

[ 로그 보관 주기설정 ]

./bin/kafka-topics.sh --zookeeper 172.123.123.111:2181 --alter --topic kafka-topic-name --config retention.ms=86400000 (초단위 10일)

: 카프카의 디스크 공간이 부족하다면 로그보관주기설정을 짧게하여 retention.ms값을 낮추는 것이다.


[ 로그 보관 주기변경확인 ]

./bin/kafka-topics.sh --zookeeper 172.123.123.111:2181 --describe --topic kafka-topic-name 

2020년 11월 2일 월요일

[Algorithm] 배열합계 ( Simple Array Sum )

 

Simple Array Sum


문제 ]

Sample Input

6
1 2 3 4 10 11

Sample Output

31

Explanation

We print the sum of the array's elements: .



제출 ]

import java.io.*;
import java.math.*;
import java.text.*;
import java.util.*;
import java.util.regex.*;

public class Solution {

    static int simpleArraySum(int[] ar) {
        int sum = 0;
        for(int i=0; i < ar.length; i++){
            sum += ar[i];
        }
        return sum;

    }

    private static final Scanner scanner = new Scanner(System.in);

    public static void main(String[] args) throws IOException {
        BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(System.getenv("OUTPUT_PATH")));

        int arCount = Integer.parseInt(scanner.nextLine().trim());
        int[] ar = new int[arCount];

        String[] arItems = scanner.nextLine().split(" ");

        for (int arItr = 0; arItr < arCount; arItr++) {
            int arItem = Integer.parseInt(arItems[arItr].trim();
            ar[arItr] = arItem;
        }

        int result = simpleArraySum(ar);

        bufferedWriter.write(String.valueOf(result));
        bufferedWriter.newLine();

        bufferedWriter.close();
    }
}



풀이 ]

- scanner.nexLine()으로 배열 길이를 입력받는다. scanner클래스의 next()와 nextLine()의 차이점은 전자는 스페이스로 입력값이 종료되고 후자는 엔터이다. 

- simpleArraySum함수로 입력값만큼 for문을 돌려 합계를 리턴한다.

- bufferWriter.write() 함수는 인자값이 string형이여야 함으로 valeOf를 이용하여 형변환한다. 



HADOOP HDFS [3] = FileSystem Shell 명령어

 

FS Shell은 HDFS와 직접 상용작용할 뿐만 아니라 WebHDFS, AmazonS3 Azure WASB 등 하둡에서 지원하는 다른 파일시스템도 포함되어있다.

FS Shell은 " bin/hadoop fs <args>" 에 의해 호출된다. 

HDFS가 이미 사용되고 있다면 hdfs dfs는 동의어이다.(=hadoop fs)


* 'hadoop fs'와 'hdfs dfs' 명령어차이는 전자는 다른 응용프로그램(WebHDFS, AmazonS3 Azure 등) 에서도 사용하는 포괄적인 명령어고 후자는 HDFS에서만 사용할 수 있는 명령어이다. 



cat

hadoop fs -cat /user/hive/text1.txt

: 해당 파일내용을 stdout(콘솔) 출력한다.


chgrp

hadoop fs -chgrp [-R] 변경할그룹 변경할 파일or디렉토리명

: 디렉토리 또는 파일의 소유 그룹을 변경한다. 옵션 -R은 디렉토리 하위모든 파일까지 변경.


chmod

hadoop fs -chmod 744 hdfs파일or디렉토리먕

: hdfs파일의 권한을 변경한다.


chown

hadoop fs -chown 변경할소유자 hdfs파일or디렉토리명

: 파일 소유권을 변경한다. 옵션 -R은 디렉토리 하위모든 파일까지 변경.


copyFromLocal / moveFromLocal / appendToFile / put

hadoop fs -copyFromLocal <로컬파일경로> <hdfs파일경로>

: 로컬파일을 hdfs파일시스템에 복사/이동 시킨다.


copyToLocal / moveToLocal

hadoop fs -copyFromLocal <hdfs파일경로> <로컬파일경로>

: hdfs파일을 로컬경로에 복사/이동 시킨다.


cp / mv

hadoop fs -copyFromLocal <이동시킬 hdfs파일경로> <이동할 hdfs파일경로>

: hdfs파일을 지정한 hdfs경로에 복사/이동 시킨다.


count

hadoop fs -count [-q] [-h] [-v] [t (storage type) ] [-u] <hdfs경로>

: hdfs경로의 파일 및 바이트 수, 디렉토리수 할당량 등을 출력한다.

  • -q : 할당량 함께 표시
  • -u : 할당량과 사용량 표시
  • -t : -q, -u옵션과 사용되며 저장소 유형에 대한 할당량과 사용량 표시
  • -h : 파일크기를 사람이 읽을 수 있는 형식으로 표시
  • -v : 헤더라인을 표시


createSnapshot

hdfs dfs -createSnapshop <hdsf경로> <snapshot이름>

: hdfs 스냅샷 생성


renameSnapshot

hdfs dfs -createSnapshop <hdsf경로> <old snapshot이름> <new snapshot이름>

: hdfs 스냅샷 파일이름 변경


deleteSnapshot

hdfs dfs -deleteSnapshot<hdsf경로> <snapshot이름>

: hdfs 스냅샷 삭제


* 스냅샷 관련 참조내용 

: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsSnapshots.html


df

hadoop fs -df <hdfs경로>

: 할당가능량(free space)를 표시


du

hadoop fs -du [옵션] <hdfs경로>

: 지정된 경로의 파일 및 디렉토리 길이를 표시

  • -s : 개별파일이 아닌 전체 파일길이의 요약이 표시. 
  • -h : 사람이보기편한 방식으로 표시.
  • -v : 열이름을 헤드라인으로 표시.


find

hadoop fs -find / -name test -print ( hadoop fs -find <path> -name filename <expression> )

: 파일을 찾는다. 


getmerge

hadoop fs -getmerge -nl <hdfs경로> <로컬경로>

: hdfs경로 안에 있는 파일들을 합병한 결과파일을 로컬경로에 생성한다. 


head / tail

hadoop fs -head <hdfs경로>

: 지정된 파일의 앞부분/뒷부분을 stdout출력한다.


ls

hadoop fs -ls <hdfs경로>

: 해당경로의 파일or디렉토리를 조회한다.


mkdir

hadoop fs -mkdir <생성할 디렉토리명>

: 디렉토리를 생성


rm

hadoop fs -rm [옵션] <hdfs파일or디렉토리>

: 파일or디렉토리 삭제

  • - f : 진단메세지 무시
  • - R : 디렉토리와 하위파일모두 삭제


touch

hadoop fs -touch <pathname>

: 지정경로의 파일의 수정시간을 현재시간으로 업데이트. 파일이 없으면 길이0의 파일로 생성


touchz

hadoop fs -touchz <hdfs경로>

: 길이 0의 파일을 생성, 기존 파일이 있다면 오류


truncate

hadoop fs -truncate <파일길이> <hdfs경로>

: 지정된 경로의 지정한 길이로 파일을 잘라낸다.


2020년 10월 27일 화요일

[Algorithm] 배열합계 ( A Very Big Sum )

 A Very Big Sum


문제] 

n this challenge, you are required to calculate and print the sum of the elements in an array, keeping in mind that some of those integers may be quite large.

Function Description

Complete the aVeryBigSum function in the editor below. It must return the sum of all array elements.

aVeryBigSum has the following parameter(s):

  • int ar[n]: an array of integers .

Return

  • long: the sum of all array elements

Input Format

The first line of the input consists of an integer .
The next line contains  space-separated integers contained in the array.

Output Format

Return the integer sum of the elements in the array.

Constraints


Sample Input

5

1000000001 1000000002 1000000003 1000000004 1000000005

Output

5000000015

* 요약 : 문자를 콘솔로 받아서 Long타입 배열에 저장 후 배열인자들의 합을 구한다. 



제출]

import java.io.*;
import java.math.*;
import java.security.*;
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.*;

public class Solution {

    // Complete the aVeryBigSum function below.
    static long aVeryBigSum(long[] ar) {
        long result = 0;
        for(int i=0; i < ar.length; i++) {
            result+=ar[i];
        }
        return result;
    }

    private static final Scanner scanner = new Scanner(System.in);

    public static void main(String[] args) throws IOException {
        BufferedWriter bufferedWriter = 
            new BufferedWriter(new OutputStreamWriter(System.out));

        int arCount = scanner.nextInt();
        scanner.skip("(\r\n|[\n\r\u2028\u2029\u0085])?");

        long[] ar = new long[arCount];

        String[] arItems = scanner.nextLine().split(" ");
        scanner.skip("(\r\n|[\n\r\u2028\u2029\u0085])?");

        for (int i = 0; i < arCount; i++) {
            long arItem = Long.parseLong(arItems[i]);
            ar[i] = arItem;
        }

        long result = aVeryBigSum(ar);

        bufferedWriter.write(String.valueOf(result));
        bufferedWriter.newLine();

        bufferedWriter.close();

        scanner.close();
    }
}



풀이]

- Scanner class : 읽은 바이트를 문자, 정수, 실수, 불린, 문자열 등
다양한 타입으로 변환하여 리턴(java.util.scanner

- System.in : 자바의 표준입력 스트림(키보드)

- scanner.skip("(\r\n|[\n\r\u2028\u2029\u0085])?"); :
여러줄을 입력받아야하는 상황에서 엔터도 한줄로 인식하기에 예외처리코드
예를 들어

"
red green blue
[엔터]
japan korea
"
위의 형식으로 입력이 되고 array1 =[ red, green, blue ], array2 = [ japan, korea]
로 저장하려고 할 때, [엔터] 입력 때문에 array2 = [] 가 될 것이다.
참고 : https://dev-kimse9450.tistory.com/17

- aVeryBigSum 함수에선 for문을 이용해 배열인자의 합계를 구한다.


2020년 10월 25일 일요일

Hive [4] - 파일 병합 (File Merge)

 

파일Merge


너무 많은 파일이나 중복된 파일이 HDFS블록 크기보다 작을경우 네임노드의 메모리를 소진시킨다. Hive의 경우 작은 파일을 많이 만들지 않도록 파일병합을 설정할 수 있다.


1. Merge 설정

SET hive.merge.mapfiles : 맵전용 잡의 작은 파일들을 병합한다.(Default true)

SET hive.merge.mapredfiles : 맵리듀스 잡의 결과들을 병합한다.

SET hive.merge.size.per.task: 작업 종료시 병합된 파일을 크기를 정의한다(Default 256MB)

SET hive.merge.smallfiles.avgsize : 파일 병합을 실행하는 트리거 임계값이다. (Default 16MB)


출력파일의 크기가 hive.merge.smallfiles.avgsize보다 작으면 병합을 실시한다.

작업 평균출력파일크기가 이 숫자보다 작을 때( hive.merge.smallfiles.avgsize ) Hive는 출력파일을 더 큰 파일로 병합하는 작업을 시작한다.



2. Query를 사용

MapReduce 작업 수를 설정하여 최종 출력파일이 항상 1이 되도록 조정한다.

  • SET mapred.reduce.tasks=1


overwrite로 테이블에 삽입하여 작은 수의 파일들을 병합

  • INSERT overwrite table <table_name> SELECT * FROM <table_name>;



참조 : 

https://docs.cloudera.com/documentation/enterprise/5-9-x/topics/introduction_compression_snappy.html

Hive [3] - 파일압축 (File Compression)

 

Hive는 TextFile, SequenceFile, RcFile, OrcFile, ParquetFile포맷을 지원한다. 테이블의 파일형식을 지원하는 방법은 아래와 같다.

1. CREATE TABLE … STORE AS <File_Format>

2. ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT <File_Format>

3. SET hive.default.fileformat=<File_Format> (the default fileformat for table)



[1] File Compress


Hive의 압축은 Full압축이 아닌 중간파일압축(Intermediate compression)을 통해 매퍼와 리듀스 사이의 데이터전송량을 현저하게 줄인다. Hive가 생산한 중간파일을 여러 맵리듀스 작업간에 압축하려면 SET hive.exec.compress를 설정해야한다. (Default는 false)

  1. Deflate - org.apache.hadoop.io.compress.DefaultCodec
  2. GZip - org.apache.hadoop.io.compress.GzipCodec
  3. Bzip2 - org.apache.hadoop.io.compress.BZip2Codec
  4. LZO - com.hadoop.compression.lzo.LzopCodec
  5. LZ4 - org.apache.hadoop.io.compress.Lz4Codec
  6. Snappy - org.apache.hadoop.io.compress.SnappyCodec

중간파일 압축은 다중 맵과 리듀스를 사용하는 특정 job의 디스크 공간만 절약하며 예시는 아래와 같다.

ex)

SET hive.exec.compress.output=true

SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec


set the following properties in mapred-site.xml:

  • For MRv1:
<property>
  <name>mapred.compress.map.output</name>  
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>  
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
  • For YARN:
<property>
  <name>mapreduce.map.output.compress</name>  
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compress.codec</name>  
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>


- ORC or Parquet파일 사용시

1.

SET parquet.compression=GZIP

SET parquet.compression=SNAPPY


2.

create table ...

tblproperties ("orc.compress"="SNAPPY")

tblproperties ("parquet.compress"="SNAPPY")



[2] Hot data


자주 쓰거나 스캔하는 데이터는 hdfs shell명령어로 hot data로 지정할 수 있다. hot data에 대한 데이터 복제팩터를 증가시키면 hive작업에 의해 참조할 가능성이 증가하고 쿼리성능이 향상될 수 있다. 경로가 디렉토리일 경우 디렉토리 아래에 있는 모든 파일의 복제팩터를 증가시킨다.

setrep

Usage: hdfs dfs -setrep [-R] [-w] <numReplicas> <path>

Changes the replication factor of a file. If path is a directory then the command recursively changes the replication factor of all files under the directory tree rooted at path.

Options:

  • The -w flag requests that the command wait for the replication to complete. This can potentially take a very long time.
  • The -R flag is accepted for backwards compatibility. It has no effect.

Example:

  • hdfs dfs -setrep -w 3 /user/hadoop/dir1

Exit Code:

Returns 0 on success and -1 on error.




참조 :

https://timepasstechies.com/hive-tutorial-8-hive-performance-tuning-using-data-file-optimization-using-file-format-compression-storage-optimization/


2020년 10월 22일 목요일

HADOOP HDFS [4] - AWS(EC2) vCPU와 YARN vCORE의 관계

 

1. EC2 vCPU


AWS에서 나와있는 EC2인스턴스별 유형을 살펴보면 이해가 쉽다.

https://aws.amazon.com/ko/ec2/instance-types/

일반 컴퓨팅의 CPU와 같다고 보면 된다.



2. YARN vCORE


YARN이 병렬처리(Thread)를 하기 위해 실제 코어(CPU)를 추상화한 가상코어이다.

Windows OS가 CPU를 하나로 메모리에 응용프로그램을 실행시키는 것과 같이, YARN이 컨테이너를 통한 메모리할당을 할 때 vCORE를 사용한다. 

http://yourHDFSurl:8088에서 설정된 vCORE를 확인하거나 

yarn-site.xml에서 yarn.nodemanager.resource.cpu-vcores 프로퍼티에서 변경할 수 있다. ( Default는 8이다 )




3. vCPU와 vCORE의 관계


YARN의 CORE 수 자체는 CPU에 대해 큰 영향은 주진 않는다. core가 8로 지정되었고 cpu가 4개이면 YARN은 cpu 4개가 사용할 수 있는 최대 컨테이너만 수만을 사용할 뿐이다. 그러나 반대로 cpu갯수가 8개이고 core가 4개이면 자원낭비를 하고 있는 셈이다.

일반적으로 cpu의 갯수에 1~2배정도로 yarn core 수를 설정하지만 cpu의 코어가 하이퍼코어(CPU하나가 멀티스레드 기능을 하는 것)가 아닌 이상 cpu수와 core수는 일치시키는게 좋다. ( Cloudera에서는 Number of physical disks used for Datanode * 2를 권장)

또는 이것 또한 사용하는 응용프로그램에 따라 차이가 있는데 하나의 프로세스로 고정된 자원을 계속 할당하는 Spark의 경우 실제 cpu와 core의 갯수가 같을 수록 최대의 성능을 발휘한다.  

그렇다면 EC2 CPU는 멀티스레드를 몇개까지 사용할 수 있는가? 친절하게도 vCPU = ( cpu * 스레드 수 )이므로 하이퍼코어를 고려할 필요없다.

결론적으로 none-yarn어플리케이션, 어플리케이션에 따른 컨테이너당 사용하는 vcore등 변수를 고려해 yarn vCore설정을 해야한다. 


참고 

: https://docs.aws.amazon.com/ko_kr/AWSEC2/latest/UserGuide/instance-optimize-cpu.html

: https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2/