2021년 11월 13일 토요일

[ Airflow ] 초기셋팅

예제Airflow실행하기

 https://aldente0630.github.io/data-engineering/2018/06/17/developing-workflows-with-apache-airflow.html


도커로 Airflow 실행하기

https://jybaek.tistory.com/922


Airflow Example 지우는 법

airflow.cfg or docker-compose.yml

load_examples = False


기본 명령어 

[ 목록 ]

airflow dags list

airflow tasks list tutorial

airflow tasks list tutorial --tree


[ 테스트 ]

airflow dags test tutorial '2021-11-08'

airflow tasks test tutorial print_date 2015-06-01


[ 백필 ]

airflow dags backfill tutorial \

    --start-date 2015-06-01 \

    --end-date 2015-06-07



Cloudera Manager [3] - 클러스터 교체시 주의할 점

 

CDH 서버 스펙업시

HDFS시스템을 꺼놓고 ec2유형변경을 하면 

다시 시작시 hdfs가 인식을 못할 경우가 있다.


HDFS가 다시 잡을 수 있게 시스템을 킨 상태에서

EC2종료후 유형변경 작업을 한다.

2021년 11월 7일 일요일

[ Docker ] 기본 명령어


[ 도커 실행 ]

docker run [OPTIONS] IMAGE[:TAG|@DIGEST] [COMMAND] [ARG...]


$ docker run ubuntu:16.04

컨테이너 시작, 해당 컨테이너가 없다면 다운로드 후 컨테이너 생성 후 시작


$ docker run --rm -it ubuntu:16.04 /bin/bash

--rm : 프로세스가 종료되면 컨테이너도 삭제

-it : 터미널 입력을 위한 옵션

/bin/bash : 해당 컨테이너의 Bash Shell 접속


$ docker run -d -p 1234:6397 redis

-d : 백그라운드로 실행

-p : 포트연결, 1234의 호스트 포트를 6379의 컨테이너 포트로 연결

- redis test example 

    $ telnet localhost 1234

    set mykey hello

    +OK

    get mykey

    $2

    hello


$ docker run -d -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true --name mysql mysql:5.7

-e : 컨테이너 내에서 사용할 환경변수 (위 옵션 내용 = pwd없이 root계정 접속 )

--name : 컨테이너 이름 설정



[ 컨테이너 목록 확인 ]

docker ps [OPTIONS]

$ docker ps : 실행 중인 컨테이너 목록

$ docker ps -a : 실행 중 + 실행 종료된 컨테이너 목록


[ 컨테이너 중지 ]

docker stop [OPTIONS] CONTAINER [CONTAINER...]

$ docker stop ${DOCKER_CONTAINER_ID} : 다른 ID랑 중복되지 않는다면 64글자 다 적지 않아도 됨


[ 컨테이너 삭제 ]

docker rm [OPTIONS] CONTAINER [CONTAINER...]

$ docker rm ${CONTAINER_ID_1} ${CONTAINER_ID_2} : 여러 컨테이너 동시 삭제 가능

$ docker rm -v $(docker ps -a -q -f status=exited) : 중지된 컨테이너 전체 삭제


[ 이미지 목록 확인 ]

docker images [OPTIONS] [REPOSITORY[:TAG]] 

$ docker images : 다운로드한 이미지 목록 확인


[ 이미지 다운로드 ]

docker pull [OPTIONS] NAME[:TAG|@DIGEST]

$ docker pull ubuntu:14.04 : 이미지가 중복되면 최신버전으로 재 다운로드


[ 이미지 삭제 ]

docker rmi [OPTIONS] IMAGE [IMAGE...]

$ docker rmi ${IMAGE_ID}


[ 컨테이너 로그 보기 ]

docker logs [OPTIONS] CONTAINER

$ docker logs --tail 10 ${CONTAINER_ID}

$ docker logs -f 10 ${CONTAINER_ID} : 실시간 로그 생성 확인


[ 컨테이너 명령어 실행하기 ]

docker exec [OPTIONS] CONTAINER COMMAND [ARG...]

: 실행 중인 컨테이너에 명령어 실행

$ docker exec -it mysql /bin/bash

$ mysql -uroot

mysql> show databases;


$ docker exec -it mysql mysql -uroot : 바로 mysql command line에 connect


[ 컨테이너 정보 보기 ]

docker inspect CONTAINER_ID


[ 볼륨 옵션 ]

mysql같은 경우 컨테이너가 삭제되면 해당 데이터도 삭제되기 때문에

데이터 볼륨을 컨테이너 외부 저장소( local, S3 등 )에 link연결


# before

$ docker run -d -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true mysql


# after

$ docker run -d -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=true \

-v /my/local/datadir:/var/lib/mysql mysql



[ Compose ]

설정파일을 이용한 도커컴포즈

https://docs.docker.com/compose/


$ curl -L "https://github.com/docker/compose/releases/download/1.9.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

$ chmod +x /usr/local/bin/docker-compose : 권한변경

$ docker-compose version : 버전 확인

$ docker-compose up : 작성된 docker-compose.yml 파일 실행



참조 : https://subicura.com/2017/01/19/docker-guide-for-beginners-2.html

2021년 10월 23일 토요일

Windows에 Docker를 설치하여 Jupyter Lab 실행

 

[ Windows에 Docker설치 ]


# Power Shell을 관리자권한으로 연 후, 명령어

$ dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart

$ dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart


#x64 머신용 최신 WSL2 Linux 커널 업데이트 패키지 설치 

https://wslstorestorage.blob.core.windows.net/wslblob/wsl_update_x64.msi


#버전 설정

$ wsl --set-default-version 2


#아래 URL에서 Windows용 Docker설치

https://www.docker.com/products/docker-desktop


#확인

$ docker version

$ docker ps


참조 : 

https://www.lainyzine.com/ko/article/a-complete-guide-to-how-to-install-docker-desktop-on-windows-10/


[ docker ps 명령어 에러시 ]

error during connect: In the default daemon configuration on Windows, the docker client must be run with elevated privileges to connect


#해당 윈도우에서 가상컴퓨터를 만들 수 있도록 설절

PowerShell에서 

$ Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Hyper-V -All


참조 : https://docs.microsoft.com/ko-kr/virtualization/hyper-v-on-windows/quick-start/enable-hyper-v



[ WSL2에 우분투 설치 ]

https://www.44bits.io/ko/post/wsl2-install-and-basic-usage



[ Jupyter Lab 실행 ]


#설치 및 실행

$ docker create -p 8888:8888 --user root -e GRANT_SUDO=yes --name awsanalytics jupyter/minimal-notebook

$ docker start awsanalytics 

$ docker logs awsanayltics 

... 토큰 값 확인 후 로그인시 사용..


# 웹페이지에서 확인

http://127.0.0.1:8888 주소로 접속해 아래와 같은 화면 확인






















# Windows용 Docker에서도 실행 및 CLI확인










# aws 사용위한 패키지 설치 [ jupyter terminal ]

$ pip install awscli

$ sudo apt update

$ sudo apt install groff -y

$ sudo apt install less -y




 

2021년 10월 21일 목요일

[ AWS S3 ] AWS CLI를 이용한 S3에 데이터 Copy

 

awscli 설치

$ sudo apt update

$ sudo apt install awscli

$ aws --version 


awscli 접속정보 설정

$ aws configure

내보안자격증명에서 AccessKeyID, AccessKeyPWD 입력

리전은 ap-northeast-2

Output type은 Json

확인

$ aws configure list


데이터 카피할 버킷생성

$ aws s3 mb s3://dg-s3-demo


해당 URL에서 샘플데이터 

https://registry.opendata.aws/

$ aws s3 ls s3://nyc-tlc/trip\ data/

$ aws s3 cp s3://nyc-tlc/trip\ data/ s3://dg-s3-demo/ --recursive



AWS CLI에서 S3명령어 참조 : 

https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/cli-services-s3-commands.html



2021년 10월 11일 월요일

[ AWS ] AWS를 통한 Data Lake 디자인 (1)

 

코세라의 "Introduction to Designing Data Lakes on AWS" 강의는

AWS개발자들이 생각하는 좋은 Data Lake 아키텍처를 설명하고 있다.


 

위 그림은 배치레이어, 스피드레이어(실시간)를 둘 다 구성할 수 있으며

키바나를 통해 집계그래프로 시각화한다.



[ Data Lake Related  Store Services ]


S3

실시간이든 배치든 저장, 인덱싱, 프로세싱 등 S3가 중심이 된다.

Data Lake의 저장소로 고려해야 될 사항은 네트워크 성능, 접근시간, 용량, 비용, 보안 등 

여러 요소가 있다. 

AWS S3는 위와 같은 요소들을 가지면서 개발자가 환경을 구축시간을 줄여준다.


S3가 가지는 장점은 아래와 같다. 

  • Scalable( 확장성), no limit data ( 용량제한이 없음)
  • Durable, 데이터내구성을 99% 보장한다.
  • Any Type of Data, 데이터의 다형성


또한 Retrieval Timed에 따라 여러 S3의 제품을 선택하여 사용할 수 있다.

즉 자주 접근하는 데이터 일수록 더 비싼 S3를 사용해야 한다.



GLUE

S3과 함께 자주 사용되는 AWS Glue는 

Serverless한 데이터처리 및 카탈로깅 서비스이며, 중앙 메타데이터 저장소이다.

도서관으로 예를 들면, 도서관은 수많은 책(Data)들에게 장르를 부여(cataloging)하여

원하고자 하는 책을 빨리 찾을 수 있게한다.

더욱이 Glue는 다양한 Data소스에서 데이터를 받아올 수 있다.

참조

S3 :

Glue : 



[ Data Movement ]

참조 : https://aws.amazon.com/big-data/datalakes-and-analytics/https://aws.amazon.com/big-data/datalakes-and-analytics/what-is-a-data-lake/


API GateWay

AWS API Gateway는 보안 API를 쉽게생성하며, 유지 관리를 쉽게 지원하는 서비스이다.

자체 백엔드에서 구현해도 되지만 트래픽관리, 권한 및 제어, 모니터링들을 더 용이하게 해준다. 

API를 통해 이동되는 데이터들은 위의 이미지와 같이 EC2, 람다, 키네시스로 전달 할 수 있다.

API가 아니라 바로 파일로 저장되는 경우, Web Server-> Kinesis-> S3로 구성할 수 있다.


Kinesis

실시간 스트리밍 데이터를 쉽게 수집, 처리, 분석할 수 있도록 지원하는 서비스이다.

소규모든 대규모든 데이터가 도착하는대로 처리하고 분석을 즉시 할 수 있다.

데이터 유형과 목적에 따라 다양한 Kinesis 제품을 이용할 수 있다.

  • Kinesis Video Streams
  • Kinesis Data Streams
  • Kinesis Data Firehose

참조 : https://aws.amazon.com/kinesis/  



[ Data Processing ]

데이터를 이동시켰으면 데이터처리를 해야하는데 

Apache Hadoop을 이용한 분산처리시스템을 이용한다.

https://aws.amazon.com/emr/details/hadoop/what-is-hadoop/


EMR

EMR은 Hadoop을 EC2에 쉽게 추가할 수 있는 환경을 만들고,

분산처리에 필요한 어플리케이션( Hbase, Hive, Spark, Phoenix 등 )들을 설치 할 수 있다.

EMR을 사용하지 않고 EC2자체에 Ambari나 CDH(클라우데라 매니저)를 설치해 

Hadoop을 사용할 경우 비용적으로 이득이 있지만, 

개인적인 경험으로 운영시에 EMR이 인적자원이 훨씬 덜 소모되었다. 

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html  


EMR을 Serverless한 아키텍처로 Glue(+Lambda)를 사용할 수도 있다.

Glue Jobs : https://docs.aws.amazon.com/glue/latest/dg/author-job.html

Lambda : https://docs.aws.amazon.com/lambda/latest/dg/welcome.html


Athena Redshift

Sql을 이용하여 작업 후 저장할 때는 athena나 redshift는 좋은 선택지가 된다

참조

Athena : https://docs.aws.amazon.com/athena/latest/ug/what-is.html

RedShift : https://aws.amazon.com/redshift/?whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc  







sdf

2021년 9월 30일 목요일

[ Algorithm ] 기능개발 - Queue


[ 문제 ]

프로그래머스 팀에서는 기능 개선 작업을 수행 중입니다. 각 기능은 진도가 100%일 때 서비스에 반영할 수 있습니다.

또, 각 기능의 개발속도는 모두 다르기 때문에 뒤에 있는 기능이 앞에 있는 기능보다 먼저 개발될 수 있고, 이때 뒤에 있는 기능은 앞에 있는 기능이 배포될 때 함께 배포됩니다.

먼저 배포되어야 하는 순서대로 작업의 진도가 적힌 정수 배열 progresses와 각 작업의 개발 속도가 적힌 정수 배열 speeds가 주어질 때 각 배포마다 몇 개의 기능이 배포되는지를 return 하도록 solution 함수를 완성하세요.


제한 사항

작업의 개수(progresses, speeds배열의 길이)는 100개 이하입니다.

작업 진도는 100 미만의 자연수입니다.

작업 속도는 100 이하의 자연수입니다.

배포는 하루에 한 번만 할 수 있으며, 하루의 끝에 이루어진다고 가정합니다. 예를 들어 진도율이 95%인 작업의 개발 속도가 하루에 4%라면 배포는 2일 뒤에 이루어집니다.


입출력 예

progressesspeedsreturn
[93, 30, 55][1, 30, 5][2, 1]
[95, 90, 99, 99, 80, 99][1, 1, 1, 1, 1, 1][1, 3, 2]




[ 풀이 ]

큐를 이용하여 푸는 문제지만 그렇지 않은 코드도 첨부했다.

progresses의 각 배열의 완료일을 따로 저장한다음

Ex) [ 93, 30, 55 ] -> [ 7, 3, 9 ]

특정 배열 인덱스 값이 그 뒤의 배열 값과 비교, 큰 값이 나올 때까지 반복문을 돈다.

Ex) [ 7, 3, 9 ]  -> 7은 9보다 작으므로 3까지 반복문을 돈다.

7과 3, 2개의 값이 결과 값으로 2를 리턴배열에 input한다.





[ 제출1 ]

import java.util.*;

class Solution {
    public int[] solution(int[] progresses, int[] speeds) {
        int[] temp = new int[100];
        int day = -1;
        for(int i = 0; i < progresses.length; i++){
            while(progresses[i] + (speeds[i] * day) < 100){
                day++;
            }
            temp[day]++;
        }

        int cnt = 0;
        
        for(int n : temp){
            if(n != 0) cnt++;
        }
        
        int[] answer = new int[cnt];
        
        cnt = 0;
        for(int n : temp){
            if(n != 0) answer[cnt++] = n;
        }
        return answer;
    }
}


[ 제출2 - 람다식이용 ]

int[] dayOfend = new int[100];
int day = -1;

for(int i=0; i<progresses.length; i++) {
    while(progresses[i] + (day*speeds[i]) < 100) {
        day++;
    }
    dayOfend[day]++;
}

return Arrays.stream(dayOfend).filter(i -> i!=0).toArray();


제출1
테스트 1 통과 (0.02ms, 77.4MB)
테스트 2 통과 (0.03ms, 76.1MB)
테스트 3 통과 (0.02ms, 76.7MB)
테스트 4 통과 (0.03ms, 76.3MB)
테스트 5 통과 (0.02ms, 73.3MB)

제출2
테스트 1 통과 (3.39ms, 74.2MB)
테스트 2 통과 (3.16ms, 77.1MB)
테스트 3 통과 (2.88ms, 78.8MB)
테스트 4 통과 (3.20ms, 84.3MB)
테스트 5 통과 (1.92ms, 75.2MB)


람다식을 이용할 경우 코드가 간결하나 처리속도가 느림



[ 제출3 - 큐를 이용 ]

List<Integer> answerList = new ArrayList<>();

for (int i = 0; i < speeds.length; i++) {
    
    //종료시점 예측 = 소수점반올림((100 - 진행작업갯수) / 진행속도 )
    double remain = (100 - progresses[i]) / (double) speeds[i];
    int date = (int) Math.ceil(remain);

    if (!q.isEmpty() && q.peek() < date) {
        answerList.add(q.size());
        q.clear();
    }

    q.offer(date);
}

answerList.add(q.size());

int[] answer = new int[answerList.size()];

for (int i = 0; i < answer.length; i++) {
    answer[i] = answerList.get(i);
}

return answer;ys.stream(dayOfend).filter(i -> i!=0).toArray();