Streaming Data Pipeline Proje Örneği (Kafka,Spark,HBase,Phoenix,Streamlit)

Emre Evcimen
Bentego Teknoloji
Published in
6 min readAug 23, 2023

--

Herkese Merhabalar, bugünkü yazımda sizlere örnek bir canlı veri akışı projesi aktaracağım.

İlk önce proje mimarisini görsel olarak bir inceleyelim.

Akışı yazılı olarak ifade etmek gerekirse yazdığımız Python koduyla satış verisi(Avro formatında) üretiyoruz ve bu verileri Kafka’da oluşturduğumuz topic’e iletiyoruz. Daha sonrasında Spark Streaming kullanarak topic’i canlı bir şekilde dinleyip verileri HBase’e gönderiyoruz. HBase üzerindeki verileri SQL yeteneklerimizi kullanarak sorgulamak istediğimiz için Phoenix aracını devreye alıyoruz. Burada shell ekranı yerine DBeaver kullanarak daha esnek çalışabiliriz. Son olarak da Python’ın Streamlit kütüphanesini kullanarak analiz ettiğimiz verileri dashboard’lar oluşturarak son kullanıcıya sunuyoruz.

Gerekli servislerin çalışma durumlarını kontrol edelim.

İlk ekran görüntüsünde görmüş olduğumuz üzere Hbase servisimiz çalışır durumda gözüküyor.(HMaster,HRegion ve Hadoop Bileşenleri)

İkinci ekran görüntüsünde de Kafka servisimizin, verileri Avro formatında gönderdiğimiz için Schema Registry servisimiz ve hem Kafka’nın hem de HBase’in kullandığı Zookeeper servisimiz çalışır durumda gözüküyor.

HBase üzerinde verileri yazacağımız tablonun oluşturulması için aşağıdaki komutu çalıştırıyoruz.

hbase-shell
...
hbase:002:0> create 'order', 'order-data'
...
Created table order
Took 2.3048 seconds
=> Hbase::Table - order

Şimdi Kafka’ya veri üretecek Python kodumuzu inceleyip akışı başlatabiliriz.

import pandas as pd
from faker import Faker
import random as rd
import time
from confluent_kafka import avro
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.avro import AvroProducer,CachedSchemaRegistryClient
import re
import datetime

SCHEMA_REGISTRY_URL = "http://localhost:8881"
BROKER_URL = "localhost:9092"

admin_client = AdminClient({'bootstrap.servers': BROKER_URL})
order_topic = [NewTopic('order-topic', num_partitions=1, replication_factor=1)]

fs = admin_client.create_topics(order_topic)

schema = avro.loads(
"""{
"type":"record",
"name":"myrecord",
"fields": [
{
"name": "CustomerID",
"type": "int"
},
{
"name": "CustomerName",
"type": "string"
},
{
"name": "Email",
"type": "string"
},
{
"name": "Address",
"type": "string"
},
{
"name": "State",
"type": "string"
},
{
"name": "PhoneNumber",
"type": "string"
},
{
"name": "InvoiceDate",
"type": "string"
},
{
"name": "StockCode",
"type": "string"
},
{
"name": "Description",
"type": "string"
},
{
"name": "UnitPrice",
"type": "float"
},
{
"name": "Quantity",
"type": "int"
}
]
}"""
)

schema_registry = CachedSchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

data = pd.read_csv('OnlineRetail.csv', sep = ';')

data["InvoiceNo"] = data["InvoiceNo"].str.replace('A','')
data["InvoiceNo"] = data["InvoiceNo"].str.replace('C','')
data["InvoiceNo"] = data["InvoiceNo"].astype("int64")
data["StockCode"] = data["StockCode"].astype("string")
data["Description"] = data["Description"].astype("string")
data["UnitPrice"] = data["UnitPrice"].str.replace(',','.').astype("float")

cols = ['StockCode','Description', 'UnitPrice']
Stock_Desc_Unit_lst = data[cols].values.tolist()

fake = Faker()

def create_data(x):
project_data = {}
for i in range(x):
project_data[i] = {}
project_data[i]['CustomerID'] = i + 1
project_data[i]['Name'] = fake.name()

return project_data

df = pd.DataFrame(create_data(5000)).transpose()

cols = ['CustomerID','Name']
CustomerID_Name = df[cols].values.tolist()
Q_df = data[data["Quantity"]>0]
Quantity = Q_df["Quantity"].values.tolist()

now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def produce_kafka_dict(topic_name,json_as_dict):
producer=AvroProducer({"bootstrap.servers": BROKER_URL}, schema_registry=schema_registry)

producer.produce( topic=topic_name,
value=json_as_dict,
value_schema=schema)
producer.flush()

while True:
#for i in range(50):
order={}
n = rd.randint(1,10)
time.sleep(rd.randint(1,15))
CID_Name = rd.choice(CustomerID_Name)
order["CustomerID"]= CID_Name[0]
order["CustomerName"] = CID_Name[1]
order["Email"] = CID_Name[1].lower().replace(' ','.') + "@gmail.com"
order["Address"] = fake.address().replace('\n',' ')
order["State"] = re.findall("[A-Z]{2}", order["Address"])[0]
order["PhoneNumber"] = fake.phone_number()[0:10]
order["InvoiceDate"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
for a in range(n):
Stock_Desc_Unit = rd.choice(Stock_Desc_Unit_lst)
order["StockCode"] = Stock_Desc_Unit[0]
order["Description"] = Stock_Desc_Unit[1]
order["UnitPrice"] = Stock_Desc_Unit[2]
order["Quantity"] = rd.choice(Quantity)
produce_kafka_dict('order-topic',order)
print(order)

Evet kodumuzu çalıştırdığımızda ve Kafka topic’ini dinlediğimizde verilerin başarılı şekilde aktığını görebiliyoruz.

Şimdi ise Kafka’dan verileri okuyup HBase’e yazacak olan Spark kodumuzu inceleyelim.

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.avro.functions import from_avro
from confluent_kafka.schema_registry import SchemaRegistryClient


kafka_url = "localhost:9092"
schema_registry_url = "http://localhost:8881"
kafka_producer_topic = "order-topic"
schema_registry_subject = f"{kafka_producer_topic}-value"


spark = SparkSession \
.builder \
.master("yarn") \
.appName("ordertopic_consumer") \
.config("spark.jars.packages", "org.apache.hbase:hbase-shaded-mapreduce:2.4.15,org.apache.spark:spark-avro_2.12:3.0.1") \
.config("spark.jars", "/home/emre/Desktop/DE-Project-Hbase-Phoenix/hbase-spark-1.0.1-SNAPSHOT_spark331_hbase2415.jar,/home/emre/Desktop/DE-Project-Hbase-Phoenix/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT_spark331_hbase2415.jar") \
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
sr = SchemaRegistryClient({'url': schema_registry_url})
latest_version = sr.get_latest_version(schema_registry_subject)

return sr, latest_version

_, latest_version_ordertopic = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
fromAvroOptions = {"mode":"PERMISSIVE"}


def write_to_hbase(batch_df, epoch_id):

hbase_schema = """CustomerID INT :key, CustomerName STRING order-data:CustomerName,
Email STRING order-data:Email, Address STRING order-data:Address, State STRING order-data:State,
PhoneNumber STRING order-data:PhoneNumber, InvoiceDate TIMESTAMP order-data:InvoiceDate,
StockCode STRING order-data:StockCode, Description STRING order-data:Description,
UnitPrice FLOAT order-data:UnitPrice, Quantity INT order-data:Quantity
"""

batch_df.write.format("org.apache.hadoop.hbase.spark") \
.option("hbase.columns.mapping",hbase_schema) \
.option("hbase.namespace", "default") \
.option("hbase.table", "order") \
.option("hbase.spark.use.hbasecontext", False).save()



checkpointDir = "file:///tmp/streaming/kafka-order"

ordertopic_df = spark \
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", kafka_url)\
.option("subscribe", kafka_producer_topic)\
.option("startingOffsets", "earliest")\
.load()

ordertopic_df = ordertopic_df.withColumn("fixedValue", func.expr("substring(value, 6, length(value)-5)"))

decoded_output = ordertopic_df.select(
from_avro(
func.col("fixedvalue"), latest_version_ordertopic.schema.schema_str, fromAvroOptions
)
.alias("ordertopic"))

order_value_df = decoded_output \
.select("ordertopic.*")

order_value_df = order_value_df \
.withColumn("InvoiceDate", func.to_timestamp(func.col("InvoiceDate")))\
.withColumn("CustomerID",func.col("CustomerID").cast("integer")) \

order_value_df.printSchema()

query = order_value_df.writeStream \
.foreachBatch(write_to_hbase) \
.start()

query.awaitTermination()

Spark kodunu çalıştırdığımızda Kafka’daki verilerin başarılı şekilde HBase’e yazıldığını görmekteyiz. count komutunu kullanarak da veri akışının takibini yapabilirsiniz.

Yukarıdaki ekran görüntülerinde de görmüş olacağınız üzere HBase’in kendine has bir sorgu dili mevcut.Biz eğer burada depolanan verileri SQL yetenekleri kullanarak sorgulamak istersek bunun için açık kaynak ürünü olan Apache Phoenix’i kullanabiliriz. Dilerseniz DBeaver üzerinden bağlantı sağlayarak HBase üzerindeki verilerimizi sorgulayalım.

DBeaver ile Apache Phoenix sunucusuna bağlanmak istediğinizde DBeaver’ın default olarak indirdiği driver’ı değilde sizde kurulu olan versiyonundaki client’ı eklemeniz gerekecek.

Bende Apache Phoenix’ın kurulu olduğu dizine gittiğimde oradaki phoenix-client-hbase-2.5.jar’ı driver olarak kendim belirtiyorum.

Bağlantıyı test ettiğimizde başarılı olduğunu görmekteyiz.

CREATE VIEW "order"
(
"ID" UNSIGNED_INT PRIMARY KEY,
"order-data"."CustomerName" VARCHAR,
"order-data"."Email" VARCHAR,
"order-data"."Address" VARCHAR,
"order-data"."State" VARCHAR,
"order-data"."PhoneNumber" VARCHAR,
"order-data"."InvoiceDate" UNSIGNED_DATE,
"order-data"."StockCode" VARCHAR,
"order-data"."Description" VARCHAR,
"order-data"."UnitPrice" UNSIGNED_FLOAT,
"order-data"."Quantity" UNSIGNED_INT
)

-- DROP VIEW "order"

SELECT "CustomerName","Email","State","InvoiceDate",UPPER("StockCode") "StockCode",ROUND("UnitPrice",2) AS "UnitPrice","Quantity"
FROM "order";

Phoenix üzerinde oluşturduğumuz View sayesinde HBase üzerindeki verileri SQL yeteneklerimizi kullanarak analiz edebiliriz.

Her şey yolunda gittiğine göre son aşamamız olan analiz edilen verinin Streamlit aracılığıyla son kullanıcıya gösterilmesi kaldı.

Gelin Streamlit için yazdığımız kodu ve nasıl çalıştırmamız gerektiğine bir bakalım.

import jaydebeapi as jdbc
import os
import pandas as pd
from datetime import datetime
import streamlit as st

st.set_page_config(layout='wide')
st.header("Sales Report")

cwd = os.getcwd()
jar = cwd + '/streamlit/phoenix-client-hbase-2.5.jar'
drivername = 'org.apache.phoenix.jdbc.PhoenixDriver'
url = 'jdbc:phoenix:localhost:2181/'
curr_date = datetime.today().strftime('%Y-%m-%d')

conn = jdbc.connect(url= url, jars=jar ,jclassname=drivername,
driver_args={"phoenix.schema.isNamespaceMappingEnabled": "true"})

cursor = conn.cursor()

query = f'''
SELECT *
FROM "order" WHERE SUBSTR(TO_CHAR("InvoiceDate"),1,10) = '{curr_date}'

'''

query_2 = f'''
SELECT "State", ROUND(SUM("UnitPrice" * "Quantity"),2) AS "Income"
FROM "order" WHERE SUBSTR(TO_CHAR("InvoiceDate"),1,10) = '{curr_date}'
GROUP BY "State"
ORDER BY 2 DESC
LIMIT 10
'''

query_3 = f'''
SELECT SUBSTR(TO_CHAR("InvoiceDate"),12,2) AS "HOUR", ROUND(SUM("UnitPrice" * "Quantity"),2) AS "Income"
FROM "order" WHERE SUBSTR(TO_CHAR("InvoiceDate"),1,10) = '{curr_date}'
GROUP BY SUBSTR(TO_CHAR("InvoiceDate"),12,2)
ORDER BY 1 DESC

'''

df_raw_data = pd.read_sql(query, conn)

df_top_income_state = pd.read_sql(query_2, conn)

df_top_income_hour = pd.read_sql(query_3, conn)

with st.expander('View Raw Data'):
st.dataframe(df_raw_data, hide_index=True)

with st.expander('Top 10 State by Income'):
st.dataframe(df_top_income_state, hide_index=True)

st.bar_chart(df_top_income_state, x="State", y="Income")

with st.expander('Top 10 State by Hour'):
st.dataframe(df_top_income_hour, hide_index=True)

st.line_chart(df_top_income_hour, x="HOUR", y="Income")
streamlit run streamlit/strlmt-code.py 
------
You can now view your Streamlit app in your browser.
Local URL: http://localhost:8501
Network URL: http://xxx.xxx.xx.xxx:8501

Streamlit uygulamasını başlattığınızda yukarıdaki gibi size erişebileceğiniz URL’ler paylaşılmakta. Herhangi bir web browser üzerinden giriş yaparak kontrol sağlayabiliriz.

Böylece Apache Phoenix üzerinde analiz ettiğimiz verileri son kullanıcıya göstermiş olduk.

Umarım sizler için faydalı bir yazı olmuştur. Herhangi bir sorunuz veya görüşünüz varsa benimle iletişime geçebilirsiniz. Görüşmek üzere👋👋

--

--