Kubernetes 위에서 Airflow 사용하기
회사에서 batch scheduler 로 Airflow 를 사용할 일이 있었다.
나는 Kuberentes 를 공부하고자 하는 의도로 겸사겸사 Airflow 를 Kubernetes 위에서 운용하려고 했다.
당시 때마침 Airflow 의 Kubernetes 지원이 시작되고 있어서 삽질을 시작해 보았다.
다행히 구성에 성공하여 약 3개월간 큰 이슈없이 사용하고 있으니 성공이라 해야 하나..
내가 사용한 버전은 1.10.2 버전이다.
구성
공식? 설치 가이드는 여기에 있다.
https://github.com/apache/airflow/tree/master/scripts/ci/kubernetes
kube/deploy.sh 를 실행시키면 되는 구조인데, 스크립트를 열어보면 별로 실행시키고 싶지가 않다..
보통 kubernetes 배포라면 helm 이라든지 툴이 많은데, 이걸 직접 template 만들고 shell script 로 쓰다니.. ㅠㅠ
Airflow Helm Chart 는 여기에 있다. 공식 chart 인지는 모르겠지만..
https://github.com/helm/charts/tree/master/stable/airflow
근데 이건 기존의 Celery 구성을 그대로 Kubernetes 에 올리는 설정이라, 새로 개발된 Kubernetes Executor 를 사용하지 않았다.
Kubernetes native 라면 필요할때 executor 가 올라가서 필요한 만큼만의 resource 를 사용해야 하는데, Celery 가 상주해있는 모양은 k8s native 답지 않아 보였다. 이에 따라 Kubernetes native Airflow 의 Helm chart 를 작성하기에 이르렀다.. (삽질의 시작)
여러 삽질 끝에 Kubernetes native Airflow 를 쓰려면 아래와 같은 구성이 필요함을 깨달았다.
- DB (MySql or Postgresql 등) - 각종 상태 저장
- Airflow Webserver - web ui 를 호스팅한다,
airflow webserver
명령어로 실행 - Airflow Scheduler - webserver 등 에서 지시를 받아 실제 job 을 스케쥴링 (실행) 한다,
airflow scheduler
명령어로 실행 - dag 를 저장할 storage
- 그리고 잘.. 설정된
airflow.cfg
설정파일과 docker image, kubernetes 설정 - 보너스로 log 를 저장할 s3 저장소
대략적인 구성이 아래와 같이 잡힌다.

- dag 는 Github 에 저장하고, 필요할때마다 git pull 해온다.
- airflow pod 안에 git-sync 컨테이너를 두어 주기적으로 git pull 을 하게 하고, 나머지 webserver, scheduler 에서는 dag 를 read only 로 사용한다.
- airflow 가
KubernetesExecutor
를 사용하게 되면, task 마다 별도의 worker pod 를 실행해서 task 를 실행하게 된다.- task 가
KubernetesPodOperator
를 사용해서 구성되어있다면,KubernetesExecutor
는 또 pod 를 생성해서 거기서 작업을 실행한다. - task 가 그외의 Operator 를 사용한다면,
KubernetesExecutor
pod 안에서 해당 작업 (예를 들어SimpleHttpOperator
등) 이 실행된다.
- task 가
- airflow 를 포함하는 docker image 는 webserver, scheduler, worker 가 같은 이미지를 사용하게 된다.
- 물론 설정을 하다보면 다 다른 이미지를 쓰게 설정이 가능한 구조이지만.. 다르게 할 필요가 있을까?
- log 의 경우,
KuberentesExecutor
가 각각 생겼다가 사라지기 때문에 어딘가 별도로 저장하지 않으면 이력 추적이 불가하다. 다행히 Airflow 는 log backend 로 s3 를 지원한다.- 근데 회사에서는 s3 를 쓰기 어려워서 minio 를 따로 띄웠다. minio 의 정보는 여기서 얻어보자. (https://min.io/)
- 각각의
KubernetesExecutor
가 직접 s3 에 log 를 저장하는건지, 아니면 scheduler 등을 통해서 저장하는지는 잘 모르겠다.
그러면 실제 설정 파일을 어떻게 했는지 하나씩 소개해본다.
docker image
현재 (2019-07) 기준 airflow 는 2.x 가 개발중이고, 1.10.x 에는 막바지에 Kubernetes 관련 기능이 구현 되었다.
그래서 현재 stable 한 1.10.x 브랜치에서는 docker image 부분이 좀 빈약한데, 그냥 master 브랜치에서 docker image 부분을 퍼와서 써봤다.
https://github.com/humbledude/airflow/blob/v1-10-stable/Dockerfile
현재의 master branch 의 Dockerfile 과는 좀 거리가 있지만 그때는 심플하니 좋았다
그리고 entrypoint.sh 도 어디선가 적당히 가져다 썼다.
https://github.com/humbledude/airflow/blob/v1-10-stable/scripts/docker/entrypoint.sh
docker image 는 그냥 airflow 전체 dependency 를 설치한 그런 image 이다.
airflow.cfg
airflow.cfg 는 회사에서 필요에 의해 개발하다 보니, 전용 설정이 많아서 전체를 그대로 소개하긴 어렵고, kubernetes native 로 쓰기위해 설정한 부분을 소개한다.
이 airflow.cfg 는 helm chart 에서 가져다 쓰기 때문에, templating 문법이 좀 있는데, 그러려니 하자
[core] |
k8s 구성 (helm)
k8s 구성 또한 전용 설정이 많아 그대로 소개하긴 어렵다 ㅜㅜ
혹시나 요청이 있다면 정리해볼지도..
기본 틀은 여기를 참고했다.
https://github.com/apache/airflow/tree/v1-10-stable/scripts/ci/kubernetes/kube
shell 로 직접 구현한 템플레이팅을 거의 그대로 helm 으로 가져다 썼다.
ClusterRoleBinding
- Airflow pod 에서 각종 다른 pod 를 생성하기 위해 권한이 필요하다.
cluster-admin
이라는 ClusterRole 에 바인딩 하는데, 이cluster-admin
은 문서에 의하면 k8s의 super user 란다. 혹시 너무 큰 권한이 부담스럽다면 다른 pod 를 생성할 수 있는 별도의 role 을 구성해야겠다.
Deployment
- 원본은
initContainer
에서tmp/airflow-test-env-init.sh
를 하는 부분이 있는데, 이거는 뭔지 몰라서 지웠다. 아마airflow initdb
커맨드를 실행할꺼 같은데, 나는 webserver 컨테이너에서airflow initdb
커맨드를 실행한다. - 나는
initContainer
에서 git 에서 dag 를 한번 땡겨줬다.initContainers:
- name: init-git
image: {{ .Values.image.git.repo }}:{{ .Values.image.git.tag }}
imagePullPolicy: IfNotPresent
volumeMounts:
- name: airflow-dags
mountPath: /tmp/git
args:
- --repo={{ template "github.url" . }}
- -one-time - 그 외에는 동일하게 webserver 와 scheduler 의 container 를 띄운다
ConfigMap
- 원본은 ConfigMap 에 그냥 airflow.cfg 파일을 통째로 복붙했는데, 나는 별도 파일에서 가져다 쓰도록 했다.
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-configmap
data:
airflow.cfg: |
{{ tpl (.Files.Get "airflow.cfg") . | indent 4 }} - 그 외에 ui 접근을 위한 ingress 와 db 설정을 위한 secret 이 포함되었다
띄우는게 일이다. 띄우기만 한다면 다들 쓰는건데 동작이야 하지 않겠는가 … 쩜쩜
사용하다보니..
airflow dag 개발이라든지 하는 부분은 다른 블로그에서도 잘 소개하고 있으니, 그냥 사용중에 배웠던 몇몇 이슈를 적어본다.
dag 저장소 구성
처음 airflow 를 접했을때 난감했던 부분이, job (dag) 을 정의한 파일을 업로드 하는 기능이 없다는 거 였다.
dag 을 저장할 디렉토리 위치를 airflow.cfg 에서 지정하고, airflow 가 거기서 알아서 dag 를 찾아 쓰게 되어있다.
내가 맨 처음에 dag 저장소로 사용하려고 했던게 PersistentVolume
이 었는데, 하나의 nfs volume 을 띄워서 거기에 dag 를 sync 하고, webserver, scheduler, worker 에 mount 하려는 의도였다.
하지만 사정상 안정적인 ReadWriteMany
volume 을 사용할 수 없었고, mount / umount 중 오류가 자주 발생했었다.
그래서 그냥 pod 하나에 묶인 webserver 와 scheduler 는 하나의 volume 을 공유하게 했고, worker 는 그냥 그때그때 생길때마다 git pull 을 하는 구조로 구성했다.
아마 안정적으로 ReadWriteMany
volume 을 사용할 수 있는 환경이라면 dag volume 을 하나로 쓸수 있지 않을까 싶다.
parallelism vs dag_concurrncy vs max_active_runs
k8s cluster 상에서 동시에 돌수 있는 pod 의 숫자를 제어해야 하는 경우가 있다.
이때 비슷한 용어가 세개 있는데 뭘까
https://airflow.apache.org/faq.html
parallelism
은 동시에 돌 수 있는 worker 의 갯수이다. airflow.cfg 에서 설정max_active_runs
는 동시에 돌수 있는 DagRuns 의 갯수이다. dag 에서 설정concurrency
는 동시에 돌 수 있는 task instance 의 갯수이다. dag 에서 설정
여기서, KubernetesExecutor
를 사용한다면 task instance 마다 worker 가 생겨서 돌기 때문에, worker 가 사용하는 리소스도 고려 해야 한다.
worker 가 KubernetesPodOperator
를 구동한다면 하나의 pod 가 더 생성되고, 해당 pod 의 리소스 사용은 dag 에서 제한할 수 있는데, worker pod 자체의 리소스 관리 옵션은 따로 없는 듯 하다.
따라서 worker 에서 빡센? 작업을 하지 않도록 해야 하고, 특별히 빡센 작업을 하게 된다면 KubernetesPodOperator
로 따로 지정하고 리소스 관리를 하는게 좋겠다.
기타
- 2020-10-19 업데이트 : 그동안의 깨달음을 바탕으로 전체 구조 그림, 글 좀 수정..
- 2020-10-23 업데이트 : 차트 세부사항을 업데이트 해서 글 하나 더 썼어요 (여기)