Apache Airflow ile Data Pipeline Oluşturma

Emre Evcimen
7 min readMar 10, 2023

Merhabalar bu yazımda sizlere hazırlamış olduğum bir veri akışını Apache Airflow kullanarak nasıl organize ettiğimi anlatacağım. Öncelikle Apache Airflow nedir ve neden kullanılır ona bakalım.

Apache Airflow: Apache Airflow, veri işlemeyi ve iş akışlarını yönetmeyi kolaylaştıran açık kaynaklı bir araçtır. Burada altını çizmek istediğim bir konu olacak evet açık kaynaklı ancak Astronomer firması bu teknolojiyi sahiplenerek büyümesine katkı sağlıyor ve Enterprise şekilde de bu teknolojiden yararlanmanızı sağlıyor. Hatta Apache Airflow için sertifika eğitimleri ve sınavlarıda mevcut. (https://academy.astronomer.io/ sitesini ziyaret edebilirsiniz.) Airflow, veri işleme ve analitik iş akışlarının yönetimi için bir platfrom sağlar. İş akışları, Airflow’da “DAG”(Directed Acyclic Graph) olarak adlandırılan bir dizi adımdan oluşur. Örneğin benim bu yazıda ele alacağım projenin mimarisine bakarsak bunun tümünü bir DAG olarak düşünebilirsiniz.(İlgili proje mimarisini aşağıda paylaşıyor olacağım.) Airflow bu akışları tasarlamanız için Python programlama dilini kullanmaktadır. Airflow üzerinde birden çok veri kaynağına erişebilen geniş bir ekosisteme sahiptir ve kullanıcıların özelleştirilebilir iş akışları oluşturmasını sağlar.

Apache Airflow kullanımında önemli komponentler ve kavramlar vardır. Gelin isterseniz onlara da kısaca değinelim.

1 → Scheduler: Airflow’un iş yürütme motorudur ve belirli bir zamanlama planı veya kullanıcı tarafından tetiklenen bir iş akışını yürütmekten sorumludur. Airflow’un en önemli parçalarından biridir.

2 → Operators: İş akışının adımlarını yürütmekten sorumludur. Her operatör belirli bir görevi yerine getirir ve belirli bir işlemi gerçekleştirmek için yazılır.(Örn: PythonOperator, BashOperator,PostgresOperator…)

3 → DAGs: İş akışının temel bileşenidir ve adım adım iş akışının işlemlerini tanımlar. DAG’lar farklı operatörleri birbirine bağlamak için kullanılan iş akışı yapısını sağlar.

4 → Variables: Airflow içerisinde kullanılan değişkenlerdir. Bu değişkenler, iş akışında kullanılan verileri saklamak için kullanılır ve farklı işlemlerde kullanılabilir.

5 → Connections: Airflow, farklı kaynaklar ve veritabanları ile etkileşim kurabilmek için connections kullanır. Connections, iş akışında kullanılan verileri okumak veya yazmak için kullanılır.

6 → Hooks: Bağlantılarla etkileşim kurmak için kullanılan arayüzlerdir. Hooks, farklı veri kaynaklarına bağlanmak için kullanılır.

7 → Executors: Airflow, iş akışını yürütmek için kullanılan motorlardır. Executors, iş akışının nasıl yürütüleceğini belirleyen yapıdır. Airflow Sequantial,Local,Celery,Kubernetes vb. birçok farklı executor türü destekler. Siz lokalinizde çalışmak isterseniz Local Executor kurulumu yapmanızı tavsiye ederim. Ancak gerçek hayat senaryolarında özellikle Airflow’un Veri Mühendisi, Veri Bilimci ve DEVOPS ekipleri tarafından kullanımının yoğun olduğu ortamlarda Celery Executor kullanımı daha uygun olacaktır.

Evet gerekli ön bilgilendirmeyi tamamladık isterseniz proje akışına da bir bakalım.

Görmüş olduğunuz akışın tümünü bir DAG olarak düşünebilirsiniz. Kutucuk içerisinde gördüğünüz kısımlarda birer Task’tır. Bu task’lar Operatör’lerin kullanımıyla oluşturulur ve dependency dediğimizde yapı sayesinde de ilgili task’ların belirli akışta çalışmasını sağlarız.

Şimdi DAG’ın bu akışla ne yapması gerektiğini kısaca bir anlatalım.

İlgili DAG ilk önce benim belirttiğim formatta ne kadar .parquet uzantılı dosya varsa hepsini silecek. Daha sonra veriyi alacağım API servisine giderek servisinin ayakta olup olmadığına bakacak. Bunu birden çok parametre ile yapacağı için bu Task’ı TaskGroup oluşturarak DAG’a dahil ediyoruz. Eğer API servisi de sağlıklı bir şekilde response dönüyorsa artık benim bir diğer aşamam devreye girecek ve verilen parametre bilgilerine göre veriyi alıp benim belirttiğim formatta hedef path’e iletecek. Bu işlemde birden çok parametre için yapılacak olduğundan bunuda TaskGroup yaratarak DAG’a dahil edeceğiz.Bu işlem sonucunda bir sürü parquet dosyası ortaya çıkacak. Bu dosyaları Spark ile okuyup birleşitirip hedef database de istediğimiz tabloya yazacağız ama bundan önce hedef tablomuz eğer mevcutta varsa bunu sileceğiz. Özetle DAG’ımız bu işlemleri yapmaktadır.

Gelin şimdi bu akışın Airflow Web UI ve Kod kısımlarına da bir bakalım.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from api_get_data import get_holiday_country
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
"owner": "emre",
"start_date" : datetime(2023,3,8),
"schedule_interval": "@daily",
"retries": 3,
"retry_delay": timedelta(minutes=1),
"tags": ["data-engineer"]
}

with DAG(
dag_id = "medium-data-pipeline",
catchup=False,
default_args=default_args) as dag:

delete_parquet_files = BashOperator(
task_id = "delete_parquet_files",
bash_command= "rm /home/emre/Desktop/DE-EmreEvcimen2023/DE-ETL-Project/data/*.parquet"
)

with TaskGroup("control_api_activate") as control_api_activate:
with TaskGroup("extract_data_holiday") as extract_data_holiday:

task_get_holiday_es=PythonOperator(
task_id="task_get_holiday_country_es",
python_callable=get_holiday_country,
op_args=[Variable.get("calendarific_api_key_secret"),"ES",Variable.get("calendarific_year")]
)

task_get_holiday_us=PythonOperator(
task_id="task_get_holiday_country_us",
python_callable=get_holiday_country,
op_args=[Variable.get("calendarific_api_key_secret"),"US",Variable.get("calendarific_year")]
)

task_get_holiday_tr=PythonOperator(
task_id="task_get_holiday_country_tr",
python_callable=get_holiday_country,
op_args=[Variable.get("calendarific_api_key_secret"),"TR",Variable.get("calendarific_year")]
)

task_get_holiday_de=PythonOperator(
task_id="task_get_holiday_country_de",
python_callable=get_holiday_country,
op_args=[Variable.get("calendarific_api_key_secret"),"DE",Variable.get("calendarific_year")]
)


task_is_api_active_es = HttpSensor(
task_id="is_api_activate_es",
http_conn_id= "api_holidays",
endpoint="holidays/?api_key={}&country={}&year={}" \
.format(Variable.get("calendarific_api_key_secret"),"ES",Variable.get("calendarific_year"))
) >> extract_data_holiday

task_is_api_active_us = HttpSensor(
task_id="is_api_activate_us",
http_conn_id= "api_holidays",
endpoint="holidays/?api_key={}&country={}&year={}" \
.format(Variable.get("calendarific_api_key_secret"),"US",Variable.get("calendarific_year"))
) >> extract_data_holiday

task_is_api_active_tr = HttpSensor(
task_id="is_api_activate_tr",
http_conn_id= "api_holidays",
endpoint="holidays/?api_key={}&country={}&year={}" \
.format(Variable.get("calendarific_api_key_secret"),"TR",Variable.get("calendarific_year"))
) >> extract_data_holiday

task_is_api_active_de = HttpSensor(
task_id="is_api_activate_de",
http_conn_id= "api_holidays",
endpoint="holidays/?api_key={}&country={}&year={}" \
.format(Variable.get("calendarific_api_key_secret"),"DE",Variable.get("calendarific_year"))
) >> extract_data_holiday

drop_table_if_exists = PostgresOperator(
task_id = "drop_table_if_exists",
postgres_conn_id = "dataops6_conn",
sql = """
DROP TABLE IF EXISTS public.country_holiday;
"""
)


data_processing = SparkSubmitOperator(
task_id = "data_processing",
conn_id="my_spark_conn",
application="/home/emre/Desktop/DE-EmreEvcimen2023/DE-ETL-Project/dags/data_processing.py",
verbose= False
)


delete_parquet_files >> control_api_activate >> drop_table_if_exists >> data_processing

Kod kısmında ekstradan eklemek istediğim kısım olacak çünkü extract_data_holiday TaskGroup’unda PythonOperator’ün ilgili api’dan verileri almasını sağlayan fonksiyon get_holiday_country başka bir python dosyasının içerisinde mevcut. İlgili fonksiyonun bulunduğu python dosyasını aşağıda sizlerle paylaşıyorum.

import requests
import pandas as pd
import datetime as dt

def get_holiday_country(API_KEY,COUNTRY,YEAR):

specific_country = ["US","DE","ES"]
destination_path= "/home/emre/Desktop/DE-EmreEvcimen2023/DE-ETL-Project/data"

response =requests.get(f"""
https://calendarific.com/api/v2/holidays?api_key={API_KEY}&country={COUNTRY}&year={YEAR}""")
response_json = response.json()["response"]["holidays"]
df = pd.DataFrame(response_json)
df = df.join(pd.DataFrame(df.pop("country").tolist()) \
.rename(columns={"id":"country_id","name":"country_name"}))
df["type"] = [x[0] for x in df["type"]]
df = df.join(pd.DataFrame(df.pop("date").tolist()))
df = df.join(pd.DataFrame(df.pop("datetime").tolist())).drop("iso", axis=1)
df_new = df["timezone"].apply(pd.Series)
df = pd.concat([df, df_new],axis=1).drop("timezone", axis=1).drop(0, axis=1)
if COUNTRY in specific_country:
new_df = df.explode("states")
new_df = pd.concat([new_df.drop(['states'], axis=1), new_df['states'].apply(pd.Series)
.rename(columns={ "id": "state_id",
"abbrev": "state_abbrev",
"name": "state_name",
"exception": "state_exception",
"iso": "state_iso"})], axis=1) \
.drop(0, axis=1)
object_cols = [col for col in new_df.columns if new_df[col].dtypes=="O"]
for col in object_cols:
new_df[col] = new_df[col].astype("string")

new_df.to_csv(f"{destination_path}/{COUNTRY.lower()}-{YEAR}-holiday.csv", index=False)
new_df.to_parquet(f"{destination_path}/{COUNTRY.lower()}-{YEAR}-holiday.parquet", index=False, engine="fastparquet")
else:
df.to_csv(f"{destination_path}/{COUNTRY.lower()}-{YEAR}-holiday.csv", index=False)
df.to_parquet(f"{destination_path}/{COUNTRY.lower()}-{YEAR}-holiday.parquet", index=False, engine="fastparquet")

Ek olarak ilgili veriler API’den alındığında daha sonrasında bu verileri birleştirdiğim ve hedef tabloya aktarmamı sağlayan task olan data_processing task’ı içerisinde hazırlamış olduğum spark kodunu da aşağıya ekliyorum.

import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import os

findspark.init("/home/emre/spark-3.0.1-bin-hadoop2.7")

path = "/home/emre/Desktop/DE-EmreEvcimen2023/DE-ETL-Project/data/"

parquet_files = [file for file in os.listdir(path) if ".parquet" in file]

spark = SparkSession \
.builder \
.master("local[*]") \
.appName("data-processing-app") \
.config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
.getOrCreate()

schema = StructType([
StructField("name", StringType(), True),
StructField("description", StringType(), True),
StructField("type", StringType(), True),
StructField("primary_type", StringType(), True),
StructField("canonical_url", StringType(), True),
StructField("urlid", StringType(), True),
StructField("locations", StringType(), True),
StructField("states", StringType(), True),
StructField("country_id", StringType(), True),
StructField("country_name", StringType(), True),
StructField("year", IntegerType(), True),
StructField("month", IntegerType(), True),
StructField("day", IntegerType(), True),
StructField("hour", IntegerType(), True),
StructField("minute", FloatType(), True),
StructField("second", FloatType(), True),
StructField("offset", StringType(), True),
StructField("zoneabb", StringType(), True),
StructField("zonedst", FloatType(), True),
StructField("zoneoffset", FloatType(), True),
StructField("zonetotaloffset", FloatType(), True),
StructField("state_abbrev", StringType(), True),
StructField("state_exception", StringType(), True),
StructField("state_id", IntegerType(), True),
StructField("state_iso", StringType(), True),
StructField("state_name", StringType(), True),

])

all_data_df = spark.createDataFrame([], schema)


for file in parquet_files:
df = spark.read.parquet(f"{path}{file}")
for column in [column for column in all_data_df.columns if column not in df.columns]:
df = df.withColumn(column, lit(None))
all_data_df = all_data_df.union(df)


url = "jdbc:postgresql://localhost:5432/dataops6"
properties = {
"user": "postgres",
"password": "*****"
}

all_data_df.write.jdbc(url=url,
table="country_holiday",
mode="overwrite",
properties=properties)

DAG’ın kod kısmında görmüş olacağınız gibi birden çok connection ve variable kullanmışız bunları Web UI üzerinden nasıl tanımladık gelin ona bakalım.

Connection için:

Web UI üzerindeki Admin sekmesini içerisinde bulunan Connections’a tıklayarak ilgili ekrana gidiyoruz. Daha sonrasında “+” işaretine tıklayarak yeni bir bağlantı ekleyebiliyoruz.

HttpSensor içerisindeki connection’ın bağlantı bilgileri:

PostgresOperator içerisindeki connection’ın bağlantı bilgileri:

SparkSubmitOperator içerisindeki connection’ın bağlantı bilgileri:

Variables için:

Web UI üzerindeki Admin sekmesini içerisinde bulunan Variables’a tıklayarak ilgili ekrana gidiyoruz. Daha sonrasında “+” işaretine tıklayarak yeni bir değişken ekleyebiliyoruz.

calendarific_api_key_secret değişkeni için bilgiler:

calendarific_year değişkeni için bilgiler:

Artık DAG’ımızı çalıştırıp çıktısına bakabiliriz.

Dag’ımız başarılı şekilde çalıştı gelin isterseniz adım adım çıktılara bakalım.

emre@emre-lenovo:~/Desktop/DE-EmreEvcimen2023/DE-ETL-Project/data$ ls  *.parquet
de-2023-holiday.parquet es-2023-holiday.parquet tr-2023-holiday.parquet us-2023-holiday.parquet

Evet tam da istediğimiz gibi parquet formatındaki verilerimiz çalıştırdığımız parametrelere göre belirttiğimiz folder’a gelmiş.

Spark kodu’nun çıktısına bakmak içinde Graph sekmesinde ilgili task’a gelerek Log kısmına tıkladığınızda spark loglarını görebilirsiniz.

Spark task’ımızda başarılı şekilde çalışmış gözüküyor. Gelin şimdi DBeaver üzerinden PostgreSQL’de belirttiğimiz tablonun içerisine bakalım.

Tablomuza baktığımız da da verilerin sağlıklı şekilde ve istediğimiz formatta akış sağladığını görebiliriz.

Umarım sizler için yararlı bir yazı olmuştur. Herhangi bir sorunuzu veya önerinizi benimle paylaşabilirsiniz. Okuduğunuz için teşekkürler. Görüşmek üzere :)

--

--