Apache İceberg

Emre Evcimen
6 min readMar 27, 2024

Merhabalar, bu yazımda sizlere Apache İceberg’i teorik olarak daha sonrasında da uygulamalı örneklerle anlatmaya çalışacağım.İlk olarak Apache İceberg nedir, neden kullanılır ve nasıl çalışır bir ona bakalım.

Apache İceberg Nedir?

Apache Iceberg, büyük ölçekli veri depolama,sorgulama sistemlerinde kullanılan açık kaynaklı bir veri tabanı formatı ve işlem motorudur. Iceberg, büyük veri tabanlarının performansını artırmak, veri bütünlüğünü sağlamak ve veri işleme çalışmalarını daha güvenilir hale getirmek için tasarlanmıştır.

Apache İceberg’in kullanım nedenlerini aşağıdaki gibi örneklendirebiliriz.

Partitioning: Iceberg, verileri fiziksel olarak bölme yeteneğine sahiptir. Bu, sorguların yalnızca ilgili parçaları işlemesini sağlar, böylece sorgu performansı artar.

Transactionality: Iceberg, ACID özelliklerini destekler. Bu, veri yazma, güncelleme ve silme işlemlerinin güvenilir bir şekilde gerçekleştirilmesini sağlar.

Time Travel: Iceberg, geçmişteki veri sürümlerine erişim sağlar. Bu, veri bütünlüğünü kontrol etmek ve analiz yapmak için önemli bir özelliktir.

Schema Evolution: Iceberg, veri şemasının zamanla değişmesine izin verir. Bu, veri modelinde yapılan değişikliklerin kolayca yönetilmesini sağlar.

Kompakt Veri Formatı: Iceberg, sıkıştırılmış ve sütun tabanlı bir veri formatı kullanır. Bu, depolama alanını optimize eder ve sorgu performansını artırır.

Apache İceberg’in genel işleyişini resimde görebilirsiniz.

Metadata Katmanı (Metadata Layer): Bu katman, Iceberg tablolarının meta bilgilerini depolar ve yönetir. Metadata Layer, tabloların şemasını, bölme bilgilerini, sütun istatistiklerini, geçmiş veri sürümlerini ve diğer meta bilgilerini saklar. Bu meta bilgileri, tabloların yapılandırılmasını, veri işleme işlemlerini ve sorguların optimize edilmesini sağlar. Metadata genellikle bir veritabanında (örneğin, SQLite, Apache Derby) veya dağıtılmış bir depoda (örneğin, Apache Hive Metastore, Apache HBase) saklanır.

Veri Katmanı (Data Layer): Bu katman, verilerin fiziksel olarak depolandığı ve yönetildiği katmandır. Iceberg, genellikle Parquet veya ORC gibi sıkıştırılmış sütun tabanlı dosya formatlarını kullanır. Dosya katmanı, veri yazma, güncelleme, silme ve okuma işlemlerini yönetir. Ayrıca, dosya katmanı veri tablosunu fiziksel olarak bölerek tabakalandırma(partitioning) işlemlerini de gerçekleştirir.

Iceberg Catalog: İceberg tablosu ile işlem yapacak kişinin(read,write) ilk kontakt yeridir.Iceberg Catalog, bir Iceberg Tablosu için geçerli meta veri noktasını, yani bir tablonun geçerli meta veri dosyasına yönelik güncel noktayı saklar.

Teorik kısımlarına genel hatlarıyla değindiğimize göre gelin şimdi Spark ile birlikte kullanımına uygulamalı şekilde bir bakalım.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


url = "jdbc:sqlserver://your_hostname:1433;database=AdventureWorks2019;encrypt=true;trustServerCertificate=true"
properties = {
"user": "your_username",
"password": "your_password",
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

table_name = "Production.Product"



spark = SparkSession.builder.appName("apache-iceberg-working") \
.config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:0.13.2") \
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.hive_prod","org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.hive_prod.type","hive") \
.config("spark.sql.catalog.hive_prod.uri","thrift://localhost:9083") \
.getOrCreate()

s_columns = ["ProductID" ,"Name" ,"ProductNumber" ,"MakeFlag" ,"Color" ,"ProductSubcategoryID" ,"StandardCost" ,"ListPrice","SellStartDate"]
productDF = spark.read.jdbc(url=url, table=table_name, properties=properties)
productDF = productDF.select(*s_columns)


spark.sql("""CREATE TABLE hive_prod.de.d_product (
ProductID INT,
Name STRING,
ProductNumber STRING,
MakeFlag BOOLEAN ,
Color STRING,
ProductSubCategoryID INT,
StandardCost DECIMAL(19,4),
ListPrice DECIMAL(19,4),
SellStartDate TIMESTAMP
)
USING iceberg""")


productDF.write.format("iceberg").mode("overwrite").saveAsTable("hive_prod.de.d_product")
spark.sql("SELECT * FROM hive_prod.de.d_product").show()

"spark.sql.catalog.hive_prod" = "org.apache.iceberg.spark.SparkCatalog" 
"spark.sql.catalog.hive_prod.type" = "hive"
"spark.sql.catalog.hive_prod.uri" = "thrift://localhost:9083"

Teori kısmında Catalog için ne demiştik İceberg kısmında yazma veya okuma işlemi yapmak istiyorsak kuracağımız ilk kontakt burasıydı. Apache Iceberg Catalog için birden fazla sistemi desteklemektedir.(Hadoop,Hive,Minio,S3 vb.) Ben metadata bilgilerini saklamak için Hive Metastore’u tercih ettim. O yüzden yukarıdaki parametreleri set ederek Spark içerisinde oluşucak Catalog’a bunu bildiriyorum.

Kodu çalıştırdığımda görmüş olduğunuz gibi Iceberg tablosunu başarılı şekilde oluşturup tabloya yazdı. Oluşturduğu bu tabloyu hive tablolarının hdfs de bulunduğu dizine bendeki path(/user/hive/warehouse/) kaydediyor. Gelin yapısına bir bakalım.

hdfs dfs -ls /user/hive/warehouse/de.db/d_product 
Found 2 items
drwxrwxrwx - emre supergroup 0 2024-03-27 12:56 /user/hive/warehouse/de.db/d_product/data
drwxrwxrwx - emre supergroup 0 2024-03-27 12:56 /user/hive/warehouse/de.db/d_product/metadata
hdfs dfs -ls /user/hive/warehouse/de.db/d_product/metadata
Found 4 items
-rw-r--r-- 1 emre supergroup 2720 2024-03-27 12:51 /user/hive/warehouse/de.db/d_product/metadata/00000-fbeaa252-1d1a-4eae-9414-4e0e03b31e8d.metadata.json
-rw-r--r-- 1 emre supergroup 4745 2024-03-27 12:56 /user/hive/warehouse/de.db/d_product/metadata/00001-0991683d-a7e5-400e-9aaf-504f9f23b840.metadata.json
-rw-r--r-- 1 emre supergroup 6442 2024-03-27 12:56 /user/hive/warehouse/de.db/d_product/metadata/560f4281-2bb7-4ce6-8d87-ebaf9c8c950b-m0.avro
-rw-r--r-- 1 emre supergroup 3779 2024-03-27 12:56 /user/hive/warehouse/de.db/d_product/metadata/snap-8740618567573872932-1-560f4281-2bb7-4ce6-8d87-ebaf9c8c950b.avro
hdfs dfs -ls /user/hive/warehouse/de.db/d_product/data
Found 1 items
-rw-r--r-- 1 emre supergroup 9306 2024-03-27 12:56 /user/hive/warehouse/de.db/d_product/data/00000-0-91308c0d-b6b6-4c60-867d-fd6c4f0829cb-00001.parquet

Iceberg tablosunun arka tarafta nasıl depolandığına baktığımızda metadata ve data dosyalarına ayrıldığını görüyoruz. Teori kısmında değindiğimiz gibi verileri metadata ve data olmak üzere farklı katmanlarda saklıyor.

Şimdi gelin isterseniz bir UPDATE sorgusu çalıştıralım.

spark.sql("UPDATE hive_prod.de.d_product SET Color='Pink' WHERE ProductID=1")
spark.sql("SELECT * FROM hive_prod.de.d_product WHERE ProductID=1").show()

Kodu ilk çalıştırdığımda ProductID’si bir olan kaydın Color kolonu null olarak gelmekteydi. Update sorgusunu çalıştırdığımız başarılı şekilde ilgili kolonu güncellemiş olduk.

İceberg tablolarının metadata bilgileri sorgulamak istersek eğer tablomuzu yazdıktan sonra * .files, *. entries , * .snapshots ,*. manifests ,*. partitions gibi keyword’ler girerek bu değerlere ulaşabiliriz.

spark.sql("SELECT * FROM hive_prod.de.d_product.snapshots").show()

snapshots bilgisi bize tabloyla ilgili geçmişe yönelik nasıl işlemler olduğuyla ilgili bilgi vermekte ve time travel bir sorgu çalıştırmak istediğimizde bunu yapmamıza olanak sağlayacak bilgileri tutar.Yukarıda gördüğümüz gibi uygulama boyunca yaptığımız iki işlem vardı. İlki tabloyu oluşturup veri yazmak daha sonrasında da bir update sorgusu çalıştırmak. Bu işlemlerle ilgili detay bilgiyi 2 farklı satırda bize sunuyor. Gelin isterseniz time travel bir sorgu çalıştıralım. Yani tablonun update etmeden önceki halini sorgulayalım.

spark.read \
.option("snapshot-id", 8740618567573872932) \
.format("iceberg") \
.load("hive_prod.de.d_product") \
.show()

Sorgu sonucuna baktığımızda ProductID’si 1 olan kaydın Color kolonunun değeri null gözüküyor. Yani biz update etmeden önceki hali.

Spark haricinde Iceberg tablolarını sorgulayabileceğimiz herhangi bir UI var mı diye soracak olursanız onun içinde Dremio aracını sizlere tavsiye ederim şimdi gelin Dremio aracını kullanarak tablomuzu sorgulayalım.

Dremio servisini başlatma:

emre@emre-lenovo:~/dremio/bin$ ./dremio start
emre@emre-lenovo:~/dremio/bin$ ./dremio status
dremio is running.

9047 portuna giderek Dremio Web UI ekranına ulaşabilirsiniz.

Iceberg tablosunu okumadan önce Hive Metastore bağlantımızı oluşturalım.

Bağlantımızı oluşturduktan sonra Iceberg tablomuza sorgu atabiliriz.

Görmüş olduğunuz gibi Iceberg tablosunu UI kullanarak da sorgulamış olduk.

Yukarıda görüldüğü gibi Update işlemi de yapabiliyoruz.

Dremio üzerinden metadata bilgilerine ulaşmak istersek de aşağıdaki gibi kullanımlar mevcut.

Dremio üzerinden time travel bir sorgu atmak istediğimizde ise aşağıdaki gibi bir kullanımla ilerleyebiliriz.

Son olarak Presto aracını kullanarak da Iceberg tablolarını sorgulayabiliriz.

Presto için Iceberg bağlantısı oluşturma ve servisi çalıştırma:

emre@emre-lenovo:~/Presto/presto-server-0.283/etc/catalog$ pwd
/home/emre/Presto/presto-server-0.283/etc/catalog
emre@emre-lenovo:~/Presto/presto-server-0.283/etc/catalog$ cat iceberg.properties
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
iceberg.catalog.type=hive

emre@emre-lenovo:~/Presto/presto-server-0.283/bin$ ./launcher start

DBeaver kullanarak Presto sunucusuna bağlanma:

Presto kullanarak da Iceberg tablolarımızı sorgulayabiliriz.

Şimdi Python’da Iceberg için oluşturulmuş pyiceberg kütüphanesi üzerinde birkaç işlem yapalım.

Kütüphanenin yüklenmesi:

(icebergenv) emre@emre-lenovo:~$ pip3 install "pyiceberg[s3fs,hive]"
(icebergenv) emre@emre-lenovo:~$ pyiceberg --uri thrift://localhost:9083 list
de
default
football
nyc
school

Her sefereinde metadata bilgileri için uri gibi bilgileri girmek istemezseniz aşağıdaki gibi bir yaml dosyası oluşturabilirsiniz.

vi ~/.pyiceberg.yaml
---
catalog:
default:
uri: thrift://localhost:9083
s3.endpoint: hdfs://localhost:9000
(icebergenv) emre@emre-lenovo:~$ pyiceberg list
de
default
football
nyc
school
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual


catalog = load_catalog("default")
print(catalog.list_namespaces())
print(catalog.list_tables("de"))
# export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`

product_table = catalog.load_table("de.d_product")

scan = product_table.scan(
row_filter=GreaterThanOrEqual("ListPrice", 1),
selected_fields=("ProductID", "Color", "MakeFlag"),
limit=100
)

scan_df = scan.to_pandas()

print(scan_df.head())

PyIceberg kütüphanesi kullanarak da Iceberg tablolarıyla ilgili çoğu işlemleri yönetebiliyoruz.

Bu yazımda sizlere Apache İceberg’i genel hatlarıyla açıklayıp daha sonrasında da kullanımlarıyla ilgili örnekler vermeye çalıştım. Umarım sizler için faydalı olmuştur. Herhangi bir sorunuz ve öneriniz olursa benimle iletişime geçebilirsiniz👏👏

--

--