DE- Örnek Proje 2

Emre Evcimen
8 min readApr 5, 2023

Merhabalar daha öncesinde DE- Örnek Proje 1 başlıklı yazımda sizlere veri mühendisliği alanında yapılabilecek örnek bir projeyi anlatmıştım. Bu yazımda da başka bir örnek projeyi ele alacağım. Proje’nin genel tanımını yapmadan önce görsel mimariyi aşağıda paylaşıyorum.

Proje Tanımı: Spark ile lokalimizde bulunan kompleks(Örn: bir kolonda sözlük yapısında birden fazla verinin bulunması vb.) yapıdaki parquet formatındaki veriyi Spark ile birden fazla veri setine ayırarak bu veri setlerini PostgreSQL’e yazmak. Daha sonrasında DBT aracını kullanarak stg(staging) ve rpt(report) katmanları oluşturarak veriyi görselleştirilecek hale getirmek(Veri Modelleme) ve bunu Airflow aracı ile otomasyonunu sağlamak. Son olarak da Metabase üzerinde örnek görselleştirmeler hazırlayacağız.

İlk olarak gelin kısaca veri setlerine bir bakalım.

Not: parq komutunu kullanmak için pip install parquet-cli komutunu çalıştırmalısınız.

tmdb_5000_credits.parquet:

parq tmdb_5000_credits.parquet 
Output:
# Metadata
<pyarrow._parquet.FileMetaData object at 0x7fa9bd123db0>
created_by: parquet-cpp-arrow version 10.0.0
num_columns: 4
num_rows: 4803
num_row_groups: 1
format_version: 2.6
serialized_size: 2763

tmdb_5000_credits.parquet:

 parq tmdb_5000_movies.parquet 
Output:
# Metadata
<pyarrow._parquet.FileMetaData object at 0x7f33dff08720>
created_by: parquet-cpp-arrow version 10.0.0
num_columns: 20
num_rows: 4803
num_row_groups: 1
format_version: 2.6
serialized_size: 11311

tmdb_5000_credits.parquet dosyasındaki baştan ilk 3 satıra baktığımızda dikkatimizi çekmesi gereken kolonlar cast ve crew. Görmüş olduğuz gibi liste içerisinde birden fazla sözlük veri tipinde değer içeriyorlar. Aslında bu iki kolon işlenerek farklı veri setleri haline getirilmesi gerekiyor. Aksi halde görselleştirme ve sorgulama konusunda son kullanıcı için verimsiz bir halde bulunmuş olacaklar. Aynı şekilde tmdb_5000_movies.parquet dosyasındaki kolonlara bakacak olursak yukarıdaki duruma benzer bir çok kolon mevcut olduğunu görebiliriz.

Burada Apache Spark kullanarak ilgili kısımları farklı veri setlerine ayırarak PostgreSQL’de depolayacağız. Spark tarafında bu işlemi gerçekleştirmek için yazdığım kodu aşağıda paylaşıyorum.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer, from_json, to_date
from pyspark.sql.types import ArrayType, StructField, StructType, IntegerType, StringType, FloatType, DoubleType

spark = SparkSession \
.builder \
.master("yarn") \
.appName("tmdb-app") \
.config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
.getOrCreate()

url = "jdbc:postgresql://localhost:5432/tmdb"

properties = {
"user": "postgres",
"password": "*****"
}

write_database_df = []

table_names = ["raw_cast","raw_crew","raw_movies","raw_genres","raw_keywords","raw_production_companies",
"raw_production_countries","raw_spoken_languages"]

df_credits = spark \
.read \
.parquet("file:///home/emre/Desktop/medium_dp/spark/tmdb_5000_credits.parquet")



cast_column_schema = (
ArrayType(
StructType([
StructField("cast_id", IntegerType()),
StructField("character", StringType()),
StructField("credit_id", StringType()),
StructField("gender", IntegerType()),
StructField("id", IntegerType()),
StructField("name", StringType()),
StructField("order", IntegerType())
])
)
)

crew_column_schema = (ArrayType(
StructType([
StructField("credit_id", StringType()),
StructField("department", StringType()),
StructField("gender", IntegerType()),
StructField("id", IntegerType()),
StructField("job", StringType()),
StructField("name", StringType())
])
)
)


df_credits_nested= df_credits \
.withColumn("cast", from_json(col("cast"), cast_column_schema)) \
.withColumn("crew", from_json(col("crew"), crew_column_schema))


cast_df = df_credits_nested \
.select("movie_id", "title", explode_outer("cast")) \
.select("movie_id","title", "col.*")

cast_df = cast_df \
.withColumn("movie_id",col("movie_id").cast(IntegerType()))

write_database_df.append(cast_df)




crew_df = df_credits_nested \
.select("movie_id", "title", explode_outer("crew")) \
.select("movie_id", "title", "col.*")

crew_df = crew_df.withColumn("movie_id", col("movie_id").cast(IntegerType()))

write_database_df.append(crew_df)



df_movies = spark \
.read \
.parquet("file:///home/emre/Desktop/medium_dp/spark/tmdb_5000_movies.parquet")


df_movies_lst = df_movies \
.select("id","title", "budget", "homepage", "original_language", "original_title", \
"overview", "popularity", "release_date", "revenue", "runtime", "status", \
"tagline", "vote_average", "vote_count") \
.withColumnRenamed("id", "movie_id")


movies_df = df_movies_lst.withColumn("movie_id", col("movie_id").cast(IntegerType())) \
.withColumn("budget", col("budget").cast(DoubleType())) \
.withColumn("popularity", col("popularity").cast(FloatType())) \
.withColumn("release_date", to_date("release_date", "yyyy-MM-dd")) \
.withColumn("revenue", col("revenue").cast(DoubleType())) \
.withColumn("runtime", col("runtime").cast(IntegerType())) \
.withColumn("vote_average", col("vote_average").cast(FloatType())) \
.withColumn("vote_count", col("vote_count").cast(IntegerType()))

write_database_df.append(movies_df)



column_schema = (ArrayType(
StructType([
StructField("id", IntegerType()),
StructField("name", StringType()),
])
) )

specific_column = ["genres","keywords","production_companies"]


for cols in specific_column:
specific_clmn_df = df_movies \
.withColumnRenamed("id","movie_id") \
.withColumn("movie_id", col("movie_id").cast(IntegerType())) \
.withColumn(cols, from_json(col(cols), column_schema)) \
.select("movie_id", explode_outer(cols)) \
.select("movie_id","col.*")
globals()[cols + "_df"] = specific_clmn_df
write_database_df.append(globals()[cols + "_df"])


for cols in ["production_countries", "spoken_languages"]:
ps_df_movies_raw = df_movies.select("id",cols).withColumnRenamed("id", "movie_id")
if cols == "production_countries" :
iso_col = "iso_3166_1"
else :
iso_col = "iso_639_1"
schema = (
ArrayType(
StructType([
StructField(iso_col, StringType()),
StructField("name", StringType()),
])
)
)
ps_df_movies_raw = ps_df_movies_raw.withColumn(cols, from_json(cols, schema))
ps_df = ps_df_movies_raw.select("movie_id", explode_outer(cols)).select("movie_id",f"col.{iso_col}","col.name").withColumn("movie_id", col("movie_id").cast(IntegerType()))
globals()[cols + "_df"]= ps_df
write_database_df.append(globals()[cols + "_df"])

for num , tbls in enumerate(write_database_df):
tbls.write.jdbc( url=url,
table=f"{table_names[num]}",
mode="overwrite",
properties=properties
)

Yazmış olduğum Spark kodu yukarıda bahsettiğim veri setlerini okuyarak belirttiğimiz kolonlara işlem uygulayıp birden fazla veri seti oluşturup ortayan çıkan Spark DataFrame’leri sırasıyla PostgreSQL’e yazıyor. Spark kodunu detaylı olarak açıklamayacağım ancak sizin kod kısmında kafanıza takılan veya bir öneriniz olması halinde benimle iletişime geçebilirsiniz. En son PostgreSQL’e yazılacak tabloların isimlerini ve yapılarını aşağıda paylaşıyorum.

Görmüş olduğunuz gibi elimizde 2 adet veri seti varken Spark kodumuzu başarılı şekilde çalıştırdıktan sonra 8 adet tablo ortaya çıkarmış olduk.

Şimdi DBT tarafına geçip orada nasıl işlemler gerçekleştirmişim ona bakalım.

Basit bir DBT Proje yapısı yukarıdaki gibidir. Bu proje özelinde daha çok işlem gerçekleştirdiğim kısımlarıda kırmızı kutucuk içerisinde belirttim. Models dosyasını içerisinde Spark ile PostgreSQL’e ilettiğimiz tabloları okuyup yine PostgreSQL içerisinde dbt_stg ve dbt_rpt şemaları altında görselleştirme ve modellemeye daha uygun hallere getirmek için veri dönüştürme işlemleri gerçekleştirip verileri ilgili şema altına öyle depoluyoruz.

Yukarıda gördüğünüz gibi iki katman içinde farklı dosya path’leri oluşturdum. Gelin iki katmandan da örnek sql ve yml dosyalarına bir bakalım.

dbt_project.yml:


# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'tmdb_dbt'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'tmdb_dbt'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
tmdb_dbt:
# Config indicated by + and applies to all files under models/example/
stg:
+materialized: table
+schema: stg
reports:
+materialized: table
+schema: rpt

stg_movies.sql:

{{
config(
materialized = 'table',
)
}}
select movie_id,title,budget , original_language ,original_title, overview ,popularity ,release_date,
revenue ,runtime, status, tagline ,vote_average ,vote_count
from {{ source('tmdb', 'raw_movies') }}

stg_genres.sql:

{{
config(
materialized = 'table',
)
}}
select movie_id ,coalesce(id,-9999) as id , name
from {{ source('tmdb', 'raw_genres') }}

stg_cast.sql:

{{
config(
materialized = 'table',
)
}}
select movie_id, title, cast_id, "character" ,
coalesce(credit_id,'0000000000') as credit_id ,
gender, id, "name"
from {{source('tmdb','raw_cast')}}

Yukarıda gördüğünüz gibi stg katmanına verileri göndermeden önce null değerlere sahip olan kolonları belirttiğim değerlerle doldurup ilgili katmana öyle iletiyorum. En başta gördüğünüz config içerisinde materialized değerini table olarak belirtmemizin sebebi PostgreSQL içerisinde bu işlemi tablo olarak kaydetmesini istiyoruz. Source kısmında belirttiğimiz değerlerede aslında sources.yml içerisinde bulunan bilgiler ışığında ulaşılıyor. sources.yml dosyasını da aşağıda paylaşıyorum.

sources.yml

version: 2

sources:
- name: tmdb
database: tmdb
schema: public
tables:
- name: raw_cast
- name: raw_crew
- name: raw_movies
- name: raw_genres
- name: raw_keywords
- name: raw_production_companies
- name: raw_production_countries
- name: raw_spoken_languages

rpt_top3rvnByProductionCompany.sql:

{{
config(
materialized = 'table',
)
}}

with rpt_top3rvnByProductionCompany as (
select sm.title ,sm.revenue,
spc.name as production_company,ssl."name" as spoken_language_name,
row_number() over(partition by spc.name order by sm.revenue desc) as rank_
from {{ ref('stg_movies') }} sm
join {{ ref('stg_production_companies') }} spc
on sm.movie_id =spc.movie_id
join {{ ref('stg_spoken_languages') }} ssl on sm.movie_id = ssl.movie_id
)
select * from rpt_top3rvnByProductionCompany
where rank_<=3

Rapor katmanında da stg aşamasında oluşturduğumuz tabloları referans göstererek son kullanıcı tarafından yaygın şekilde görüntülenen raporları bu katmanda tablo olarak oluşturuyoruz.

DBT tarafını da bitererek artık mevcut verimizi görselleştirecek ve anlamlı değerlere ulaşılacak formata getiriyoruz. Ancak bu sürecin belirli aralıklarla sürekli çalışması gerektiğini düşünürsek bunu otomatize etmemiz gerekiyor. İşte burada da Airflow aracını kullanacağız. Bu süreci gerçekleştirmesi için oluşturduğum dag dosyasına aşağıda görebilirsiniz.

Not: Airflow üzerinde DBT Projenizi tetiklemek istiyorsanız pip install airflow-dbt komutunu çalıştırarak ilgili kütüphaneyi ilgili sanal ortamınıza import etmeniz gerekir.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow_dbt.operators.dbt_operator import (
DbtRunOperator,
DbtDocsGenerateOperator
)

default_args = {
"owner": "emre",
"start_date" : datetime(2023,4,4),
"schedule_interval": "@daily",
"retries": 2,
"retry_delay": timedelta(minutes=1),
"tags": ["data-engineer"]
}

with DAG( dag_id = "de-dbt-pipeline",
catchup=False,
default_args=default_args
) as dag:

task_1 = SparkSubmitOperator(
task_id = "extract_data_with_spark",
conn_id="my_spark3_conn",
application="/home/emre/Desktop/medium_dp/spark/extract_data_spark.py"
)

task_2 = DbtRunOperator(
task_id = "run_dbt_task",
profiles_dir= "/home/emre/.dbt/",
dir = "/home/emre/Desktop/medium_dp/tmdb_dbt/"
)

task_3 = DbtDocsGenerateOperator(
task_id = "docs_dbt_task",
profiles_dir= "/home/emre/.dbt/",
dir = "/home/emre/Desktop/medium_dp/tmdb_dbt/"
)

task_1 >> task_2 >> task_3

Evet yukarıda görmüş olduğunuz gibi ilgili DAG’ımız başarılı şekilde çalıştı. Gelin isterseniz tablolar oluşmuş mu bir bakalım.

Süper istediğimiz şemalar ve tablolar oluşturulmuş durumda. 👏🤗 Airflow tarafındaki DAG’a bakacak olursak son task’ta dbt docs komutunu çalıştıracak Operatörü(DbtDocsGenerateOperator) kullanmıştık. Bunun amacı evet biz .yml ve .sql uzantılı dosyaların içine bir şeyler karalıyoruz ama bunu biraz daha insanın anlayabileceği ve evet ya ben bunu yapmak istemiştim her şey yolunda gözüküyor diyebileceği bir Web UI ekranı sağlıyor. Bunu görüntülemek için aşağıdaki komutu çalıştırıyoruz ve o bize otomatik olarak bir Web UI ekranı çıkartıyor.

(dbt-venv) emre@emre-lenovo:~/Desktop/medium_dp/tmdb_dbt$ dbt docs serve

İlgili komutu çalıştırdıktan sonra Web UI bizi karşılıyor. Burada kırmızı okla belirttiğim yere tıklarsanız görsel olarak nasıl bir modelleme yaptığınız daha net olarak göreceksiniz.

Harika işte ortaya çıkan modelimizin görselleşmiş hali burada ilgili tabloya tıklayarak hangi tablolardan beslendiğini ve modelin nasıl bir durumda olduğunu daha net olarak görüyoruz.

Tüm süreci tamamladığımıza göre artık Metabase kısmına geçerek örnek olması için birkaç görselleştirme yapalım.

java -jar metabase.jar

Yukarıdaki komutu çalıştırarak Metabase servisini başlatıyorum.

tmdb üzerindeki tablolara erişmesi için Veritabanı bağlantısını yukarıdaki bilgileri girerek oluşturuyoruz.

Daha sonra ilgili tabloları kullanarak yukarıdaki gibi örnek görselleştirmeleri yaratabiliriz. İlk görselleştirmemizde aksiyon kategorisinde en uzun süreli olan 5 Filmi görüntülüyoruz. İkinci görselleştirmede ise Film Üretici Firması olan 1492 Pictures firmasının en çok gelir elde ettiği ilk 3 filmi Pie formatında gösteriyoruz.

Umarım faydalı bir yazı olmuştur. Geri dönüşleriniz olursa rahatlıkla iletebilirsiniz. Teşekkürler. Görüşmek üzere…👋👋

--

--