Great Expectations ile Veri Kalitesini Arttırma

Emre Evcimen
Bentego Teknoloji
Published in
9 min readMar 14, 2023

--

Merhabalar bu yazımda sizlere Great Expectations aracından bahsedeceğim. Veriyi kaynak sistemden alarak hedef sisteme istenilen formatta aktarmak çok önemlidir. Bunun önemli olduğu kadar başka bir önemli konu ise gelen verinin doğruluğu ve kalitesidir. Veriler istenilen formatta, doğrulukta ve kalitede olursa bu verilerle ortaya çıkan ML Modelleri, Raporlar vb. işler veri sahibini daha doğru adımlarla aksiyon almaya götürecektir. Bu yazıda bahsedeğimiz Great Expectations aracı ile verilerin doğruluğunu ve kalitesini nasıl arttırabiliriz ona bakacağız ama gelin ilk önce Great Expectations aracının tanımına ve genel işlevine bir bakalım.

Great Expectations veri doğrulama aracıdır ve veri kalitesini kontrol etmek için kullanılır. Bu araç sayesinde verilerin doğru ve tutarlı olup olmadığı kontrol edilerek veri güvenilirliğini arttırabiliriz. Ek olarak bu süreçleri otomatikleştirerek ve hızlı bir şekilde çalışmasına olanak tanımış oluruz.

Great Expectations’ı kullabilmek için gereken adımlar aşağıdaki gibidir:

→ Python’da sanal ortam(virtual environment) oluşturmak (Opsiyonel bir seçenektir ancak önerilen bu adımın gerçekleştirilmesidir. )

emre@emre-lenovo:~$ python3 -m venv ge-working
emre@emre-lenovo:~$ source ge-working/bin/activate
(ge-working) emre@emre-lenovo:~$

→ pip install great_expectation diyerek great_expectations paketini indirmek

(ge-working) emre@emre-lenovo:~$ pip install great_expectations
(ge-working) emre@emre-lenovo:~$ pip show great_expectations

Output:
Name: great-expectations
Version: 0.16.0
Summary: Always know what to expect from your data.
Home-page: https://github.com/great-expectations/great_expectations
Author: The Great Expectations Team
Author-email: team@greatexpectations.io
License: Apache-2.0
Location: /home/emre/ge-working/lib/python3.8/site-packages
Requires: colorama, Ipython, nbformat, marshmallow, tqdm, scipy, ruamel.yaml, pandas, altair, jsonschema, requests, typing-extensions, jinja2, pytz, packaging, tzlocal, notebook, makefun, urllib3, cryptography, Click, mistune, ipywidgets, jsonpatch, importlib-metadata, numpy, python-dateutil, pyparsing, pydantic
Required-by:

→ great_expectations — version diyerek kurulumu kontrol edebiliriz.

(ge-working) emre@emre-lenovo:~$ great_expectations --version
great_expectations, version 0.16.0

Yukarıdaki adımlarla birlikte Great Expectations aracının kurulumunu tamamlamış olduk. Şimdi ise veri kalitesini ve doğruluğunu kontrol edeceğimiz bir kaynak oluşturalım. Bunun için ben PostgreSQL’i kullanacağım.

(ge-working) emre@emre-lenovo:~$ psql -U postgres -W
Password:
psql (12.14 (Ubuntu 12.14-0ubuntu0.20.04.1))
Type "help" for help.

postgres=# CREATE DATABASE ge_medium;
CREATE DATABASE
postgres=# \c ge_medium;
Password:
You are now connected to database "ge_medium" as user "postgres".
ge_medium=# CREATE SCHEMA cst;
CREATE SCHEMA
ge_medium=# CREATE TABLE cst.customer_info(
ge_medium(# customer_id serial primary key,
ge_medium(# customer_name varchar(50),
ge_medium(# customer_surname varchar(50),
ge_medium(# customer_city_id int,
ge_medium(# customer_is_active char(1),
ge_medium(# customer_created_date date default current_date);
CREATE TABLE
ge_medium=# insert into cst.customer_info(customer_name,customer_surname,customer_city_id,customer_is_active)
ge_medium-# values ('emre','evcimen','34','e'),('emin','yurdakul','47','h'),('ayşe','dal','38','e'),
ge_medium-# ('ayberk','erdoğan',7,'h'),('irem','er',28,'e');
INSERT 0 5
ge_medium=# select * from cst.customer_info;
customer_id | customer_name | customer_surname | customer_city_id | customer_is_active | customer_created_date
-------------+---------------+------------------+------------------+--------------------+-----------------------
1 | emre | evcimen | 34 | e | 2023-03-14
2 | emin | yurdakul | 47 | h | 2023-03-14
3 | ayşe | dal | 38 | e | 2023-03-14
4 | ayberk | erdoğan | 7 | h | 2023-03-14
5 | irem | er | 28 | e | 2023-03-14
(5 rows)

Evet kaynak tablomuzu oluşturduk ve içine birkaç veri ekledik. Tüm hazırlıklar tamamsa Great Expectations aracını kullanarak veri kalitesini kontrol edelim. İlk önce yapmamız gereken projeyi yöneteceğimiz dizine gidip aşağıdaki komutu çalıştırmak olacak.

(ge-working) emre@emre-lenovo:~/Desktop/DE-EmreEvcimen2023/ge-medium$ great_expectations init
Using v3 (Batch Request) API

___ _ ___ _ _ _
/ __|_ _ ___ __ _| |_ | __|_ ___ __ ___ __| |_ __ _| |_(_)___ _ _ ___
| (_ | '_/ -_) _` | _| | _|\ \ / '_ \/ -_) _| _/ _` | _| / _ \ ' \(_-<
\___|_| \___\__,_|\__| |___/_\_\ .__/\___\__|\__\__,_|\__|_\___/_||_/__/
|_|
~ Always know what to expect from your data ~

Let's create a new Data Context to hold your project configuration.

Great Expectations will create a new directory with the following structure:

great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
|-- config_variables.yml
|-- data_docs
|-- validations

OK to proceed? [Y/n]: Y

great_expectations init diyerek gelen soruya da “Y” diyerek araç kullanımı için ilk adımı atmış oluyoruz. Bu komuttan sonra bulunduğunuz dizinde great_expectations adında bir klasör oluşacaktır.

Önemli bir dipnot olarak da great_expectations ile ilgili çalıştıracağınız tüm komutları ilgili dizinde çalıştırmalısınız aksi takdirde hata alırsınız. Bu dipnotu paylaştıktan sonra gelin şimdi Great Expectations aracının kontrol edeceği veri kaynağı için bir tanımlama yapalım. Bunu hem CLI’dan hem de python kodu ile yapabilirsiniz .

CLI için great_expactations datasource new diyerek interaktif bir şekilde tanımlama yapabilirsiniz. Ancak ben bu yazıda python üzerinde hazırlamış olduğum kodlar ile devam edeceğim.

import great_expectations as gx
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists


context = gx.get_context()
datasource_name = "data_quality_medium"


host = "localhost"
port = "5432"
username = "postgres"
password = "****"
database = "ge_medium"
schema_name = "cst"


table_name = "customer_info"

example_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
class_name: SqlAlchemyExecutionEngine
credentials:
host: {host}
port: '{port}'
username: {username}
password: {password}
database: {database}
drivername: postgresql
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetSqlDataConnector
include_schema_name: True
introspection_directives:
schema_name: {schema_name}
default_configured_data_connector_name:
class_name: ConfiguredAssetSqlDataConnector
assets:
{table_name}:
class_name: Asset
schema_name: {schema_name}
"""

context.test_yaml_config(yaml_config=example_yaml)
sanitize_yaml_and_save_datasource(context, example_yaml, overwrite_existing=False)

Yukarıdaki python kodu ile gerekli tanımlamayı yapıyoruz. Daha sonrasında CLI kullanarak da kontrolünü yapabiliriz.

(ge-working) emre@emre-lenovo:~/Desktop/DE-EmreEvcimen2023/ge-medium$ python ge-create-data-source.py 
(ge-working) emre@emre-lenovo:~/Desktop/DE-EmreEvcimen2023/ge-medium$ great_expectations datasource list

Output:
Using v3 (Batch Request) API
1 Datasource found:

- name: data_quality_medium
class_name: Datasource

Şimdi ise bu kaynağa bağlanarak kontrol edilmesini istediğimiz kuralları (expectations suite) ve bu süreci kontrol etmesi için bir tetikleme (checkpoint) nesneleri yaratacağız. Expectations Suite özetle kontrol edilecek kuralları tutan bir görev görmekte. Checkpoint ise hangi kural bloğunu hangi kaynağa bağlanarak yapacağını belirtiğimiz bir görev üstlenmekte.

from ruamel.yaml import YAML
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.exceptions import DataContextError

yaml = YAML()

context = gx.get_context()

batch_request = {
"datasource_name": "data_quality_medium",
"data_connector_name": "default_configured_data_connector_name",
"data_asset_name": "customer_info",
}


expectation_suite_name = "customer.validate"
try:
suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
print(
f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.'
)
except DataContextError:
suite = context.add_expectation_suite(expectation_suite_name=expectation_suite_name)
print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')


validator = context.get_validator(
batch_request=BatchRequest(**batch_request),
expectation_suite_name=expectation_suite_name,
)

column_names = [f'"{column_name}"' for column_name in validator.columns()]
print(f"Columns: {', '.join(column_names)}.")
print(validator.head(n_rows=5, fetch_all=False))


validator.expect_table_row_count_to_be_between(max_value=15, min_value=5)
validator.expect_table_column_count_to_equal(value=6)
validator.expect_table_columns_to_match_ordered_list(
column_list= ["customer_id","customer_name","customer_surname","customer_city_id",
"customer_is_active","customer_created_date"]
)

validator.expect_column_values_to_not_be_null(column="customer_id")
validator.expect_column_distinct_values_to_be_in_set(column="customer_city_id",value_set =set(list(range(1,82))))

validator.save_expectation_suite(discard_failed_expectations=False)

##########################################################################################

my_checkpoint_name = "medium_checkpoint"

yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
- batch_request:
datasource_name: data_quality_medium
data_connector_name: default_inferred_data_connector_name
data_asset_name: cst.customer_info
data_connector_query:
index: -1
expectation_suite_name: customer.validate
"""
context.add_checkpoint(**yaml.load(yaml_config))

Python Scriptini çalıştırdıktan sonra CLI ile kontrol ettiğimizde hem expectations suite hem de checkpoint tanımlamaları başarılı şekilde oluşmuş gözüküyor.

(ge-working) emre@emre-lenovo:~/Desktop/DE-EmreEvcimen2023/ge-medium$ great_expectations suite list
Using v3 (Batch Request) API
1 Expectation Suite found:
- customer.validate
(ge-working) emre@emre-lenovo:~/Desktop/DE-EmreEvcimen2023/ge-medium$ great_expectations checkpoint list
Using v3 (Batch Request) API
Found 1 Checkpoint.
- medium_checkpoint

Bunları Python Script’i değilde CLI ile yapmak isteyenler aşağıdaki komutlar ile interaktif şekilde ilgili tanımlamaları yapabilir.

Exceptations Suite için:

great_expectations suite new

Checkpoint için :

great_expectations checkpoint new

Şimdi gelin isterseniz başka bir Python scriptinde bu checkpoint’i çalıştıralım ve sonuçları görelim.

import great_expectations as gx


my_checkpoint_name = "medium_checkpoint"

context = gx.get_context()

checkpoint_result = context.run_checkpoint(checkpoint_name=my_checkpoint_name)

context.build_data_docs()

validation_result_identifier = checkpoint_result.list_validation_result_identifiers()[0]
context.open_data_docs(resource_identifier=validation_result_identifier)

İlgili komutu çalıştırdıktan sonra web browser içerisinde bir html dosyası bizi karşılıyor.

Evet görmüş olduğunuz gibi karşımıza gerekli kuralların kontrol edilmesi ile birlikte bir rapor çıkıyor.

Status kısmı başarılı gözküyor. 5 kural da ilgili kaynak için sağlanmış gözüküyor. Başarı oranımız %100.

Python kodunda belirttiğimiz kuralların biz geliştiriceler değil tüm herkesin anlayabileceği formatta yukarıda görüyoruz. Müthiş 👌

→ İlk kuralımızda görüyoruz ki kaynakta görmek istediğimiz satır sayısı 5 ile 15 arasında olmalı eğer bundan fazla bir değer olursa sıkıntı var demektir. Bazı durumlarda satır sayısını kontrol ederek hedefe almak isteyebilirsiniz.

validator.expect_table_row_count_to_be_between(max_value=15, min_value=5)

→ İkinci kuralımız kolon sayısı 6'ya eşit olup olmama durumunu kontrol ediyor. Az veya fazla bir değer gördüğünde bizi uyarmasını istiyoruz.

validator.expect_table_column_count_to_equal(value=6)

→ Üçüncü kuralımız kolon isimlerini belirterek 6 kolon olacak ve isimleride bu şekilde olmalı. Bunları görmezsen, eksik veya fazla olduğu durumlarda yine uyarı vermesini istiyoruz.

validator.expect_table_columns_to_match_ordered_list(column_list=[“customer_id”,”customer_name”,”customer_surname”,”customer_city_id”,“customer_is_active”,”customer_created_date”])

→ Dördüncü kuralımızda eğer city_id kolununda 1–81 aralığı dışında bir değer görüyorsan ilgili değer yanlıştır ve bizi uyar diyoruz.

validator.expect_column_distinct_values_to_be_in_set(column=”customer_city_id”,value_set =set(list(range(1,82))))

→ Beşinci kuralımızda da customer_id kolonunda null değer olamaz eğer varsa beni bilgilendir diyoruz.

validator.expect_column_values_to_not_be_null(column=”customer_id”)

Şimdi test amaçlı gelin city_id değeri 82 olan bir kayıt girelim ve chekpoint’i tekrardan çalıştırıp sonuçları inceleyelim.

insert into cst.customer_info(customer_name,customer_surname,customer_city_id,customer_is_active) 
values ('ibrahim','yılmaz','82','h')

Görmüş olduğunuz gibi Statu kısmı Failed gözüküyor ve başarı oranımız %80'e düştü.

Data Docs sayesinde hatayı net bir şekilde hangi kuraldan ve neden dolayı takıldığını görüntüleyebiliyoruz.

Peki biz bu sonuçları Data Docs haricinde görüntüleyebilir miyiz ? Sorunun cevabı evet istersek bir database’de de depolayabiliriz. Gelin şimdi sonuçları PostgreSQL’e yazdırmak için gerekli adımları uygulayalım.

Great Exceptations projesini ilk başlatırken init komutu çalıştırmıştık hatırlıyorsanız o komut sonucunda great_expectations adında bir dizin oluşmuştu o dizinin içerisindeki great_expectations.yml dosyasına giderek aşağıda belirtilen kısımları ekliyoruz.

Evet Yukarıda görmüş olduğunuz ayarlamaları yaptıktan sonra credentials kısmında verdiğimiz dinamik değişken için yine great_expectations dizininde bulunan uncommitted klasöründeki config_variables.yml dosyasına giderek aşağıdaki tanımlamayı yapıyoruz.

Evet gerekli ayarlamaları yaptıktan sonra gelin checkpoint’i tekrardan çalıştıralım ve ilgili db’ye gidip analiz edelim.

Yukarıda görmüş olduğunuz gibi public şeması altında ge_validations_store adında bir tablo oluşturarak sonuçları burada biriktiyor. Artık biz SQL kullanarak da gerekli kontrolleri analiz edebiliriz. Ancak value kolonuna dikkat ettiyseniz bu kolon text tipinde gözüksede JSON formatta bir veri gelin isterseniz onun veri tipini değiştirerek bir görüntüleyelim.

select cast(value as json) value_json  from public.ge_validations_store gvs ;

Evet SQL yeteneğimizi kullanarak görmüş olacağınız üzere sonuçları analiz edebiliriz.

Peki Great Expectations aracını Airflow tarzı bir orkestra aracı ile nasıl kullanabiliriz. Senaryomuzda PostgreSQL üzerinde bulunan veriler Great Expectations tarafından analiz edilerek başarılı olması halinde Spark kodu çalıştıracak hata alması durumunda da bize Slack’ten bilgilendirme yapacak.

Bunun için yapmamız gereken ilk şey Airflow üzerinde GreatExpectationsOperator’ünü kullanamak için provider paketini yüklememiz gerekecek. Aşağıdaki komutla yükleme işlemini gerçekleştirebilirsiniz.

pip install airflow-provider-great-expectations

Dediğimiz senaryoyu deneyimleyebileceğimiz bir DAG dosyası hazırladım. Aşağıda sizlerle paylaşıyorum.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator


default_args = {
"owner": "emre",
"start_date" : datetime(2023,3,8),
"schedule_interval": "@daily",
"retries": 3,
"retry_delay": timedelta(minutes=1),
"tags": ["data-engineer"]
}

ge_root_dir = "/home/emre/Desktop/DE-EmreEvcimen2023/ge-medium/great_expectations"

def _get_message() -> str:
return "Data Validation Error!! Please Check Source Data"



with DAG(
dag_id = "ge-medium-working",
catchup=False,
default_args=default_args) as dag:


ge_validation_data = GreatExpectationsOperator(
task_id = "ge_validation_data",
data_context_root_dir= ge_root_dir,
checkpoint_name= "medium_checkpoint",
do_xcom_push= False
)

slack_notification = SlackWebhookOperator(
task_id = "send_slack_notification",
http_conn_id = "slack_conn",
message = _get_message(),
trigger_rule = "all_failed"
)

data_processing = SparkSubmitOperator(
task_id = "data_processing",
application="/home/emre/Desktop/DE-EmreEvcimen2023/ge-medium/spark-code/spark-code.py",
conn_id = "my_spark_conn",
verbose= False,
trigger_rule = "all_success"
)

ge_validation_data >> [data_processing, slack_notification]

Çalışacak olan Spark kodumuzu da aşağıya ekliyorum.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local[*]").appName("GE-Medium-Project").getOrCreate()


url = "jdbc:postgresql://localhost:5432/ge_medium"
table_name = "cst.customer_info"
properties = {
"driver": "org.postgresql.Driver",
"user": "postgres",
"password": "****"
}


customer_info_df = spark.read.jdbc(url=url, table=table_name, properties=properties)

new_customer_info_df = customer_info_df \
.withColumn("customer_name", upper(col("customer_name"))) \
.withColumn("customer_surname", upper(col("customer_surname")))


new_customer_info_df \
.write \
.mode("overwrite") \
.option("header","true") \
.csv("/home/emre/Desktop/DE-EmreEvcimen2023/ge-medium/spark-code/customer_info")

Gelin şimdi Airflow Web UI üzerinde ilgili DAG’ı manuel olarak tetikleyerek çıktısına bakalım.

Evet görmüş olduğunuz gibi kaynak verimiz başarılı şekilde testten geçti ve spark kodu öyle çalıştı. Gelin isterseniz sıkıntılı bir veri girişi yaparak ilgili DAG’ı tekrardan çalıştıralım.

insert into cst.customer_info(customer_name,customer_surname,customer_city_id,customer_is_active) 
values ('emine','bozan','93','e');

Evet, şimdi kontrol kısmında sıkıntı oluştu ve spark kodu çalışmadı bunun yerine slack’ten bana aşağıdaki gibi bir mesaj geldi.

Umarım sizler için faydalı bir yazı olmuştur. Her türlü sorunuzu veya geri dönüşlerinizi benimle paylaşabilirsiniz. Okuduğunuz için teşekkürler. Görüşmek üzere ✋

--

--