Kubernetes 위에서 Airflow 사용하기

회사에서 batch scheduler 로 Airflow 를 사용할 일이 있었다.
나는 Kuberentes 를 공부하고자 하는 의도로 겸사겸사 Airflow 를 Kubernetes 위에서 운용하려고 했다.
당시 때마침 Airflow 의 Kubernetes 지원이 시작되고 있어서 삽질을 시작해 보았다.
다행히 구성에 성공하여 약 3개월간 큰 이슈없이 사용하고 있으니 성공이라 해야 하나..

내가 사용한 버전은 1.10.2 버전이다.

구성

공식? 설치 가이드는 여기에 있다.

https://github.com/apache/airflow/tree/master/scripts/ci/kubernetes

kube/deploy.sh 를 실행시키면 되는 구조인데, 스크립트를 열어보면 별로 실행시키고 싶지가 않다..
보통 kubernetes 배포라면 helm 이라든지 툴이 많은데, 이걸 직접 template 만들고 shell script 로 쓰다니.. ㅠㅠ

Airflow Helm Chart 는 여기에 있다. 공식 chart 인지는 모르겠지만..

https://github.com/helm/charts/tree/master/stable/airflow

근데 이건 기존의 Celery 구성을 그대로 Kubernetes 에 올리는 설정이라, 새로 개발된 Kubernetes Executor 를 사용하지 않았다.
Kubernetes native 라면 필요할때 executor 가 올라가서 필요한 만큼만의 resource 를 사용해야 하는데, Celery 가 상주해있는 모양은 k8s native 답지 않아 보였다. 이에 따라 Kubernetes native Airflow 의 Helm chart 를 작성하기에 이르렀다.. (삽질의 시작)

여러 삽질 끝에 Kubernetes native Airflow 를 쓰려면 아래와 같은 구성이 필요함을 깨달았다.

  • DB (MySql or Postgresql 등) - 각종 상태 저장
  • Airflow Webserver - web ui 를 호스팅한다, airflow webserver 명령어로 실행
  • Airflow Scheduler - webserver 등 에서 지시를 받아 실제 job 을 스케쥴링 (실행) 한다, airflow scheduler 명령어로 실행
  • dag 를 저장할 storage
  • 그리고 잘.. 설정된 airflow.cfg 설정파일과 docker image, kubernetes 설정
  • 보너스로 log 를 저장할 s3 저장소

대략적인 구성이 아래와 같이 잡힌다.

  • dag 는 Github 에 저장하고, 필요할때마다 git pull 해온다.
    • airflow pod 안에 git-sync 컨테이너를 두어 주기적으로 git pull 을 하게 하고, 나머지 webserver, scheduler 에서는 dag 를 read only 로 사용한다.
  • airflow 가 KubernetesExecutor 를 사용하게 되면, task 마다 별도의 worker pod 를 실행해서 task 를 실행하게 된다.
    • task 가 KubernetesPodOperator 를 사용해서 구성되어있다면, KubernetesExecutor 는 또 pod 를 생성해서 거기서 작업을 실행한다.
    • task 가 그외의 Operator 를 사용한다면, KubernetesExecutor pod 안에서 해당 작업 (예를 들어 SimpleHttpOperator 등) 이 실행된다.
  • airflow 를 포함하는 docker image 는 webserver, scheduler, worker 가 같은 이미지를 사용하게 된다.
    • 물론 설정을 하다보면 다 다른 이미지를 쓰게 설정이 가능한 구조이지만.. 다르게 할 필요가 있을까?
  • log 의 경우, KuberentesExecutor 가 각각 생겼다가 사라지기 때문에 어딘가 별도로 저장하지 않으면 이력 추적이 불가하다. 다행히 Airflow 는 log backend 로 s3 를 지원한다.
    • 근데 회사에서는 s3 를 쓰기 어려워서 minio 를 따로 띄웠다. minio 의 정보는 여기서 얻어보자. (https://min.io/)
    • 각각의 KubernetesExecutor 가 직접 s3 에 log 를 저장하는건지, 아니면 scheduler 등을 통해서 저장하는지는 잘 모르겠다.

그러면 실제 설정 파일을 어떻게 했는지 하나씩 소개해본다.

docker image

현재 (2019-07) 기준 airflow 는 2.x 가 개발중이고, 1.10.x 에는 막바지에 Kubernetes 관련 기능이 구현 되었다.
그래서 현재 stable 한 1.10.x 브랜치에서는 docker image 부분이 좀 빈약한데, 그냥 master 브랜치에서 docker image 부분을 퍼와서 써봤다.

https://github.com/humbledude/airflow/blob/v1-10-stable/Dockerfile

현재의 master branch 의 Dockerfile 과는 좀 거리가 있지만 그때는 심플하니 좋았다
그리고 entrypoint.sh 도 어디선가 적당히 가져다 썼다.

https://github.com/humbledude/airflow/blob/v1-10-stable/scripts/docker/entrypoint.sh

docker image 는 그냥 airflow 전체 dependency 를 설치한 그런 image 이다.

airflow.cfg

airflow.cfg 는 회사에서 필요에 의해 개발하다 보니, 전용 설정이 많아서 전체를 그대로 소개하긴 어렵고, kubernetes native 로 쓰기위해 설정한 부분을 소개한다.
이 airflow.cfg 는 helm chart 에서 가져다 쓰기 때문에, templating 문법이 좀 있는데, 그러려니 하자

[core]

# executor 를 KubernetesExecutor 로 해야 task 마다 worker pod 를 생성한다
executor = KubernetesExecutor

[kubernetes]

# worker 가 사용할 image 를 지정하는 부분이다. 각자 만든 이미지를 쓰자. 내꺼는 사설 repo 에 있음.
worker_container_repository = {{ .Values.image.worker.repo }}
worker_container_tag = {{ .Values.image.worker.tag }}

# 처음에 debugging 할때 여기를 False 로 해두면, worker pod 가 종료 된 후에 삭제 되지 않아서 log 등을 참조할 수 있다
delete_worker_pods = True

# airflow.cfg 를 configmap 에 등록하고 그걸 가져다 쓰게 한다. 그러면 각종 airflow pod 에서 가져다 쓰기 쉽다.
airflow_configmap = airflow-configmap

# worker (Kubernetes Executor) 가 뜰때, dag 를 어디서 가져오게 할것인지 골라야 하는데,
# 나는 git 에서 직접 땡겨오라고 했다. nfs 등을 쓸 수 있지만 잘 안되었어서..
# 그 worker 가 쓸 git 정보이다.
git_repo = {{ .Values.github.repo }}
git_branch = {{ .Values.github.branch }}
git_user = {{ .Values.github.username }}
git_password = {{ .Values.github.token }}
git_sync_dest = {{ include "github.repo" . }}
git_dags_folder_mount_point = {{ include "config.dir.dags-repo" . }}
git_subpath =

# worker 가 git 을 땡겨올때 git command 를 따로 image 에 넣지 않고, git-sync 라는 image 를 써서 땡겨온다
git_sync_container_repository = k8s.gcr.io/git-sync
git_sync_container_tag = v3.1.1
git_sync_init_container_name = git-sync-clone

k8s 구성 (helm)

k8s 구성 또한 전용 설정이 많아 그대로 소개하긴 어렵다 ㅜㅜ
혹시나 요청이 있다면 정리해볼지도..
기본 틀은 여기를 참고했다.

https://github.com/apache/airflow/tree/v1-10-stable/scripts/ci/kubernetes/kube

shell 로 직접 구현한 템플레이팅을 거의 그대로 helm 으로 가져다 썼다.

ClusterRoleBinding

  • Airflow pod 에서 각종 다른 pod 를 생성하기 위해 권한이 필요하다.
    • cluster-admin 이라는 ClusterRole 에 바인딩 하는데, 이 cluster-admin 은 문서에 의하면 k8s의 super user 란다. 혹시 너무 큰 권한이 부담스럽다면 다른 pod 를 생성할 수 있는 별도의 role 을 구성해야겠다.

Deployment

  • 원본initContainer 에서 tmp/airflow-test-env-init.sh 를 하는 부분이 있는데, 이거는 뭔지 몰라서 지웠다. 아마 airflow initdb 커맨드를 실행할꺼 같은데, 나는 webserver 컨테이너에서 airflow initdb 커맨드를 실행한다.
  • 나는 initContainer 에서 git 에서 dag 를 한번 땡겨줬다.
    initContainers:
    - name: init-git
    image: {{ .Values.image.git.repo }}:{{ .Values.image.git.tag }}
    imagePullPolicy: IfNotPresent
    volumeMounts:
    - name: airflow-dags
    mountPath: /tmp/git
    args:
    - --repo={{ template "github.url" . }}
    - -one-time
  • 그 외에는 동일하게 webserver 와 scheduler 의 container 를 띄운다

ConfigMap

  • 원본은 ConfigMap 에 그냥 airflow.cfg 파일을 통째로 복붙했는데, 나는 별도 파일에서 가져다 쓰도록 했다.
    apiVersion: v1
    kind: ConfigMap
    metadata:
    name: airflow-configmap
    data:
    airflow.cfg: |
    {{ tpl (.Files.Get "airflow.cfg") . | indent 4 }}
  • 그 외에 ui 접근을 위한 ingress 와 db 설정을 위한 secret 이 포함되었다

띄우는게 일이다. 띄우기만 한다면 다들 쓰는건데 동작이야 하지 않겠는가 … 쩜쩜

사용하다보니..

airflow dag 개발이라든지 하는 부분은 다른 블로그에서도 잘 소개하고 있으니, 그냥 사용중에 배웠던 몇몇 이슈를 적어본다.

dag 저장소 구성

처음 airflow 를 접했을때 난감했던 부분이, job (dag) 을 정의한 파일을 업로드 하는 기능이 없다는 거 였다.
dag 을 저장할 디렉토리 위치를 airflow.cfg 에서 지정하고, airflow 가 거기서 알아서 dag 를 찾아 쓰게 되어있다.
내가 맨 처음에 dag 저장소로 사용하려고 했던게 PersistentVolume 이 었는데, 하나의 nfs volume 을 띄워서 거기에 dag 를 sync 하고, webserver, scheduler, worker 에 mount 하려는 의도였다.
하지만 사정상 안정적인 ReadWriteMany volume 을 사용할 수 없었고, mount / umount 중 오류가 자주 발생했었다.

그래서 그냥 pod 하나에 묶인 webserver 와 scheduler 는 하나의 volume 을 공유하게 했고, worker 는 그냥 그때그때 생길때마다 git pull 을 하는 구조로 구성했다.
아마 안정적으로 ReadWriteMany volume 을 사용할 수 있는 환경이라면 dag volume 을 하나로 쓸수 있지 않을까 싶다.

parallelism vs dag_concurrncy vs max_active_runs

k8s cluster 상에서 동시에 돌수 있는 pod 의 숫자를 제어해야 하는 경우가 있다.
이때 비슷한 용어가 세개 있는데 뭘까

https://airflow.apache.org/faq.html

  • parallelism 은 동시에 돌 수 있는 worker 의 갯수이다. airflow.cfg 에서 설정
  • max_active_runs 는 동시에 돌수 있는 DagRuns 의 갯수이다. dag 에서 설정
  • concurrency 는 동시에 돌 수 있는 task instance 의 갯수이다. dag 에서 설정

여기서, KubernetesExecutor 를 사용한다면 task instance 마다 worker 가 생겨서 돌기 때문에, worker 가 사용하는 리소스도 고려 해야 한다.
worker 가 KubernetesPodOperator 를 구동한다면 하나의 pod 가 더 생성되고, 해당 pod 의 리소스 사용은 dag 에서 제한할 수 있는데, worker pod 자체의 리소스 관리 옵션은 따로 없는 듯 하다.
따라서 worker 에서 빡센? 작업을 하지 않도록 해야 하고, 특별히 빡센 작업을 하게 된다면 KubernetesPodOperator 로 따로 지정하고 리소스 관리를 하는게 좋겠다.

기타

  • 2020-10-19 업데이트 : 그동안의 깨달음을 바탕으로 전체 구조 그림, 글 좀 수정..
  • 2020-10-23 업데이트 : 차트 세부사항을 업데이트 해서 글 하나 더 썼어요 (여기)