Debezium ve Apache Pinot ile CDC(Change Data Capture) Analizi

Emre Evcimen
13 min readMar 7, 2023

Merhabalar bu yazımda sizlere OLTP bir sistemde gerçekleşen değişikleri Debezium ile anlık olarak yakalayarak Apache Pinot ile eş zamanlı takibini yapacağımız bir proje hazırlayacağım. Bunun için ilk önce bilmemiz gereken terimleri ve tool’ları kısaca bir özetleyelim.

Change Data Capture: CDC, bir veritabanı yönetim sistemi(DBMS) tarafından yapılan değişiklikleri takip etmek, bunları kaydetmek ve diğer sistemlerin bu değişiklerden haberdar olmasına sağlayan bir tekniktir.

CDC Önemi: CDC, veritabanındaki değişikleri hızlı bir şekilde algılayabilir ve bu değişikliklerin diğer sistemlerde veya uygulamalarda hemen yansıtılmasını sağlayabilir. Bu nedenle, CDC, veri entegrasyonu sürecinde veri uyumsuzluğunu azaltarak veri bütünlüğünü sağlayabilir.

Debezium: Debezium ise CDC tekniğini uygulamamızı sağlayacak açık kaynaklı bir CDC aracıdır. Debezium bir veritabınındaki değişikleri gerçek zamanlı olarak izler ve bu değişikleri Apache Kafka’ya mesaj olarak yayınlar. Debezium, veritabanı değişikliklerini takip etmek için veritabanının log dosyalarını kullanır ve böylece veritabanında hiçbir değişiklik yapmaz.

Apache Pinot(real-time OLAP Datastore): Apache Pinot, büyük ölçekli veri analizi için açık kaynaklı bir veri işleme ve sorgulama motorudur. Apache Pinot, özellikle gerçek zamanlı verilerin işlenmesi ve sorgulanması için tasarlanmıştır.Pinot, verilerin hızlı bir şekilde yüklenmesine, sorgulanmasına ve analiz edilmesine olanak tanır.

Apache Pinot’un ana bileşenleri şunlardır:

1 → Controller: Pinot’un ana yönetim bileşenidir. İş yüklerini kontrol eder ve verilerin dağıtımını yönetir.

2 → Broker: Veri taleplerinin kabul edildiği ve sorguların yapıldığı ana sorgulama arayüzüdür.

3 → Server: Verilerin depolandığı ve sorguların yapıldığı işlem düğümleridir.

4 → ZooKeeper: Pinot kümesinin yapılandırmasını, yönetimini ve koordinasyonunu yönetir.

5 → Helix: Pinot’un otomatik ölçeklenebilirlik ve yüksek kullanılabilirlik özelliklerinden sorumludur.

6 → Pinot Query Language (PQL): Pinot’a özgü bir sorgu dilidir. PQL, SQL benzeri bir dil kullanarak Pinot verilerini sorgulamak için kullanılır.7

7 → Pinot Control API: Pinot yöneticilerinin Pinot kümesini yönetmek için kullanabilecekleri bir API’dir.

Evet gerekli ön bilgileri edindiğimize göre gelin isterseniz proje mimarimize görsel olarak bir bakalım.

Mimaride görmüş olduğunuz gibi tool’ların çoğunu docker ile ayağa kaldırıyorum. Sadece yazmış olduğum python kodları lokalimden çalıştırıyorum. İlgili servisler için kullandığım docker-compose.yml dosyasını aşağıda bulabilirsiniz.

---
version: '3.3'
services:
zookeeper-1:
image: confluentinc/cp-zookeeper:latest
ports:
- '32181:32181'
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888


zookeeper-2:
image: confluentinc/cp-zookeeper:latest
ports:
- '42181:42181'
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 42181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888


zookeeper-3:
image: confluentinc/cp-zookeeper:latest
ports:
- '52181:52181'
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 52181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SERVERS: zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888


kafka-1:
image: confluentinc/cp-kafka:latest
ports:
- '9092:9092'
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181,zookeeper-2:42181,zookeeper-3:52181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3


kafka-2:
image: confluentinc/cp-kafka:latest
ports:
- '9093:9093'
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181,zookeeper-2:42181,zookeeper-3:52181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3

kafka-3:
image: confluentinc/cp-kafka:latest
ports:
- '9094:9094'
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181,zookeeper-2:42181,zookeeper-3:52181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_NUM_PARTITIONS: 3

connect:
image: debezium/connect:1.4
ports:
- 8083:8083
restart: always
environment:
- BOOTSTRAP_SERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
depends_on:
- kafka-1
- kafka-2
- kafka-3

schema-registry:
image: confluentinc/cp-schema-registry:latest
hostname: schema-registry
depends_on:
- kafka-1
- kafka-2
- kafka-3
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-1:29092,kafka-2:29093,kafka-3:29094'
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081


akhq:
image: tchiotludo/akhq:latest
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka-1:29092,kafka-2:29093,kafka-3:29094"
schema-registry:
url: "http://schema-registry:8081"
connect:
- name: "connect"
url: "http://connect:8083"
depends_on:
- schema-registry
ports:
- 8085:8080
links:
- kafka-1
- kafka-2
- kafka-3
- schema-registry
- connect

pinot-controller:
image: apachepinot/pinot:release-0.7.1
hostname: pinot-controller
volumes:
- ./pinot-docker-demo/pinot/controller:/tmp/data/controller
ports:
- "9000:9000"
command: StartController -zkAddress zookeeper-1:32181,zookeeper-2:42181,zookeeper-3:52181
restart: always
depends_on:
- kafka-1
- kafka-2
- kafka-3

pinot-broker:
image: apachepinot/pinot:release-0.7.1
hostname: pinot-broker
ports:
- "8099:8099"
restart: always
command: StartBroker -zkAddress zookeeper-1:32181,zookeeper-2:42181,zookeeper-3:52181
depends_on:
- kafka-1
- kafka-2
- kafka-3
- pinot-controller

pinot-server:
image: apachepinot/pinot:release-0.7.1
hostname: pinot-server
volumes:
- ./pinot-docker-demo/pinot/server:/tmp/data/server
ports:
- "8098:8098"
restart: always
command: StartServer -zkAddress zookeeper-1:32181,zookeeper-2:42181,zookeeper-3:52181
depends_on:
- kafka-1
- kafka-2
- kafka-3
- pinot-controller

postgres:
image: debezium/postgres
ports:
- 5434:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=123456
- POSTGRES_DB=DebeziumDB
depends_on:
- pinot-server

portainer:
image: portainer/portainer-ce:latest
container_name: portainer
restart: unless-stopped
security_opt:
- no-new-privileges:true
volumes:
- /etc/localtime:/etc/localtime:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
- ./portainer-data:/data
ports:
- 9001:9000
depends_on:
- postgres

Servisleri ayağa kaldırmak ve durumlarını hem CLI’dan hem de docker-compose.yml içerisinde gördüğünüz Portainer servisi ile kontrol etmek için aşağıdaki adımlarını uygulayalım.

docker-compose up -d

Output:
Creating network "apachepinot_default" with the default driver
Creating apachepinot_zookeeper-3_1 ... done
Creating apachepinot_zookeeper-2_1 ... done
Creating apachepinot_zookeeper-1_1 ... done
Creating apachepinot_kafka-2_1 ... done
Creating apachepinot_kafka-3_1 ... done
Creating apachepinot_kafka-1_1 ... done
Creating apachepinot_connect_1 ... done
Creating apachepinot_pinot-controller_1 ... done
Creating apachepinot_schema-registry_1 ... done
Creating apachepinot_akhq_1 ... done
Creating apachepinot_pinot-broker_1 ... done
Creating apachepinot_pinot-server_1 ... done
Creating apachepinot_postgres_1 ... done
Creating portainer ... done
docker-compose ps 

Output:

Name Command State Ports
------------------------------------------------------------------------------------
apachepinot_akhq_1 docker- Up (health: 0.0.0.0:8085->8080
entrypoint.sh starting) /tcp,:::8085->8080
./akhq /tcp
apachepinot_connect /docker- Up 0.0.0.0:8083->8083
_1 entrypoint.sh start /tcp,:::8083->8083
/tcp, 8778/tcp,
9092/tcp, 9779/tcp
apachepinot_kafka-1 /etc/confluent/dock Up 0.0.0.0:9092->9092
_1 er/run /tcp,:::9092->9092
/tcp
apachepinot_kafka-2 /etc/confluent/dock Up 9092/tcp, 0.0.0.0:
_1 er/run 9093->9093/tcp,:::
9093->9093/tcp
apachepinot_kafka-3 /etc/confluent/dock Up 9092/tcp, 0.0.0.0:
_1 er/run 9094->9094/tcp,:::
9094->9094/tcp
apachepinot_pinot- ./bin/pinot- Up 8096/tcp,
broker_1 admin.sh Start ... 8097/tcp,
8098/tcp, 0.0.0.0:
8099->8099/tcp,:::
8099->8099/tcp,
9000/tcp
apachepinot_pinot- ./bin/pinot- Up 8096/tcp,
controller_1 admin.sh Start ... 8097/tcp,
8098/tcp,
8099/tcp, 0.0.0.0:
9000->9000/tcp,:::
9000->9000/tcp
apachepinot_pinot- ./bin/pinot- Up 8096/tcp,
server_1 admin.sh Start ... 8097/tcp, 0.0.0.0:
8098->8098/tcp,:::
8098->8098/tcp,
8099/tcp, 9000/tcp
apachepinot_postgre docker- Up 0.0.0.0:5434->5432
s_1 entrypoint.sh /tcp,:::5434->5432
postgres /tcp
apachepinot_schema- /etc/confluent/dock Up 0.0.0.0:8081->8081
registry_1 er/run /tcp,:::8081->8081
/tcp
apachepinot_zookeep /etc/confluent/dock Up 2181/tcp,
er-1_1 er/run 2888/tcp, 0.0.0.0:
32181->32181/tcp,:
::32181->32181/tcp
, 3888/tcp
apachepinot_zookeep /etc/confluent/dock Up 2181/tcp,
er-2_1 er/run 2888/tcp,
3888/tcp, 0.0.0.0:
42181->42181/tcp,:
::42181->42181/tcp
apachepinot_zookeep /etc/confluent/dock Up 2181/tcp,
er-3_1 er/run 2888/tcp,
3888/tcp, 0.0.0.0:
52181->52181/tcp,:
::52181->52181/tcp
portainer /portainer Up 8000/tcp, 0.0.0.0:
9001->9000/tcp,:::
9001->9000/tcp,
9443/tcp

http://localhost:9001 → Portainer Web UI

Servislerin ayağa kalkması belli bir zaman alabilir. Kontrolleri sağladığımızda hepsinin State Kısmını CLI’da Up Portainer’da running olarak görüyorsak her şey yolunda gidiyor demektir.

Gerekli kontrolleri yaptıktan sonra artık projemize başlayabiliriz. Yazının en başında ne demiştik Change Data Capture bir RDBMS sistemdeki tablolarda gerçekleşen değerleri yakalamımızı sağlıyor. O zaman bize RDBMS sisteminde varolan ve UPSERT işlemi gerçekleştirebileceğimiz bir tablo lazım. Proje için oluşturacağım tablo yapısı aşağıdaki gibidir.

İlgili tablo’yu DBeaver üzerinden docker ile ayağa kaldırdığım container’ın connection bilgileri ile bağlantı sağlayarak oluşturuyorum.

create table public.game
(
game_id SERIAL primary key,
room_id int,
map_name varchar(250),
red_team_member smallint,
blue_team_member smallint,
game_status varchar(50),
winner_team varchar(250),
started_date timestamp default current_timestamp,
finished_date timestamp
);

-- Debezium'un verinin önceki ve sonraki halini görmesi için
ALTER TABLE public.game REPLICA IDENTITY FULL;

Evet CDC yapımızın değişiklikleri takip etmesini isteyebileceğimiz bir tablomuz mevcut artık.

Sırada CDC tekniğini uygulayacak tool’umuz olan Debezium için bir connect oluşturmak. Bu işlemi lokalimde kurulu olan Postman ile yapacağım. Postman kurulu olmayan kişiler curl komutunu kullanarak da süreci ilerletebilir.

{
"name": "debezium-medium-project",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "123456",
"database.dbname": "DebeziumDB",
"database.server.name": "postgres",
"table.include.list": "public.game",
"table.whitelist": "public.game"
}
}

AKHQ Web UI üzerinden kontrol ettiğimizde connect başarılı şekilde oluşmuş olarak gözüküyor. O zaman python ile hazırladığımız generator scriptini çalıştırarak ilgili tabloya biraz veri atalım.

import psycopg2
import random as rd

def database_connection():
try:
conn = psycopg2.connect(
host="localhost",
port="5434",
database="DebeziumDB",
user="postgres",
password="123456"
)
return conn
except psycopg2.Error as e:
print(e.pgerror)


def load_data_psql():
conn = database_connection()
cur = conn.cursor()
query = """
INSERT INTO public.game (room_id, map_name, red_team_member, blue_team_member, game_status)
VALUES (%s, %s, %s, %s, %s) """
maps = ["map_a","map_b","map_c","map_d"]
game_status = ["Game Started","Game finished"]
rooms = list(range(1,21))
dict_data = []
for _ in range(1,21):
map_ = rd.choice(maps)
red_team_member = rd.randint(1,6)
blue_team_member = rd.randint(1,6)
game_stat = game_status[0]
room_id = rd.choice(rooms)
data_ = {
"map_name": map_,
"red_team_member": red_team_member,
"blue_team_member": blue_team_member,
"game_status": game_stat,
"room_id": room_id
}
dict_data.append(data_)
rooms.remove(room_id)

for row in dict_data:
cur.execute(query, (row["room_id"], row["map_name"], row["red_team_member"]
, row["blue_team_member"], row["game_status"]))


conn.commit()
cur.close()
conn.close()


load_data_psql()

python scriptini çalıştırdıktan sonra hem PostgreSQL’i hem de AKHQ Web UI üzerinden Kafka’yı kontrol ettiğimizde başarılı şekilde hem RDBMS sisteme verileri yazdığımızı hemde CDC yapısını kurgulayan Debezium servisinin başarılı şekilde çalıştığını görmekteyiz.

CLI üzerinden de ilgili topic’in içeriğini kontrol etmek istersek aşağıdaki komutu kullanabiliriz. Ben lokalimde kafka cli komutlarını çalıştırabildiğim için aşağıdaki sorguyu çalıştırabilirim. Ancak sizin yoksa ilk önce ilgili container’a bağlanıp oradan çalıştırmanız gerekli.

kafka-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic postgres.public.game | jq
# Docker Container'a bağlanarak ise:

docker-compose exec kafka-1 bash
kafka-console-consumer \
--bootstrap-server localhost:29092 \
--from-beginning \
--topic postgres.public.game

Debezium tarafından RDBMS sistemde yapılan değişiklerin yakalanıp Kafka’ya aktarımı için örnek bir mesaj yapısını aşağıda sizlerle paylaşıyorum.

{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "game_id"
},
{
"type": "int32",
"optional": true,
"field": "room_id"
},
{
"type": "string",
"optional": true,
"field": "map_name"
},
{
"type": "int16",
"optional": true,
"field": "red_team_member"
},
{
"type": "int16",
"optional": true,
"field": "blue_team_member"
},
{
"type": "string",
"optional": true,
"field": "game_status"
},
{
"type": "string",
"optional": true,
"field": "winner_team"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "started_date"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "finished_date"
}
],
"optional": true,
"name": "postgres.public.game.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "game_id"
},
{
"type": "int32",
"optional": true,
"field": "room_id"
},
{
"type": "string",
"optional": true,
"field": "map_name"
},
{
"type": "int16",
"optional": true,
"field": "red_team_member"
},
{
"type": "int16",
"optional": true,
"field": "blue_team_member"
},
{
"type": "string",
"optional": true,
"field": "game_status"
},
{
"type": "string",
"optional": true,
"field": "winner_team"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "started_date"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "finished_date"
}
],
"optional": true,
"name": "postgres.public.game.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "postgres.public.game.Envelope"
},
"payload": {
"before": null,
"after": {
"game_id": 9,
"room_id": 11,
"map_name": "map_d",
"red_team_member": 3,
"blue_team_member": 1,
"game_status": "Game Started",
"winner_team": null,
"started_date": 1678140431031417,
"finished_date": null
},
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "postgres",
"ts_ms": 1678140431037,
"snapshot": "false",
"db": "DebeziumDB",
"schema": "public",
"table": "game",
"txId": 558,
"lsn": 23828016,
"xmin": null
},
"op": "c",
"ts_ms": 1678140431222,
"transaction": null
}
}

Bizim projemizde daha çok odaklanacağımız kısım payload kısmı olacak.

....
"payload": {
"before": null,
"after": {
"game_id": 9,
"room_id": 11,
"map_name": "map_d",
"red_team_member": 3,
"blue_team_member": 1,
"game_status": "Game Started",
"winner_team": null,
"started_date": 1678140431031417,
"finished_date": null
}
...
update public.game
set red_team_member= 6
where game_id=6;

update public.game
set blue_team_member= 7
where game_id=6;

update public.game
set blue_team_member= 3
where game_id=12;

update public.game
set blue_team_member= 6
where game_id=12;

update public.game
set blue_team_member= 5
where game_id=7;

update public.game
set blue_team_member= 4
where game_id=7;

update public.game
set finished_date = current_timestamp
where game_id=6;

update public.game
set winner_team = 'blue_team'
where game_id=6;

update public.game
set blue_team_member= 4
where game_id=7;

update public.game
set blue_team_member= 7
where game_id=7;

update public.game
set finished_date = current_timestamp
where game_id=7;

update public.game
set winner_team = 'read_team'
where game_id=7;

Evet şimdi kaynak sistemimizde biraz değişiklik yapalım ve kafka topic üzerindeki değişime bakalım.

Görmüş olduğunuz gibi ilk önce 20 kayıt insert ettik daha sonrasında 12 adet update işlemi gerçekleştirdik. Hepsini yakalamış ve ilgili topic’e iletmiş. Süper :)

.......
"payload": {
"before": {
"game_id": 6,
"room_id": 15,
"map_name": "map_c",
"red_team_member": 6,
"blue_team_member": 7,
"game_status": "Game Started",
"winner_team": null,
"started_date": 1678140431031417,
"finished_date": 1678152627096661
},
"after": {
"game_id": 6,
"room_id": 15,
"map_name": "map_c",
"red_team_member": 6,
"blue_team_member": 7,
"game_status": "Game Started",
"winner_team": "blue_team",
"started_date": 1678140431031417,
"finished_date": 1678152627096661
},
"source": {
"version": "1.4.2.Final",
"connector": "postgresql",
"name": "postgres",
"ts_ms": 1678141841198,
"snapshot": "false",
"db": "DebeziumDB",
"schema": "public",
"table": "game",
"txId": 567,
"lsn": 23839624,
"xmin": null
},
"op": "u",
"ts_ms": 1678141841308,
"transaction": null
}
}

Evet update edilen bir kaydın da Debezium tarafından yakalanması ve kafka’ya iletilmesi yukarıdaki gibi bir yapıda oluyor. Görmüş olduğunuz gibi before kısmı bir önceki gibi null olarak değil değişiklik yapılmadan önceki değerleri içererek gözüküyor.

İstediğimiz gibi yapılan değişikleri yakalabiliyoruz. Ancak formata baktığımızda uzun bir json yapısı öylece bize bakıyor. game_id bazlı yapılan değişikliklere bakmak istesek bunun sorgusunu yazmak oldukça zor ve efor gerektiyor. İşte burada imdadımıza Apache Pinot yetişiyor. Gelin isterseniz şimdi Apache Pinot tarafına geçelim.

Apache Pinot tarafında sorgu yazabilmemiz için öncelikle bir tablo ve bir de şema tanımlaması oluşturmamız gerekiyor.

# debezium-schema-definition.json
{
"schemaName": "medium_debezium",
"dimensionFieldSpecs": [
{
"name": "op",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "tsms",
"dataType": "LONG"
},
{
"name": "room_id",
"dataType": "INT"
},
{
"name": "game_id",
"dataType": "INT"
},
{
"name": "map_name",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "before_red_team_member",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "after_red_team_member",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "before_blue_team_member",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "after_blue_team_member",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "before_game_status",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "after_game_status",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "before_winner_game",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "after_winner_game",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "before_finished_date",
"dataType": "STRING",
"defaultNullValue": ""
},
{
"name": "after_finished_date",
"dataType": "STRING",
"defaultNullValue": ""
}
],
"metricFieldSpecs": [],
"dateTimeFieldSpecs": [
{
"name": "ts_ms",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
#debezium-table-definition.json

{
"tableName": "medium_debezium",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "ts_ms",
"timeType": "MILLISECONDS",
"schemaName": "medium_debezium",
"replicasPerPartition": "1"
},
"tenants": {},
"fieldConfigList": [],
"tableIndexConfig": {
"noDictionaryColumns": [],
"loadMode": "MMAP",
"nullHandlingEnabled": "true",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "simple",
"stream.kafka.topic.name": "postgres.public.game",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "PLAINTEXT://kafka-1:29092,PLAINTEXT://kafka-2:29093,PLAINTEXT://kafka-3:29093",
"realtime.segment.flush.threshold.time": "12h",
"realtime.segment.flush.threshold.size": "100000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
},
"ingestionConfig": {
"transformConfigs": [
{
"columnName": "op",
"transformFunction": "JSONPATHSTRING(payload, '$.op')"
},
{
"columnName": "tsms",
"transformFunction": "JSONPATHSTRING(payload, '$.ts_ms')"
},
{
"columnName": "room_id",
"transformFunction": "JSONPATHSTRING(payload, '$.after.room_id')"
},
{
"columnName": "game_id",
"transformFunction": "JSONPATHSTRING(payload, '$.after.game_id')"
},
{
"columnName": "map_name",
"transformFunction": "JSONPATHSTRING(payload, '$.after.map_name')"
},
{
"columnName": "before_red_team_member",
"transformFunction": "JSONPATHSTRING(payload, '$.before.red_team_member')"
},
{
"columnName": "after_red_team_member",
"transformFunction": "JSONPATHSTRING(payload, '$.after.red_team_member')"
},
{
"columnName": "before_blue_team_member",
"transformFunction": "JSONPATHSTRING(payload, '$.before.blue_team_member')"
},
{
"columnName": "after_blue_team_member",
"transformFunction": "JSONPATHSTRING(payload, '$.after.blue_team_member')"
},
{
"columnName": "before_game_status",
"transformFunction": "JSONPATHSTRING(payload, '$.before.game_status')"
},
{
"columnName": "after_game_status",
"transformFunction": "JSONPATHSTRING(payload, '$.after.game_status')"
},
{
"columnName": "before_winner_game",
"transformFunction": "JSONPATHSTRING(payload, '$.before.winner_game')"
},
{
"columnName": "after_winner_game",
"transformFunction": "JSONPATHSTRING(payload, '$.after.winner_game')"
},
{
"columnName": "before_finished_date",
"transformFunction": "JSONPATHSTRING(payload, '$.before.finished_date')"
},
{
"columnName": "after_finished_date",
"transformFunction": "JSONPATHSTRING(payload, '$.before.finished_date')"
}
]
}
}

ilgili tanımlamaları oluşturduktan sonra Apache Pinot üzerinde sorgu yazacağımız tabloyu oluşturabiliriz. Bunun için aşağıdaki adımları uygulamalıyız.

docker-compose exec pinot-controller bash
vi debezium-schema-definition.json
vi debezium-table-definition.json
/opt/pinot/bin/pinot-admin.sh  \
AddTable \
-tableConfigFile /opt/pinot/debezium-table-definition.json \
-schemaFile /opt/pinot/debezium-schema-definition.json -exec

Output:
Executing command: AddTable -tableConfigFile /opt/pinot/debezium-table-definition.json -schemaFile /opt/pinot/debezium-schema-definition.json -controllerProtocol http -controllerHost 192.168.192.9 -controllerPort 9000 -exec
Sending request: http://192.168.192.9:9000/schemas to controller: pinot-controller, version: Unknown
{"status":"Table medium_debezium_REALTIME succesfully added"}

http://localhost:9000

Apache Pinot tarafında istediğimiz yapıda tabloyu oluşturduk. Web UI kısmında kontrollerimizi yaptığımızda da görüyoruz ki tablo oluşmuş ve Status kısmı iyi yeşil ışıkta gözüküyor. Gelin şimdi sorgumuzu çalıştıralım.

SELECT tsms,
op,
room_id,
game_id,
map_name,
before_red_team_member,
after_red_team_member,
before_blue_team_member,
after_blue_team_member,
before_game_status,
after_game_status,
before_winner_game,
after_winner_game
FROM medium_debezium limit 100

Görmüş olduğunuz gibi Pinot üzerinde yazdığımız SQL sorgusu ile birlikte CDC takibini rahatlıkla yapabiliriz.

Python ile de Apache Pinot’a sorgu göndermek mümkün bunun için pinotdb kütüphanesini yüklemeniz gerekli → pip install pinotdb komutunu çalıştırabilirsiniz.

from pinotdb import connect
from datetime import datetime
import pandas as pd
import numpy as np


def game_detail(game_id):
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
curs = conn.cursor()
curs.execute(f"""
SELECT tsms,
op,
room_id,
game_id,
map_name,
before_red_team_member,
after_red_team_member,
before_blue_team_member,
after_blue_team_member,
before_game_status,
after_game_status,
before_winner_game,
after_winner_game
FROM medium_debezium
where game_id = {game_id}
""")
df = pd.DataFrame(
curs,
columns=[item[0] for item in curs.description]
)
df["date"] = [pd.to_datetime(x,infer_datetime_format=True, unit='ms') for x in df["tsms"]]
df.drop('tsms', axis=1, inplace=True)
df.to_csv(f"cdc-{game_id}-tracking.csv", index=False, mode="+w")


game_detail(6)
game_detail(7)

İlgili fonksiyon çalıştıktan sonra csv olarak elde ettiğim dosyaları aşağıda görebilirsiniz.

Görmüş olduğunuz gibi csv dosyasını incelediğimde game_id’si 6 olan kayıt için yapılan insert işlemleri ve update işlemlerini Apache Pinot sayesinde de anlık olarak adım adım takip edebiliyoruz.

Umarım faydalı bir yazı olmuştur. Herhangi bir sorunuz olması halinde benimle iletişime geçebilirsiniz. Teşekkürler:)

Kaynak:

--

--