E2E Build Data Lakehouse with Reddit API & Iceberg: Part 2

In the previous section, we discussed cleaning the data and loading it into OpenSearch and MinIO.
📌Build Data Lakehouse From Historical Data
This is the most crucial stage of the project: we will be creating a lakehouse using the data we have collected. If you would like to read my previous post about the lakehouse, you can find it here.
We will use similar technologies here too.
- Data Catalog: Nessie
- Table Format: Apache Iceberg
- Query Tool: Dremio
Let’s start by configuring Spark settings…
import os

import pyspark
from pyspark.sql import SparkSession
# DateType is needed later when casting the aggregated date column.
from pyspark.sql.types import (
    StructType, StructField, DoubleType, IntegerType, StringType, FloatType, DateType
)
full_path_to_warehouse = 's3a://warehouse'
branch_name = "main"
# Nessie authentication type. Other options: NONE, BEARER, OAUTH2, or AWS.
auth_type = "NONE"
s3_endpoint = "http://192.168.89.83:9000"
accessKeyId=''
secretAccessKey=''
nessie_url = "http://192.168.89.83:19120/api/v1"
spark = (
    SparkSession.builder
    .master("local")
    .appName("Spark Unity Iceberg Demo")
    .config("spark.driver.memory", "16g")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2,"
            "org.apache.hadoop:hadoop-aws:3.4.0")
    # MinIO (S3A) access
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.access.key", accessKeyId)
    .config("spark.hadoop.fs.s3a.secret.key", secretAccessKey)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Point Spark at the local MinIO endpoint instead of the default Amazon S3 API.
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    .config("fs.s3a.connection.ssl.enabled", "false")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Configuring the Nessie catalog
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.uri", nessie_url)
    .config("spark.sql.catalog.nessie.ref", branch_name)
    .config("spark.sql.catalog.nessie.authentication.type", auth_type)
    .config("spark.sql.catalog.nessie.warehouse", full_path_to_warehouse)
    .getOrCreate()
)
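Before moving on, it is worth confirming that the session can actually reach the Nessie catalog. A minimal sanity check (an empty namespace list is fine at this point):
print(spark.version)
# An error here usually points to a wrong Nessie URL, MinIO endpoint, or credentials.
spark.sql("SHOW NAMESPACES IN nessie").show()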
Read the Parquet files from S3 (MinIO).
schema = StructType([
    StructField("filename", StringType(), True),
    StructField("SCORE", FloatType(), True),
    StructField("SUBREDDIT", StringType(), True),
    StructField("ETL_DATE", StringType(), True),
    StructField("TITLE", StringType(), True),
    StructField("TEXT", StringType(), True),
    StructField("SUBREDDIT_TYPE", StringType(), True),
    StructField("SUBREDDIT_SUBSCRIBER_COUNT", IntegerType(), True),
    StructField("URL", StringType(), True),
    StructField("TS", StringType(), True),
    StructField("VOTE_RATIO", FloatType(), True),
    StructField("TOPIC", StringType(), True),
    StructField("USER_NAME", StringType(), True),
    StructField("WLS", FloatType(), True)
])
# Apply the explicit schema instead of relying on schema inference.
df = spark.read.schema(schema).parquet("s3a://reddit/*")
df.show(2)
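Because ETL_DATE and TS are read as strings, it helps to check the schema and cast the timestamp column before the date-based aggregations below. A small sketch; the format string is an assumption and should be adjusted to match your actual data:
from pyspark.sql.functions import to_timestamp

df.printSchema()
# Cast the string column so date_format() in the next step behaves predictably.
df = df.withColumn("ETL_DATE", to_timestamp("ETL_DATE", "yyyy-MM-dd HH:mm:ss"))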
At this stage, we write queries that answer questions such as which topics receive the most posts and which users post the most.
from pyspark.sql.functions import date_format

most_subscriber_per_day_df = df.groupBy(
    date_format("ETL_DATE", "yyyy-MM-dd").alias("date"),
    df.USER_NAME
).count()
most_subscriber_per_day_df.show()

daily_message_df = df.groupBy(date_format("ETL_DATE", "yyyy-MM-dd").alias("date")).count()
daily_message_df.show()

most_write_user = df\
    .filter(df.USER_NAME.isNotNull())\
    .groupBy("USER_NAME")\
    .count()\
    .orderBy("count", ascending=False)\
    .coalesce(2)
most_write_user.show()
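The insert step later in this post also references topic_count and avg_score_per_topic DataFrames. They are built the same way as the aggregations above; here is a sketch of what they might look like (the count column is cast to int so it matches the INTEGER columns defined below):
from pyspark.sql.functions import avg, col

# Number of posts per topic.
topic_count = df.groupBy(col("TOPIC").alias("topic"))\
    .count()\
    .withColumn("count", col("count").cast("int"))
topic_count.show(5)

# Average score per topic.
avg_score_per_topic = df.groupBy(col("TOPIC").alias("topic"))\
    .agg(avg("SCORE").alias("AVG_SCORE"))
avg_score_per_topic.show(5)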
After running the queries, you must create a namespace for Nessie before writing any tables.
CATALOG_NAME = "nessie"
DB_NAME = "reddit"
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME};")
Next, let’s write the query results into partitioned Iceberg tables…
⚡ Iceberg Partitioning
Apache Iceberg is an open-source table format used for storing large datasets. Partitioning is an optimization technique that improves query performance by dividing database tables into smaller parts based on specific attributes.
Iceberg simplifies this process by using hidden partitioning. Users do not need to manually manage partition columns; Iceberg automatically generates partition values during table writing and uses them at query time. This allows users to get accurate query results without understanding the physical table layout.
Iceberg can partition timestamps at different granularities such as year, month, day, and hour. It can also partition categorical columns by identity, hash buckets, or truncation. This flexibility allows database tables to change partitioning schemes over time based on data volume.
Iceberg’s hidden partitioning approach allows queries to work independently of the table layout and enables partitioning configurations to evolve.
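To make the transform idea concrete, here is a small illustrative DDL (the partition_demo table is hypothetical and not part of this pipeline) that partitions a timestamp column by day and a categorical column into hash buckets; Iceberg derives the partition values automatically at write time:
# Illustrative only: hidden partitioning via Iceberg partition transforms.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.partition_demo (
    ts TIMESTAMP,
    USER_NAME STRING,
    SCORE DOUBLE
)
USING iceberg
PARTITIONED BY (days(ts), bucket(16, USER_NAME));
""")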
Now create the target tables for the query results, partitioned where appropriate.
TABLE_NAME = "daily_message"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
date DATE,
count INTEGER
)
USING iceberg
PARTITIONED BY (date);
""")
#--------------------------------------------------------------------------#
TABLE_NAME = "most_write_user"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
USER_NAME STRING,
count INTEGER
)
USING iceberg
PARTITIONED BY (USER_NAME);
""")
#--------------------------------------------------------------------------#
TABLE_NAME = "topic_count"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} (
topic STRING,
count INTEGER
)
USING iceberg
""")
#--------------------------------------------------------------------------#
TABLE_NAME = "most_subscriber_per_day"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions (
date TIMESTAMP,
USER_NAME STRING,
count INTEGER
)
USING iceberg
PARTITIONED BY (date);
""")
#--------------------------------------------------------------------------#
TABLE_NAME = "avg_score_per_topic"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} (
topic STRING,
AVG_SCORE DOUBLE
)
USING iceberg
""")
Insert data into the Iceberg tables.
# unpartitioned tables
avg_score_per_topic.write.mode("append").insertInto(f"{CATALOG_NAME}.{DB_NAME}.avg_score_per_topic")
topic_count.write.mode("append").insertInto(f"{CATALOG_NAME}.{DB_NAME}.topic_count")

# partitioned tables: cast count to int so it matches the INTEGER column in the table
most_subscriber_per_day_df\
    .withColumn("date", most_subscriber_per_day_df['date'].cast(DateType()))\
    .withColumn("count", most_subscriber_per_day_df['count'].cast(IntegerType()))\
    .orderBy("date")\
    .writeTo(f"{CATALOG_NAME}.{DB_NAME}.most_subscriber_per_day_withpartitions").append()
Summary
- We transformed the dataset retrieved through the Reddit API into the Apache Iceberg open table format within an Apache Spark environment.
- After data conversion, we explored the key features of Iceberg, focusing on advanced partitioning strategies.
- Specifically, we examined hidden partitioning and partition transforms, alongside Nessie’s branch and tag concepts, which together enhance data organization and versioning.
- These partitioning strategies in Iceberg improve data access speed and query efficiency, optimizing the table structure for analytical workflows.
- Furthermore, with Iceberg we get data versioning, enabling us to track changes and revert to previous data states if needed, which is critical for maintaining data integrity in the lakehouse; a short sketch of inspecting snapshot history follows below.
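As a quick illustration of that versioning capability, Iceberg exposes snapshot metadata tables and snapshot-based time travel through Spark; a minimal sketch against one of the tables created above:
# Inspect the snapshot history of an Iceberg table.
spark.sql(f"""
SELECT snapshot_id, committed_at, operation
FROM {CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions.snapshots
""").show(truncate=False)

# Time travel: read the table as of a specific snapshot id (placeholder value).
# spark.read.option("snapshot-id", 1234567890)\
#     .table(f"{CATALOG_NAME}.{DB_NAME}.daily_message_withpartitions").show()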
If you’re interested in data engineering & data science topics, you can also check out my other blog posts.