DE- Örnek Proje 1

Emre Evcimen
9 min readMar 23, 2023

Merhabalar, bu yazımda sizlere veri mühendisliği tarafı için hazırladığım örnek bir projeyi anlatacağım.Veri mühendisliği, veri bilimi projelerinin temel bir aşamasıdır. Bu aşama, veri kaynaklarının temizlenmesi, birleştirilmesi, dönüştürülmesi ve depolanması gibi işlemleri içerir. Veri mühendisleri, veri bilimcilerin kullanabileceği yüksek kaliteli, güvenilir ve anlamlı veri setleri oluşturarak, veri biliminin anahtar başarısını sağlarlar. Veri mühendisliği, veri bilimi projelerindeki en zorlu görevlerden biri olabilir. Ancak, uygun bir yöntem ve araç setiyle, veri mühendisliği süreci daha verimli ve etkili hale getirilebilir. Bu yazıda, veri mühendisliği sürecinin nasıl tasarlanacağı ve uygulanacağına dair örnek bir proje sunacağım. Bu projede, API servisinden elde edilen verinin istenilen formata ve güvenirliğe ulaştırarak son kullanıcıya sunulması sürecini ele alacağız.

Proje de gerçekleşen adımlara ve mimariye görsel olarak bir bakalım.

Yukarıdaki görseli yazılı olarak aktaracak olursam bir API servisine istenilen parametreleri ileterek json formatında bir dönüş alıyoruz. Daha sonrasında yazmış olduğumuz python script’i ile gelen bu json formatındaki veriyi pandas kütüphanesini kullanarak DataFrame formatına çeviriyoruz ve bunuda parquet formatında lokalimize kaydediyoruz. Lokalde kaydedilen parquet formatındaki dosyayı daha sonra Hadoop ekosistemindeki veri depoladığımız kısım olan HDFS’e aktarmak için bir linux komutu yazıyoruz. HDFS’e aktarılan veriyi Apache Spark kullanarak son kullanıcının istediği formata göre işliyoruz. İlgili veriyi son kullanıcıya sunmadan önce veri kalitesini test etmek için oluşturduğumuz kurallara uyup uymadığını kontrol ediyoruz. Eğer uygunsa Apache Hive üzerinde bir external table oluşturarak son kullanıcıya sunuyoruz. Hata alması durumunda ise Slack’te belirttiğimiz kanala bir hata bildirimi iletmesini istiyoruz. Genel olarak görseldeki adımları ve proje hedefimizi açıklamış olduk. Gelin isterseniz Airflow Web UI ve Code kısmına bakalım.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from football_api import get_league_data
from airflow.models import Variable

default_args = {
"owner": "emre",
"start_date" : datetime(2023,3,23),
"schedule_interval": "@daily",
"retries": 2,
"retry_delay": timedelta(minutes=1),
"tags": ["data-engineer"]
}

def _get_message() -> str:
return "Data Validation Error!! Please Check Data"

with DAG( dag_id = "de-pipeline",
catchup=False,
default_args=default_args
) as dag:

task_is_api_active = HttpSensor(
task_id="is_api_activate",
http_conn_id= "api_football_conn",
endpoint="/?action=get_teams&league_id={}&APIkey={}" \
.format(Variable.get("league_id"), Variable.get("apifootball_api_key_secret"))
)

task_get_league_data = PythonOperator(
task_id="task_get_league_data",
python_callable=get_league_data,
op_args=[Variable.get("league_id"), Variable.get("apifootball_api_key_secret")]
)

task_move_data_hdfs = BashOperator(
task_id = "task_move_data_hdfs",
bash_command= """
hdfs dfs -mkdir -p /league_data && \
hdfs dfs -put /home/emre/datasets/*.parquet /league_data
"""
)

task_data_processing = SparkSubmitOperator(
task_id = "task_data_processing",
conn_id="my_spark3_conn",
application="/home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/spark/spark-data-processing.py",
verbose=False
)

task_data_validation_ge = SSHOperator(
task_id = "task_data_validation_ge",
ssh_conn_id= "my_ssh_conn",
command= 'source /home/emre/ge-working/bin/activate;'
'cd /home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/validation/;'
'spark-submit --master spark://emre-lenovo:7077 /home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/validation/data_ge_validation.py'

)

task_create_external_table = HiveOperator(
task_id = "task_create_external_table",
hive_cli_conn_id = "my_hive_conn",
hql = """
DROP TABLE IF EXISTS football.football_ds;
CREATE EXTERNAL TABLE football.football_ds(
team_key int,
team_name varchar(500),
team_badge varchar(500),
coach_age int,
coach_country int,
coach_name varchar(500),
player_age int,
player_assists int,
player_birthdate date,
player_blocks int,
player_clearances int,
player_country varchar(500),
player_crosses_total int,
player_dispossesed int,
player_dribble_attempts int,
player_dribble_succ int,
player_duels_total int,
player_duels_won int,
player_fouls_committed int,
player_goals int,
player_goals_conceded int,
player_id int,
player_image varchar(500),
player_injured int,
player_inside_box_saves int,
player_interceptions int,
player_is_captain boolean,
player_key int,
player_key_passes int,
player_match_played int,
player_name varchar(500),
player_number int,
player_passes int,
player_passes_accuracy int,
player_pen_comm int,
player_pen_missed int,
player_pen_scored int,
player_pen_won int,
player_rating int,
player_red_cards int,
player_saves int,
player_shots_total int,
player_substitute_out int,
player_substitutes_on_bench int,
player_tackles int,
player_type varchar(500),
player_woordworks int,
player_yellow_cards int
)
STORED AS ORC
LOCATION '/league_data/football-orc';
""",
trigger_rule = "all_success"
)

task_invalid_data_notification = SlackWebhookOperator(
task_id = "task_invalid_data_notification",
http_conn_id = "slack_conn",
message = _get_message(),
trigger_rule = "all_failed"
)

task_is_api_active >> task_get_league_data >> task_move_data_hdfs >> task_data_processing >> task_data_validation_ge >> [task_create_external_table, task_invalid_data_notification ]

Mevcut DAG’ımız üzerindeki task’ları teker teker inceleyerek ne görev üstlendiklerine bir bakalım.

task_is_api_active = HttpSensor(
task_id="is_api_activate",
http_conn_id= "api_football_conn",
endpoint="/?action=get_teams&league_id={}&APIkey={}" \
.format(Variable.get("league_id"), Variable.get("apifootball_api_key_secret"))
)

İlk task’ımız olan task_is_api_active ile verilerimizi alacağımız API servisinin canlı olup olmadığını kontrol ediyoruz. Bu task’ta önemli olan kısımlar http_conn_id ve format içerisindeki değişkenler. Bunları oluşturmak için Airflow Web UI ekranlarını kullanabilirsiniz.

http_conn_id için yapmış olduğum ayarlar: (Admin → Connection )

league_id ve APIkey deişkenleri için yapmış olduğum ayarlar: (Admin → Variables)

API servisi çalışır durumda ise daha sonrasında ilgili servisten verileri kendi lokalimize çekmek için kullandığımız python script’ini çalıştıracağımız task olan task_get_league_data inceleyelim.

from football_api import get_league_data

task_get_league_data = PythonOperator(
task_id="task_get_league_data",
python_callable=get_league_data,
op_args=[Variable.get("league_id"), Variable.get("apifootball_api_key_secret")]
)

DAG kodunun tamamına baktığınızda veriyi ilgili servisten çekecek herhangi bir kod bulunmuyor. Aslında bu işlemi dags klasörünün altında oluşturduğumuz başka bir python dosyasındaki fonksiyonu çağırarak ilgili task’a tanıtıyoruz. Bu işlemi gerçekleştiren python scriptini de aşağıda paylaşıyorum.

import requests
import pandas as pd
import json


def get_league_data(league_id, API_KEY):
URL= f"""
https://apiv3.apifootball.com/?action=get_teams&league_id={league_id}&APIkey={API_KEY}"""

response = requests.get(URL)
response_json = json.loads(response.text)
df = pd.DataFrame(response_json)
df.to_parquet("/home/emre/datasets/england_non_league_premier.parquet",index=False)

Evet bu işlemlerle birlikte API servisindeki verileri lokalimize almış olduk. Şimdi verilerin bulunduğu parquet dosyasını HDFS’e gönderelim.

 task_move_data_hdfs = BashOperator(
task_id = "task_move_data_hdfs",
bash_command= """
hdfs dfs -mkdir -p /league_data && \
hdfs dfs -put /home/emre/datasets/*.parquet /league_data
"""
)

Evet yukarıda görmüş olduğunuz gibi bu işlemi BashOperator kullanarak gerçekleştiriyoruz. Ben Hadoop’u kendi lokalime kurduğum için Edge sunucu görevide üstleniyor. Eğer sizin başka bir sunucuda bunu çalıştırma gibi durumunuz olursa veriyi o sunucuya çekip bu aşamada da SSHOperator kullanabilirsiniz.

Evet verimizi hadoop ortamına da aktardığımıza göre artık Apache Spark’ın gücünü kullanarak verimizi işleyebiliriz ve son kullanıcının istediği formata dönüştürebiliriz.

task_data_processing = SparkSubmitOperator(
task_id = "task_data_processing",
conn_id="my_spark3_conn",
application="/home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/spark/spark-data-processing.py",
verbose=False
)

Airflow üzerinde Spark Job’ınızı birden çok yöntemle çalıştırabilirsiniz bunun için ilgili linki ziyaret etmenizi tavsiye ederim. Ben SparkSubmitOperator’u kullanarak bu işlemi gerçekleştireceğim.

my_spark3_conn bağlantısı için yaptığım ayarlar: (Admin → Connections)

application kısmında çalışan Spark Script’ini de aşağıda paylaşıyorum.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import IntegerType, BooleanType, DateType


expect_columns = ["team_name", "team_badge", "player_image", "player_name", "player_is_captain",
"player_country", "player_type", "player_birthdate", "coach_name", "couch_country"]

spark = SparkSession \
.builder \
.master("yarn") \
.appName("football-app") \
.getOrCreate()

df = spark \
.read \
.format("parquet") \
.option("header","True") \
.option("inferSchema","True") \
.parquet("/league_data/england_non_league_premier.parquet")

df = df.select("team_key","team_name","team_badge",explode("coaches").alias("coaches_exploded"),"players") \
.select("team_key","team_name","team_badge","coaches_exploded.*",explode("players").alias("players_exploded")) \
.select("team_key","team_name","team_badge","coach_age","coach_country","coach_name","players_exploded.*")


for column in df.columns:
if column not in expect_columns:
df = df.withColumn(f"{column}", col(column).cast(IntegerType()))
elif column == "player_birthdate":
df = df.withColumn(f"{column}", col(column).cast(DateType()))
elif column == "player_is_captain":
df = df.withColumn(f"{column}", col(column).cast(BooleanType()))
else:
pass

df.write.option("header","True").orc("/league_data/football-orc")

Spark Job’ımızın çalışmasıyla birlikte veriyi son kullanıcıya sunabileceğimiz bir formata dönüştürdük. Ancak bazı durumlarda veri kalitesini kontrol etmekte sizden beklenen ve sürece dahil etmeniz gereken bir parça olabilir. Ben bunun için açık kaynaklı bir araç olan Great Expectatitons’u kullandım. Bu araç ile ilgili daha önce detaylı bir yazı yazmıştım. Dilerseniz inceleyebilirsiniz.

task_data_validation_ge = SSHOperator(
task_id = "task_data_validation_ge",
ssh_conn_id= "my_ssh_conn",
command= 'source /home/emre/ge-working/bin/activate;'
'cd /home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/validation/;'
'spark-submit --master spark://emre-lenovo:7077 /home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/validation/data_ge_validation.py'
)

my_ssh_conn bağlantısı için yaptığım ayarlar:

Kod üzerinden oluşturduğumuz kurallara bir göz atalım.

from pyspark.sql import SparkSession
import great_expectations as ge
from great_expectations.dataset import SparkDFDataset

spark = SparkSession \
.builder \
.master("yarn") \
.appName("football-ds-ge-validate") \
.getOrCreate()

df = spark \
.read \
.format("orc") \
.option("header","True") \
.orc("/league_data/football-orc")

context = ge.data_context.DataContext()

ge_df = SparkDFDataset(df)
ge_df.expect_table_column_count_to_equal(value=48)
ge_df.expect_column_values_to_not_be_null("team_key")
ge_df.save_expectation_suite(filepath="great_expectations/uncommitted/validations/emre.json")
results = ge_df.validate()
if dict(results)["success"] and dict(results)["statistics"]["success_percent"] == 100.0:
print("Raw Data Validation valid")
else:
raise ValueError('Raw Data Validation invalid')
ge_df.expect_table_column_count_to_equal(value=48)
ge_df.expect_column_values_to_not_be_null("team_key")

Görmüş olduğunuz gibi 2 kural mevcut. İlk kuralımız kolon sayısının 48 olup olmama durumunu kontrol edecek. İkinci kuralımız ise “team_key” kolonunun da null değer olup olmadığını kontrol edecek. Örnek olarak kolon sayısı 48'den az veya fazla olduğunda ya da team_key kolonunda null değer olduğunda bizim Spark Job’ımız hata verecek. ( → Raw Data Validation invalid) . Peki airflow kısmına baktığınızda command kısmına dikkat etmenizi istiyorum. Burada açıklamam gereken birkaç nokta olacak.

command= 'source /home/emre/ge-working/bin/activate;'
'cd /home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/validation/;'
'spark-submit --master spark://emre-lenovo:7077 /home/emre/Desktop/DE-EmreEvcimen2023/airflow-project/validation/data_ge_validation.py'

Great Expectations kurulumu yaparken sanal ortam içerisinde bir kurulum gerçekleştirmiştim ve ilgili ge(Great Expectations) komutlarını bu sanal ortam haricinde çalıştıramıyorum. Bu yüzden ilk olarak ilgili sanal ortamı aktif ediyorum. Daha sonrasında bir great expectations projesi yarattığınızda great_expectations init komutunu çalıştırdığınızda ilgili dizine great_expectations adında bir klasör oluşacak. Bu klasör veri kontrolü yapacağınız script ile aynı dizinde olmalı aksi halde sanal ortamda bile olsanız kodu çalıştıramaz ve hata alırsınız. Bu yüzden cd komutu ile hem ilgili kodun hem de great_expectations klasörünün olduğu dizine gidiyorum. Daha sonrasında kontrolü yapacak scriptimizi gönül rahatlığıyla çalıştırabiliriz.

Eğer veriler bu kontrolden geçerlerse artık son kullanıcıya sunulabilir. Bunun için ben hedef sistem olarak Apache Hive’ı seçtim.

task_create_external_table = HiveOperator(
task_id = "task_create_external_table",
hive_cli_conn_id = "my_hive_conn",
hql = """
DROP TABLE IF EXISTS football.football_ds;
CREATE EXTERNAL TABLE football.football_ds(
team_key int,
team_name varchar(500),
team_badge varchar(500),
coach_age int,
coach_country int,
coach_name varchar(500),
player_age int,
player_assists int,
player_birthdate date,
player_blocks int,
player_clearances int,
player_country varchar(500),
player_crosses_total int,
player_dispossesed int,
player_dribble_attempts int,
player_dribble_succ int,
player_duels_total int,
player_duels_won int,
player_fouls_committed int,
player_goals int,
player_goals_conceded int,
player_id int,
player_image varchar(500),
player_injured int,
player_inside_box_saves int,
player_interceptions int,
player_is_captain boolean,
player_key int,
player_key_passes int,
player_match_played int,
player_name varchar(500),
player_number int,
player_passes int,
player_passes_accuracy int,
player_pen_comm int,
player_pen_missed int,
player_pen_scored int,
player_pen_won int,
player_rating int,
player_red_cards int,
player_saves int,
player_shots_total int,
player_substitute_out int,
player_substitutes_on_bench int,
player_tackles int,
player_type varchar(500),
player_woordworks int,
player_yellow_cards int
)
STORED AS ORC
LOCATION '/league_data/football-orc';
""",
trigger_rule = "all_success"
)

my_hive_conn bağlantısı için yaptığım ayarlar:

Yukarıda görmüş olduğunuz script Apache Spark ile veriyi işledikten sonra veriyi depoladığım dizin olan (/league_data/football-orc) alana erişerek bir external table oluşturmakta. Hive’da, external table, Hive veritabanı tarafından yönetilmeyen bir veri depolama yerinde bulunan bir tablodur. Yani, veri dosyaları HDFS veya diğer dosya sistemlerinde bulunabilir ve Hive, bu verilere erişmek için harici bir bağlantı noktası sağlar. İlgili tablo’yu sildiğinizde silme işlemi sadece Hive’da gerçekleşir. Veriye HDFS üzerinden hala erişebilir durumda olursunuz.

Eğer veri kalitesi kurallarına uymaz ise bize Slack üzerinden bildirim ileteceğini söylemiştik bunun için de aşağıdaki task’ı kurguluyoruz.

 def _get_message() -> str:
return "Data Validation Error!! Please Check Data"

task_invalid_data_notification = SlackWebhookOperator(
task_id = "task_invalid_data_notification",
http_conn_id = "slack_conn",
message = _get_message(),
trigger_rule = "all_failed"
)

Evet tüm aşamalardan bahsettiğimize göre artık DAG’ımızı çalıştırabiliriz.

DAG başarılı şekilde çalıştı. Şimdi ilgili işlemler yapılmış mı bizzat gidip kontrollerimizi yapalım.

emre@emre-lenovo:~/datasets$ ls -l
total 104
-rw-r--r-- 1 root root 102616 Mar 23 15:47 england_non_league_premier.parquet

İlk olarak veriler lokalime parquet formatında gelecekti. Kontrol ettiğimizde başarılı şekilde geldiğini görüyoruz.

emre@emre-lenovo:~/datasets$ hdfs dfs -ls /league_data
Found 2 items
-rw-r--r-- 1 root supergroup 102616 2023-03-23 15:48 /league_data/england_non_league_premier.parquet
drwxr-xr-x - root supergroup 0 2023-03-23 15:48 /league_data/football-orc
emre@emre-lenovo:~/datasets$ hdfs dfs -ls /league_data/football-orc
Found 2 items
-rw-r--r-- 1 root supergroup 0 2023-03-23 15:48 /league_data/football-orc/_SUCCESS
-rw-r--r-- 1 root supergroup 45933 2023-03-23 15:48 /league_data/football-orc/part-00000-0a90d2ff-8da0-45a5-8a83-0061729e0036-c000.snappy.orc

Verimiz lokalden Hadoop ortamına başarılı şekilde geçmiş ve Spark kodumuz gerekli işlemeyi yaptıktan sonra orc formatındaki veriyi istediğimiz HDFS dizinine göndermiş gözüküyor. Süper 👌 🙏

Son olarak Hive tarafına bakarak verimizi sorgulayım.

emre@emre-lenovo:~/datasets$ beeline -u jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
2023-03-23 15:57:18,202 INFO jdbc.Utils: Supplied authorities: localhost:10000
2023-03-23 15:57:18,203 INFO jdbc.Utils: Resolved authority: localhost:10000
Connected to: Apache Hive (version 2.3.9)
Driver: Hive JDBC (version 2.3.7)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.7 by Apache Hive
0: jdbc:hive2://localhost:10000> use football;
No rows affected (0.113 seconds)
0: jdbc:hive2://localhost:10000> select team_key,team_name,coach_name,player_name from football_ds limit 5;
+-----------+------------+-------------+----------------+
| team_key | team_name | coach_name | player_name |
+-----------+------------+-------------+----------------+
| 2795 | Aveley | K. Rowland | M. Gelashvili |
| 2795 | Aveley | K. Rowland | J. Mochalski |
| 2795 | Aveley | K. Rowland | J. Ring |
| 2795 | Aveley | K. Rowland | S. Sheehan |
| 2795 | Aveley | K. Rowland | G. Winn |
+-----------+------------+-------------+----------------+
5 rows selected (0.212 seconds)
0: jdbc:hive2://localhost:10000>

Evet verilerimizi Hive üzerinde başarılı şekilde sorgulayabildik. Ya hata alsaydı o zaman nasıl bir senaryo olacaktı ona bakalım.

Görmüş olduğunuz gibi hata alması durumunda hive’da tablo oluşturmak yerine bana Slack üzerinden bir bildirim gönderdi.

Umarım sizler için faydalı bir yazı olmuştur. Herhangi bir soru ve öneriniz olması halinde benimle iletişime geçebilirsiniz. Teşekkürler 🙏

--

--