Iceberg Made Simple with Amazon S3 Tables


Data lakes have become essential for organizations seeking to store vast amounts of structured and unstructured data. However, managing these repositories presents several challenges:

  • Data Consistency and Integrity: Traditional data lakes often lack support for ACID (Atomicity, Consistency, Isolation, Durability) transactions, leading to issues with data consistency, especially during concurrent read and write operations.
  • Schema Evolution: As data evolves, modifying schemas without disrupting existing processes becomes complex, often requiring downtime or extensive data migrations.
  • Efficient Data Retrieval: Without proper data partitioning and indexing, querying large datasets can be inefficient, resulting in slow response times and increased computational costs.
  • Data Versioning and Time Travel: Tracking historical data changes and accessing previous data versions are challenging, hindering auditing and rollback capabilities.

Apache Iceberg addresses these challenges by introducing a high-performance table format for data lakes:

  • ACID Transactions: Iceberg ensures data consistency through support for ACID transactions, allowing safe concurrent operations.
  • Schema Evolution: It enables seamless schema modifications, such as adding or dropping columns, without requiring data rewrites or causing disruptions.
  • Partitioning and Indexing: Iceberg provides advanced partitioning strategies and indexing, optimizing query performance by reducing the amount of data scanned (a short partitioning sketch follows this list).
  • Time Travel: It maintains a history of data changes, allowing users to query data as of a specific point in time, facilitating auditing and data recovery.
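
To make the partitioning point concrete, here is a minimal PySpark sketch of Iceberg's hidden partitioning. It uses a local Hadoop catalog purely for illustration and assumes the iceberg-spark-runtime package can be resolved from Maven; the table and column names are illustrative and are separate from the S3 Tables walkthrough later in this post.

from pyspark.sql import SparkSession

# Local Iceberg catalog ("local", backed by a temporary warehouse path) used only to
# demonstrate partition transforms; the S3 Tables catalog is configured later in this post.
spark = (
    SparkSession.builder
    .appName("iceberg-partitioning-sketch")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
    .getOrCreate()
)

# Hidden partitioning: Iceberg derives the partition value (the day of sale_date) at write
# time, so queries filtering on sale_date can prune files without an explicit partition column.
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.demo.daily_sales (
        store_id    INT,
        sale_date   DATE,
        sale_amount DECIMAL(10, 2)
    )
    USING iceberg
    PARTITIONED BY (days(sale_date))
""")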

Amazon S3 Tables and Their Advantages

While Apache Iceberg offers robust solutions for data lake challenges, implementing and managing Iceberg tables on standard Amazon S3 buckets can introduce complexities:

  • Operational Overhead: Managing Iceberg tables requires handling tasks such as metadata management, snapshot creation, and data compaction, which can be resource-intensive.
  • Performance Optimization: Achieving optimal query performance necessitates manual tuning and maintenance, including partition management and file optimization.
  • Scalability Constraints: As data volumes grow, maintaining high transaction throughput and low-latency queries becomes increasingly challenging without specialized infrastructure.

Amazon S3 Tables offer a fully managed solution for storing and managing tabular data at scale, addressing common challenges associated with self-managed Apache Iceberg tables on standard S3 buckets.

Key Features and Capabilities of Amazon S3 Tables:

  • Purpose-Built Storage: S3 Tables introduce table buckets, a new bucket type specifically designed for tabular data, delivering up to 3x faster query performance and up to 10x higher transactions per second compared to self-managed Iceberg tables in general-purpose S3 buckets.
  • Automated Maintenance: S3 Tables handle routine maintenance tasks such as data compaction, snapshot management, and unreferenced file cleanup automatically, reducing operational overhead and ensuring consistent performance.
  • Namespaces for Logical Organization: S3 Tables support namespaces, allowing you to group tables logically within a table bucket. This structure facilitates better organization and management of tables, enabling you to apply access controls and policies at the namespace level.
  • Enhanced Security and Access Control: S3 Tables provide robust security features, including identity-based and resource-based policies. You can manage access for both table buckets and individual tables using AWS Identity and Access Management (IAM) and Service Control Policies in AWS Organizations. S3 Tables use a unique service namespace (s3tables), allowing you to design policies specifically for the S3 Tables service and its resources.
  • Encryption: S3 Tables ensure data protection through encryption at rest and in transit. Table buckets have bucket encryption enabled by default, applying to all tables within the bucket at no additional cost. Data in transit is protected using Transport Layer Security (TLS) 1.2 and above through HTTPS.
  • Monitoring and Auditing with AWS CloudTrail: S3 Tables integrate with AWS CloudTrail, providing detailed logs of all API calls and activities. CloudTrail captures both management events (e.g., creating or deleting tables) and data events (e.g., accessing or modifying table data), enabling comprehensive monitoring and auditing of your S3 Tables environment.

By leveraging these features, Amazon S3 Tables simplify the management of tabular data, enhance performance, and provide robust security and monitoring capabilities, making them a compelling choice for organizations looking to manage their data lakes efficiently.
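
As a quick illustration of the dedicated s3tables API surface mentioned above, the sketch below creates a namespace in a table bucket and lists its tables with boto3. It assumes a boto3 release recent enough (late 2024 or newer) to ship the s3tables client, and a placeholder account ID in the ARN; the parameter names follow the S3 Tables API (CreateNamespace, ListTables) but should be verified against the current SDK documentation.

import boto3

# Assumes a boto3 version that includes the "s3tables" client (released alongside
# S3 Tables in late 2024). Parameter names mirror the S3 Tables API and may need
# adjusting to match the SDK version you have installed.
s3tables = boto3.client("s3tables", region_name="us-east-1")

# Placeholder ARN; use the ARN returned by create-table-bucket for your account.
bucket_arn = "arn:aws:s3tables:us-east-1:111122223333:bucket/iceberg-s3tables-data"

# Group tables under a logical namespace inside the table bucket.
s3tables.create_namespace(tableBucketARN=bucket_arn, namespace=["example_namespace"])

# List the tables that live in that namespace.
response = s3tables.list_tables(tableBucketARN=bucket_arn, namespace="example_namespace")
for table in response.get("tables", []):
    print(table["name"])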

Integrations with AWS and Third-Party Services

Amazon S3 Tables seamlessly integrate with a variety of AWS analytics services and third-party tools, enhancing data processing and analysis capabilities.

AWS Service Integrations (in preview):

  • Amazon Athena: Athena allows you to run SQL queries directly on data stored in S3 Tables without the need for data loading or complex ETL processes. This integration facilitates interactive querying of large datasets (a hedged boto3 query sketch follows this list).
  • Amazon Redshift: With S3 Tables, Amazon Redshift can query data in place, enabling efficient data warehousing and analytics without duplicating data. This integration streamlines data workflows and reduces storage costs.
  • Amazon EMR: Amazon EMR supports processing data in S3 Tables using big data frameworks like Apache Spark. This integration allows for scalable data processing and analytics.
  • AWS Glue: AWS Glue provides a fully managed ETL service that can catalog, clean, and transform data stored in S3 Tables, facilitating seamless data preparation for analytics. Integration with the AWS Glue Data Catalog enables centralized metadata management.
  • Amazon Kinesis Data Firehose: Kinesis Data Firehose can deliver real-time streaming data to S3 Tables, enabling near real-time analytics and data processing. This integration supports use cases such as log analytics and event monitoring.
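
As a sketch of the Athena integration, the snippet below submits a query against an S3 Tables table with boto3. It assumes the table bucket has been integrated with the AWS analytics services so its tables are visible through the AWS Glue Data Catalog; the catalog name (s3tablescatalog/iceberg-s3tables-data), namespace, table name, and results location are assumptions you would replace with your own values.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Submit a query against a table that lives in an S3 table bucket. The Catalog value
# below assumes the analytics-services integration exposes table buckets as a
# sub-catalog named "s3tablescatalog/<table-bucket-name>"; verify this in your account.
response = athena.start_query_execution(
    QueryString="SELECT store_id, SUM(sale_amount) AS total_sales FROM dailysales GROUP BY store_id",
    QueryExecutionContext={
        "Catalog": "s3tablescatalog/iceberg-s3tables-data",
        "Database": "example_namespace",
    },
    ResultConfiguration={"OutputLocation": "s3://source-bucket/athena-results/"},
)
print(response["QueryExecutionId"])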

Third-Party Integrations:

Because S3 Tables use the Apache Iceberg format, they are currently compatible with the open-source Apache Spark data processing engine:

  • Apache Spark: You can configure Apache Spark to interact with S3 Tables using the Amazon S3 Tables Catalog for Apache Iceberg, enabling efficient data processing and analytics.

Sample Use Case: Batch Data Processing with Amazon S3 Tables and Amazon EMR

Scenario:

A retail company collects daily sales data from multiple stores. They aim to consolidate this data into a centralized data lake for batch processing, enabling comprehensive sales analysis and reporting.

Create Synthetic Sales Data:

generate_sales_data.py

import csv
import os
import random
from datetime import datetime, timedelta

def generate_sales_data(num_records, output_file):
    """Write num_records synthetic sales rows (store, date, amount) to output_file as CSV."""
    start_date = datetime(2024, 1, 1)
    num_stores = 10

    # Create the target directory (e.g. data/) if it does not already exist.
    os.makedirs(os.path.dirname(output_file) or '.', exist_ok=True)

    with open(output_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['store_id', 'sale_date', 'sale_amount'])

        for _ in range(num_records):
            store_id = random.randint(1, num_stores)
            days_offset = random.randint(0, 364)
            sale_date = (start_date + timedelta(days=days_offset)).strftime('%Y-%m-%d')
            sale_amount = round(random.uniform(100.00, 1000.00), 2)
            writer.writerow([store_id, sale_date, sale_amount])

    print(f'{num_records} records have been written to {output_file}')

if __name__ == "__main__":
    num_records = 10000
    output_file = 'data/daily_sales.csv'
    generate_sales_data(num_records, output_file)
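
Before the Spark job on EMR can read the file, upload it to a staging bucket. The bucket and prefix below match the path used in the Spark read later in this post (s3://source-bucket/daily_sales/); substitute a bucket you own, ideally in the same Region as the cluster.

import boto3

# Upload the generated CSV to the staging location that the EMR/Spark job reads from.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="data/daily_sales.csv",
    Bucket="source-bucket",          # placeholder; use your own staging bucket
    Key="daily_sales/daily_sales.csv",
)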

Create S3 Table

aws s3tables create-table-bucket --region us-east-1 --name iceberg-s3tables-data

# {
#     "arn": "arn:aws:s3tables:us-east-1:xxxx:bucket/iceberg-s3tables-data"
# }

Create EMR Roles

aws emr create-default-roles

aws iam attach-role-policy --role-name EMR_EC2_DefaultRole --policy-arn arn:aws:iam::aws:policy/AmazonS3TablesFullAccess

aws iam attach-role-policy --role-name EMR_EC2_DefaultRole --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

Create EMR Cluster

aws emr create-cluster \
  --name emr-cluster-for-iceberg-s3tables-demo \
  --release-label emr-7.5.0 \
  --applications Name=Spark Name=JupyterEnterpriseGateway Name=JupyterHub \
  --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName=iceberg-s3-tables,SubnetId=subnet-6480713b \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=r5.2xlarge \
  --configurations file://emr-configurations.json \
  --service-role EMR_DefaultRole \
  --tags 'Environment=dev' 'Name=emr-cluster-for-iceberg-s3tables-demo' \
  --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
  --auto-termination-policy IdleTimeout=3600 \
  --ebs-root-volume-size 100 \
  --region us-east-1
-------------------------------------------------------------
file: emr-configurations.json
[
    {
      "Classification": "spark-defaults",
      "Properties": {
        "spark.driver.memory": "5692M",
        "spark.executor.memory": "5692M",
        "spark.yarn.am.memory": "5692M",
        "spark.dynamicAllocation.enabled": "true",
        "spark.shuffle.service.enabled": "true"
      }
    },
    {
      "Classification": "yarn-site",
      "Properties": {
        "yarn.nodemanager.resource.memory-mb": "57344",
        "yarn.scheduler.maximum-allocation-mb": "57344",
        "yarn.scheduler.minimum-allocation-mb": "1024"
      }
    },
    {
      "Classification": "iceberg-defaults",
      "Properties": {
        "iceberg.enabled": "true"
      }
    }
  ]
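
Once the cluster is up, you can look up the primary node's public DNS name to reach JupyterHub, which EMR exposes on port 9443 of the primary node. The cluster ID below is a placeholder for the value returned by create-cluster, and your security group must allow inbound traffic on port 9443.

import boto3

# Resolve the JupyterHub URL from the cluster's primary-node DNS name.
emr = boto3.client("emr", region_name="us-east-1")
cluster = emr.describe_cluster(ClusterId="j-XXXXXXXXXXXXX")["Cluster"]  # placeholder cluster ID
print(f"https://{cluster['MasterPublicDnsName']}:9443")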
  

Open the EMR JupyterHub notebook and log in (the default user name is jovyan and the password is jupyter).


from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
    .appName("Iceberg S3 Tables Demo") \
    .getOrCreate()
  
-------------------------------------------------------------

%%configure -f
{
"conf":{
  "spark.jars.packages": "software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3",
  "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "spark.sql.catalog.demoblog": "org.apache.iceberg.spark.SparkCatalog",
  "spark.sql.catalog.demoblog.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
  "spark.sql.catalog.demoblog.warehouse": "arn:aws:s3tables:us-east-1:xxxxxx:bucket/iceberg-s3tables-data"
 }
}
-------------------------------------------------------------

sales_df = spark.read.format("csv").option("header", "true").load("s3://source-bucket/daily_sales/*.csv")
print(f"Number of records are {sales_df.count()} ")

Output:

Number of records: 10000
-------------------------------------------------------------

spark.sql("CREATE NAMESPACE IF NOT EXISTS demoblog.example_namespace")
spark.sql("show namespaces in demoblog").show()
table_store = "demoblog.example_namespace.dailysales"

Output:

+-----------------+
|        namespace|
+-----------------+
|example_namespace|
+-----------------+
-------------------------------------------------------------

sales_df.writeTo(table_store).using("iceberg").createOrReplace()
spark.sql(f"select count(*) from {table_store}").show()

Output:

+--------+
|count(1)|
+--------+
|   10000|
+--------+
-------------------------------------------------------------

# Alter table and add a new column
import pyspark.sql.utils
try:
    spark.sql(f"ALTER TABLE {table_store} ADD COLUMNS count INTEGER")
except pyspark.sql.utils.AnalysisException:
    print("Column already exists")
-------------------------------------------------------------

# Backfill the new column with a default value
from pyspark.sql import functions as sf
sales_df.withColumn("count",sf.lit(100)).writeTo(table_store).using("iceberg").createOrReplace()
-------------------------------------------------------------
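
As an aside, because the Iceberg Spark extensions are enabled in the %%configure cell, the same backfill could also be expressed as an in-place, row-level UPDATE instead of rewriting the table with createOrReplace(). This is only a sketch and is not run as part of this demo; it is not reflected in the snapshot history shown below.

# Alternative backfill using Iceberg row-level DML (sketch only; not run in this demo)
spark.sql(f"UPDATE {table_store} SET `count` = 100")
-------------------------------------------------------------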

# Check snapshot history of the table
spark.sql(f"SELECT * FROM {table_store}.history LIMIT 10").show()

Output:

+--------------------+-------------------+---------+-------------------+
|     made_current_at|        snapshot_id|parent_id|is_current_ancestor|
+--------------------+-------------------+---------+-------------------+
|2024-12-27 23:03:...|7537788598025187317|     NULL|              false|
|2024-12-27 23:06:...| 803915337298815352|     NULL|               true|
+--------------------+-------------------+---------+-------------------+
-------------------------------------------------------------

# Time travel - query the table before adding new column

spark.sql(f"SELECT * FROM {table_store} for system_version as of 7537788598025187317 LIMIT 2").show()

Output:

+--------+----------+-----------+
|store_id| sale_date|sale_amount|
+--------+----------+-----------+
|       2|2024-11-19|     813.06|
|       6|2024-10-28|     244.81|
+--------+----------+-----------+
-------------------------------------------------------------

# Time travel - latest snapshot

spark.sql(f"SELECT * FROM {table_store} for system_version as of 803915337298815352 LIMIT 2").show()

Output:

+--------+----------+-----------+-----+
|store_id| sale_date|sale_amount|count|
+--------+----------+-----------+-----+
|       2|2024-11-19|     813.06|  100|
|       6|2024-10-28|     244.81|  100|
+--------+----------+-----------+-----+
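-------------------------------------------------------------

Clean Up (optional)

When you are done experimenting, clean up to avoid ongoing charges. The sketch below terminates the EMR cluster and removes the demo table, namespace, and table bucket. The cluster ID and account ID are placeholders, and the s3tables delete calls follow the S3 Tables API (DeleteTable, DeleteNamespace, DeleteTableBucket) but their parameter names should be checked against the current boto3 documentation.

import boto3

# Terminate the EMR cluster (placeholder cluster ID).
emr = boto3.client("emr", region_name="us-east-1")
emr.terminate_job_flows(JobFlowIds=["j-XXXXXXXXXXXXX"])

# Remove the table, namespace, and table bucket created for the demo. Parameter names
# mirror the S3 Tables API and may differ slightly in your boto3 version.
s3tables = boto3.client("s3tables", region_name="us-east-1")
bucket_arn = "arn:aws:s3tables:us-east-1:111122223333:bucket/iceberg-s3tables-data"  # placeholder account ID
s3tables.delete_table(tableBucketARN=bucket_arn, namespace="example_namespace", name="dailysales")
s3tables.delete_namespace(tableBucketARN=bucket_arn, namespace="example_namespace")
s3tables.delete_table_bucket(tableBucketARN=bucket_arn)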

Conclusion:

By leveraging Amazon S3 Tables with Apache Iceberg and Apache Spark on Amazon EMR, organizations can build robust batch data processing pipelines. This setup ensures data consistency, supports schema evolution, and provides powerful querying capabilities, including time travel, facilitating advanced analytics and informed decision-making.

For more detailed information and advanced configurations, refer to the following resources:

Working with Amazon S3 Tables and table buckets - Amazon Simple Storage Service
Learn about S3 Tables, an Amazon S3 storage solution that’s purpose-built for tables and optimized for analytic workloads.