Airflow 기초 #1 : 용어정리

오늘은 airflow 입문 한번 해봅시다.
공식 문서를 통해 몇가지 입문 방법이 제시되고 있는데요,

  1. 설치에 관한것들 : Quick Start + Installation
  2. 기본기 : Tutorials
  3. 몇가지 상황에 맞는 가이드 : How-to Guides

Airflow 입문에 가장 큰 걸림돌은 바로 설치인듯 합니다..
설치는 .. 다양한 상황이 있는거 같은데 사실 저는 Kubernetes 에서밖에 안써봐서.. (참조)
담에 다른 상황으로도 한번 설치 해보기로 하죠

오늘은 용어정리 한번 해보겠습니다
공식 문서의 Concepts 항목 아래에 있는 항목들인데요.
새로운 기능들이 계속 나오고 있지만, 기본기 중심으로 한번 훑어보겠습니다

(공식 문서, Concepts)
https://airflow.apache.org/docs/apache-airflow/stable/concepts/index.html

DAG

  • 원래 교과서에 나오는거는 Directed Acyclic Graph 이거인데
  • airflow 에서는 하나의 작업 flow 를 DAG 라고 불러요.
  • 개발하다보면, 어느 작업 범위를 하나의 DAG 로 잡아야 하나.. 하는 고민이 시작됩니다 ㅎㅎ 다른 글에서 제 의견을 드려봅죠

참고

Task

  • 가장 작은 작업단위로, (성공 / 실패 / 재시도)가 가능한 최소 단위입니다
  • TaskOperator 또는 Sensor 로 정의 합니다 (TaskFlow 라는것도 있는데, 오늘은 패스)
  • 여러 Task 간의 순서, 조건등을 정의해서 DAG 를 만들어요

참고

Operator

  • 미리 정의된 다양한 Operator 를 가지고 Task 를 만들수 있어요. 사실상 이게 일하는거죠
  • python 코드를 실행시키는 PythonOperator 라든가, k8s 파드를 하나 띄울수 있는 KubernetesPodOperator 라든가, db 조작, aws 조작 등등 별거별거 다있음
  • 그리고 우리도 BaseOperator 를 상속해서 customized operator 를 정의해서 쓸수 있어요

참고

Sensor

  • 특정 조건을 검사해서 해당 조건을 pass 할때까지 끝나지 않는 Task 를 만들수 있어요
  • timeout 을 지정해서 시간 내 pass 가 안되면 fail 시킬수도 있고요
  • 예를 들어, sql 로 테이블에 count 쿼리를 하고, 그 결과가 1 이상일 때 다음 Task 로 넘어간다는 그런 작업이 가능하죠 (1 이상이 아니면, 잠시 기다렸다가 다시 쿼리를 날려요. timeout 날때까지!)
  • 이후에 따라오는 Operator 의 선제 조건을 체크 하는데 쓰입니다

참고

DAG run

  • DAG 가 작업정의서라면, DAG run 은 실제 정의대로 동작하는 instance 라고 보시면됩니다
  • 동시에 최대 몇개의 DAG run 을 돌게 허용할 것인지 셋팅하는 파라미터가 있어요 (max_active_run)

참고

data_interval_start, data_interval_end

  • Airflow 진입 1차 장벽..
  • 이건 예제를 봐야 이해가 됨
1. 매일 오전 0시에 시작해서 로그 처리를 하는 DAG 를 정의 했다고 합시다.
2. 이 DAG 가 20221120시에 시작되었다면,
- 해당 작업 은 2022년 11월 1일 0시 부터 2022년 11월 2일 0시 까지의 로그를 처리 해야겠죠?
3. 그렇다면 아래와 같이 동작 합니다.
- 실제 코드가 도는 시간 = 20221120시 조금 지나서.
- data_interval_start = 20221110
- data_interval_end = 20221120
  • 예전에는 execution_date 라는 변수가 있었어요. 지금의 data_interval_start 랑 같은 값인데요. 근데 이 변수의 이름이 execution date 라서, 실제 코드가 도는 시간이라고 생각되기 쉬운데요.. 위 예제를 보면 그게 아니죠.. ㅎㅎㅎ 그래서 모두가 고통에 빠지는 변수명이었습니다.
  • execution_date 용어는 2.2 에서 DEPRECATED 되었습니다. 만나서 더러웠고 다시는 보지말자!!
  • logical_date 라는 변수도 가끔 출몰하는데요, data_interval_startdata_interval_end 로 정리하는게 가장 깔끔하네요
  • (DEPRECATED)execution_date = logical_date = data_interval_start

참고

template, macro

  • DAG 가 매일 돈다면, 매일 data_interval_start 값이 달라지겠죠. 근데 이 값을 어떤 bash 커맨드의 인자로 넘겨야 한다면?
  • BashOperator 정의 할때, 요렇게 쓸수 있어요 : echo {{ data_interval_start }} (그러면 echo 커맨드 뒤에 날짜가 합쳐져서 실제 task 가 돌게 되는거)
  • 미리 정의된 macro 도 있고, 우리도 custom macro 를 만들어 추가할 수도 있어요.
  • 이거는 jinja template 엔진을 사용합니다. python 에서 주로 사용하는 템플릿 엔진이죠!

참고

Variable

  • DAG 를 정의할때, 외부에 있는 값을 가져올수 있는거에요
  • 웹서버 페이지에서, Admin > Variable 에 정의해두고, DAG 코드에서 가져다 씁니다
  • 저는 docker image 버전이 빈번하게 바뀔때, variable 에 image version 을 적어두고, 코드에서는 이 variable 을 참조해서 kubernetes pod 를 띄우는 식으로 사용하곤 해요 (그렇게 안하면 자꾸 코드를 바꿔서 커밋 해야 해요)

참고

Connection

  • DB 나 외부 서버들에 대한 접속 정보를 저장해 둔담에, DAG 코드 에서 동적으로 가져다 쓸수 있어요
  • 비밀번호 같은거가 나름 마스킹 되어 저장되니까, 코드에 비밀번호 안써도 되니까 좋겠죠?
  • 이거도 웹서버 페이지에서, Admin > Connection 에 들어가서 정의해서 쓰는거에요

참고

제가 사실 뭐 airflow 오래 썼다고도 할수 있지만.. 이렇게 제가 주로 쓰던거 중심으로 훑어보니, 다양하게 쓴거 같지는 않네요
이정도만 알아도 일단 시작하는데는 크게 문제가 없을꺼 같은데요.
airflow 는 첨에 DAG 코드 짜서 잘 도는지 확인하기 까지 번거롭지만, 한번 돌기 시작하면 또 잘돌거든요
비즈니스 로직을 잘 짜는게 훨 중요하긴 한거죠.
물론 돌기만 하면 되는거랑 잘 사용하는거랑은 다르니까..

다음 글에서는 좀 더 제가 겪은 이슈들 중심으로 디테일한 실용적인 부분을 써보겠습니다
그럼 20000