Tag: pyspark

  • Iceberg: Features and Hands-on (Part 2)

    In the previous blog, we discussed Apache Iceberg’s basic concepts, its setup process, and how to load data. We will now delve into some of Iceberg’s advanced features, including upserts, schema evolution, time travel, and partitioning.

    Upsert Functionality

    One of Iceberg’s key features is its support for upserts. An upsert (update + insert) updates the rows whose key already exists in the table and inserts the rest, in a single operation. With Iceberg, this is done with one atomic MERGE INTO statement, so your data stays accurate and up to date without complex, time-consuming reprocessing.

    Schema Evolution

    Schema evolution is another of its powerful features. Over time, the schema of your data may need to change due to new requirements or updates. Iceberg handles schema changes gracefully, allowing you to add, remove, or modify columns without having to rewrite your entire dataset. This flexibility ensures that your data architecture can evolve in tandem with your business needs.

    Time Travel

    Iceberg also provides time travel, which lets you query a table as it existed at an earlier snapshot or point in time, as long as that snapshot has not been expired. This feature is particularly useful for debugging, auditing, and compliance purposes. Because every commit creates a new snapshot, you can access previous states of your data and analyze how it has changed over time.
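    To make the snapshot idea concrete, here is a toy, in-memory model (plain Python, not Iceberg's API; the ToyTable class and its methods are hypothetical). Each commit adds an immutable snapshot, and a time-travel read picks either a snapshot by ID or the latest snapshot committed at or before a timestamp. In Spark 3.3 and later, the corresponding Iceberg queries are SELECT ... FROM demo.db.data_sample VERSION AS OF <snapshot-id> and SELECT ... FROM demo.db.data_sample TIMESTAMP AS OF '<timestamp>', and snapshot IDs can be listed from the demo.db.data_sample.snapshots metadata table.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    committed_at_ms: int  # commit time in epoch milliseconds
    rows: Dict[str, str]  # toy table state: primary key -> value


@dataclass
class ToyTable:
    """Hypothetical stand-in for a table's snapshot log; not Iceberg's API."""

    snapshots: List[Snapshot] = field(default_factory=list)

    def commit(self, snapshot_id: int, committed_at_ms: int, rows: Dict[str, str]) -> None:
        # Every commit records a complete, immutable state instead of changing the old one.
        self.snapshots.append(Snapshot(snapshot_id, committed_at_ms, dict(rows)))

    def as_of_version(self, snapshot_id: int) -> Dict[str, str]:
        # Like VERSION AS OF: pick a snapshot by its ID.
        for snap in self.snapshots:
            if snap.snapshot_id == snapshot_id:
                return snap.rows
        raise KeyError(f"snapshot {snapshot_id} not found")

    def as_of_timestamp(self, ts_ms: int) -> Dict[str, str]:
        # Like TIMESTAMP AS OF: the latest snapshot committed at or before ts_ms.
        older = [s for s in self.snapshots if s.committed_at_ms <= ts_ms]
        if not older:
            raise ValueError("no snapshot was committed at or before that time")
        return max(older, key=lambda s: s.committed_at_ms).rows


table = ToyTable()
table.commit(101, 1_000, {"org-1": "Acme"})
table.commit(102, 2_000, {"org-1": "Acme Corp", "org-2": "Globex"})
print(table.as_of_timestamp(1_500))  # {'org-1': 'Acme'}
print(table.as_of_version(102))  # {'org-1': 'Acme Corp', 'org-2': 'Globex'}
```

    The toy ignores rollbacks and snapshot expiry; a real table only keeps snapshots until they are expired.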

    Set up Iceberg on the local machine using the local catalog option or Hive

    You can also configure Iceberg in your Spark session like this:

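    import pyspark
    
    spark = (
        pyspark.sql.SparkSession.builder
        # Iceberg runtime built for Spark 3.2 / Scala 2.12
        .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0')
        # Iceberg SQL extensions (needed for statements such as ALTER TABLE ... ADD PARTITION FIELD)
        .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Wrap Spark's built-in catalog so it can also load Iceberg tables from the Hive metastore
        .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
        .config('spark.sql.catalog.spark_catalog.type', 'hive')
        # A separate catalog named "local" that stores tables in a directory (Hadoop catalog)
        .config('spark.sql.catalog.local', 'org.apache.iceberg.spark.SparkCatalog')
        .config('spark.sql.catalog.local.type', 'hadoop')
        .config('spark.sql.catalog.local.warehouse', './Data-Engineering/warehouse')
        .getOrCreate()
    )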

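    A catalog must be configured before Iceberg tables can be used, and the extensions setting is needed for Iceberg-specific SQL. The examples below use a catalog named demo (the catalog name used in Part 1); with the session above, replace demo with local.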

    Create Tables in Iceberg and Insert Data

    CREATE TABLE demo.db.data_sample (index string, organization_id string, name string, website string, country string, description string, founded string, industry string, num_of_employees string) USING iceberg

    df = spark.read.option("header", "true").csv("../data/input-data/organizations-100.csv")
    
    df.writeTo("demo.db.data_sample").append()

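    We can either create the table first with Spark SQL, as above, or skip that step and let the DataFrame writer create it: df.writeTo("demo.db.data_sample").create() (or createOrReplace()) creates the Iceberg table from the DataFrame's schema, whereas append() requires the table to exist already.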

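    Apart from append(), the DataFrame writer also supports overwriting data (for example, overwritePartitions() replaces only the partitions present in the DataFrame), just as with Delta Lake tables. Reading the table back is a regular Spark read, for example spark.table("demo.db.data_sample").show().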

    Handling Upserts

    Like Delta Lake, Iceberg lets you update records in an existing table without rewriting the complete dataset. This is also how CDC (change data capture) feeds are typically applied: we take an incoming CSV and merge it into the existing table on a key, so each primary key keeps a single record instead of being duplicated. Each merge is committed atomically as a new snapshot, so readers see either the old state or the new one, never a partial update.

    Incoming Data 

    input_data = spark.read.option("header", "true").csv("../data/input-data/organizations-11111.csv")
    # Creating the temp view of that dataframe to merge
    input_data.createOrReplaceTempView("input_data")
    spark.sql("select * from input_data").show()

    We will merge this data into our existing Iceberg Table using Spark SQL.

    MERGE INTO demo.db.data_sample t
    USING (SELECT * FROM input_data) s
    ON t.organization_id = s.organization_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    
    select * from demo.db.data_sample

    Here, we can see the data once the merge operation has taken place.
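    The semantics of the MERGE above can be sketched in plain Python. This is a toy model under the assumption that organization_id is unique in both the table and the incoming file; merge_into is a hypothetical helper, not part of Iceberg or Spark. Matched keys have all their columns replaced (UPDATE SET *), and unmatched keys are inserted (INSERT *).

```python
from typing import Dict, Iterable, List

Row = Dict[str, str]


def merge_into(target: List[Row], source: Iterable[Row], key: str) -> List[Row]:
    """Toy model of
    MERGE INTO t USING s ON t.key = s.key
    WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
    """
    result = {row[key]: dict(row) for row in target}  # assumes target keys are unique
    seen = set()
    for row in source:
        k = row[key]
        # Simplification: reject any duplicate source key. Iceberg raises an error
        # when one target row is matched by more than one source row.
        if k in seen:
            raise ValueError(f"more than one source row has key {k!r}")
        seen.add(k)
        # Matched -> every column is replaced; not matched -> the row is inserted.
        result[k] = dict(row)
    return list(result.values())


target = [
    {"organization_id": "1", "name": "Acme"},
    {"organization_id": "2", "name": "Globex"},
]
incoming = [
    {"organization_id": "2", "name": "Globex Inc"},
    {"organization_id": "3", "name": "Initech"},
]
for row in merge_into(target, incoming, "organization_id"):
    print(row)
```

    Iceberg applies the same logic to the table's data files and commits the result as a single new snapshot.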

    Schema Evolution

    Iceberg supports the following schema evolution changes:

    • Add – add a new column to the table (or a new field to a nested struct)
    • Drop – remove an existing column or nested field
    • Rename – rename an existing column or nested field
    • Update – widen the type of a column (for example, int to bigint or float to double)
    • Reorder – change the order of columns or of fields in a nested struct

    After a schema change, there is no need to overwrite or rewrite the existing data. For example, if your table has four columns that all contain data and you add two more, the existing data files stay as they are: rows written before the change simply return null for the new columns. Delta Lake needs extra configuration (column mapping) to rename or drop columns, whereas Iceberg supports these changes out of the box. Iceberg's schema evolution guarantees include:

    1. Added columns never read existing values from another column.
    2. Dropping a column or field does not change the values in any other column.
    3. Updating a column or field does not change the values in any other column.
    4. Changing the order of columns does not change the values associated with a column name.

    Iceberg can make these guarantees because it tracks every column by a unique field ID rather than by its name or position, and field IDs are never reused.
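    The effect of ID-based tracking can be shown with a small, hypothetical model (ToySchema and read_row are illustrative names, not Iceberg classes): data files store values under field IDs, renames only change the name-to-ID mapping, and a column that is dropped and then re-added gets a new ID, so it cannot pick up the old column's values.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ToySchema:
    """Hypothetical model: column names map to field IDs, and IDs are never reused."""

    name_to_id: Dict[str, int] = field(default_factory=dict)
    next_id: int = 1

    def add_column(self, name: str) -> int:
        if name in self.name_to_id:
            raise ValueError(f"column {name!r} already exists")
        self.name_to_id[name] = self.next_id
        self.next_id += 1
        return self.name_to_id[name]

    def rename_column(self, old: str, new: str) -> None:
        if new in self.name_to_id:
            raise ValueError(f"column {new!r} already exists")
        self.name_to_id[new] = self.name_to_id.pop(old)  # same ID, new name

    def drop_column(self, name: str) -> None:
        del self.name_to_id[name]  # the ID is retired, not recycled


def read_row(schema: ToySchema, stored: Dict[int, str]) -> Dict[str, Optional[str]]:
    # Data files store values by field ID; IDs missing from an older file read as None (null).
    return {name: stored.get(fid) for name, fid in schema.name_to_id.items()}


schema = ToySchema()
for column in ("organization_id", "name", "country"):
    schema.add_column(column)
old_row = {1: "org-1", 2: "Acme", 3: "US"}  # written before any schema change

schema.rename_column("country", "country_code")  # metadata-only change
schema.drop_column("name")
schema.add_column("name")  # a re-added column gets a fresh ID (4)
print(read_row(schema, old_row))
# {'organization_id': 'org-1', 'country_code': 'US', 'name': None}
```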

    Let’s run a query to update the schema by adding a column:

    %%sql
    
    ALTER TABLE demo.db.data_sample
    ADD COLUMN fare_per_distance_unit float AFTER num_of_employees;

    After adding the column, we can read the table again without any errors; rows written before the change return null for fare_per_distance_unit. This is how Iceberg avoids the schema-related problems described above.

    Partition Evolution and Sort Order Evolution

    Partition evolution is an option that Delta Lake lacks. When you evolve a partition spec, the data written with the earlier spec remains unchanged, and new data is written using the new spec in a new layout. Metadata for each partition spec is kept separately. Because of this, Iceberg uses split planning when you query the table: each partition layout plans its files separately, using the filter it derives for that specific layout.

    Like the partition spec, the sort order of an existing table can also be updated (with ALTER TABLE ... WRITE ORDERED BY ...). When you evolve a sort order, the data written with an earlier order remains unchanged.

    %%sql
    
    ALTER TABLE demo.db.data_sample ADD PARTITION FIELD founded
    
    %%sql
    
    DESCRIBE TABLE demo.db.data_sample
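    A toy model of split planning after this change, assuming files written before the ALTER use an unpartitioned spec (ID 0) and newer files use a spec partitioned by founded (ID 1). SPECS, DataFile, and plan_files are hypothetical names, and the sketch ignores the column-statistics pruning that Iceberg also performs.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

# Hypothetical partition specs: spec 0 (before the ALTER) is unpartitioned,
# spec 1 (after ADD PARTITION FIELD founded) partitions by identity(founded).
SPECS: Dict[int, Tuple[str, ...]] = {0: (), 1: ("founded",)}


@dataclass(frozen=True)
class DataFile:
    path: str
    spec_id: int
    partition: Tuple[Tuple[str, str], ...]  # (field, value) pairs; empty for spec 0


def plan_files(files: List[DataFile], column: str, value: str) -> List[str]:
    """Toy split planning for column = value: each spec's files are planned separately."""
    selected: List[str] = []
    for spec_id, fields in SPECS.items():
        for f in files:
            if f.spec_id != spec_id:
                continue
            if column in fields:
                # This layout can derive a partition filter: prune non-matching files.
                if dict(f.partition)[column] == value:
                    selected.append(f.path)
            else:
                # No partition on this column in this layout: the file must be scanned.
                selected.append(f.path)
    return selected


files = [
    DataFile("old-0001.parquet", 0, ()),
    DataFile("founded-1990.parquet", 1, (("founded", "1990"),)),
    DataFile("founded-2005.parquet", 1, (("founded", "2005"),)),
]
print(plan_files(files, "founded", "1990"))  # ['old-0001.parquet', 'founded-1990.parquet']
```

    Files from the old layout cannot be pruned by founded, so they are still scanned; only the new layout benefits from the partition filter.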

    Copy-on-Write (COW) and Merge-on-Read (MOR)

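    Iceberg supports both COW and MOR for row-level changes. The mode is set per operation through table properties (write.delete.mode, write.update.mode, and write.merge.mode, each set to copy-on-write or merge-on-read), either when creating the table or later with ALTER TABLE ... SET TBLPROPERTIES. Merge-on-read requires table format version 2.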

    Copy-On-Write (COW) – Best for tables with frequent reads, infrequent writes/updates, or large batch updates:

    Choose COW when the table is read frequently but written or updated less often. In COW, when rows are updated or deleted, Iceberg rewrites every data file that contains an affected row and commits the new files as a new snapshot, so the latest snapshot always consists of fully up-to-date data files. Because whole files are rewritten, writes are slower, and large or frequent updates can become a bottleneck. As the name says, a new copy of the data is made on write.

    Reads, on the other hand, are fast: there are no pending changes to reconcile, so readers only scan the current data files.

    Merge-On-Read (MOR) – Best for tables with frequent writes/updates:

    MOR is the opposite of COW: Iceberg does not rewrite existing data files when rows are updated or deleted. Instead, it writes small delete files that mark the removed rows (plus new data files for updated rows), and readers merge these with the original data files at query time. Writes are therefore fast, while reads do extra work until the table is compacted.
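    The difference between the two modes can be sketched with a toy model of deleting one row. All names are hypothetical; real Iceberg writes Parquet, ORC, or Avro data files and delete files, and the mode is selected with table properties such as write.delete.mode. COW rewrites the affected file, while MOR records a position delete that readers apply at scan time.

```python
from typing import Dict, List, Set, Tuple

DataFiles = Dict[str, List[str]]  # file path -> rows (toy rows are plain strings)


def delete_copy_on_write(files: DataFiles, victim: str) -> Tuple[DataFiles, int]:
    """Rewrite every file containing victim; return the new file set and rows copied."""
    new_files: DataFiles = {}
    copied = 0
    for path, rows in files.items():
        if victim in rows:
            kept = [r for r in rows if r != victim]
            new_files[path + ".v2"] = kept  # replacement file written at commit time
            copied += len(kept)
        else:
            new_files[path] = rows  # unaffected files are reused as-is
    return new_files, copied


def delete_merge_on_read(files: DataFiles, victim: str) -> Set[Tuple[str, int]]:
    """Write only position deletes (file path, row position); data files stay untouched."""
    return {(path, pos) for path, rows in files.items() for pos, r in enumerate(rows) if r == victim}


def read_merge_on_read(files: DataFiles, deletes: Set[Tuple[str, int]]) -> List[str]:
    # The reader applies the delete entries while scanning.
    return [r for path, rows in files.items() for pos, r in enumerate(rows) if (path, pos) not in deletes]


files = {"f1.parquet": ["a", "b", "c"], "f2.parquet": ["d", "e"]}
cow_files, copied = delete_copy_on_write(files, "b")
print(sorted(cow_files), copied)  # ['f1.parquet.v2', 'f2.parquet'] 2
deletes = delete_merge_on_read(files, "b")
print(deletes, read_merge_on_read(files, deletes))  # {('f1.parquet', 1)} ['a', 'c', 'd', 'e']
```

    An update in MOR works the same way: the old row is marked as deleted and its new version is written to a new data file.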

    Conclusion

    In this post, we explored Iceberg’s features and its compatibility with various metastores for integrations. We got a basic idea of how to configure Iceberg locally and on different cloud platforms, and we walked through upserts, schema evolution, and partition evolution.

  • Iceberg – Introduction and Setup (Part – 1)

    As discussed in our previous Delta Lake blog, several table formats are already in use, each with its own strengths. Iceberg is one of them, and in this blog we will discuss it.

    What is Apache Iceberg?

    Apache Iceberg is an open-source table format for managing large amounts of data stored locally or on various cloud storage platforms. Netflix developed Iceberg to solve its big data problems and later donated it to the Apache Software Foundation; it became open source in 2018. Iceberg now has a large number of contributors worldwide on GitHub and is one of the most widely adopted open table formats.

    Iceberg was designed to solve the key problems we faced when using the Hive table format with data stored on cloud storage such as S3.

    Iceberg tables behave like SQL tables, and because the format is open, multiple engines, such as Spark, can read and transform the same tables. Iceberg also provides ACID transactions. This post is a quick introduction to Iceberg, covering its features and initial setup.

    Why go with Iceberg

    The main reason to use Iceberg is that it performs better when loading data from S3 or other cloud storage. Hive tracks a table's data at the folder level, which requires slow directory listings on object storage; Iceberg instead tracks individual data files in its metadata. Each Iceberg table is organized as a hierarchy of four kinds of files: metadata files, manifest lists, manifest files, and data files.

    1. Metadata File: holds the table-level metadata, such as the schema, the partition spec, and the list of snapshots, each of which points to a manifest list.
    2. Manifest List: lists the manifest files that make up a snapshot, along with their paths and partition summaries. At this level, Iceberg decides which manifest files it can skip and which it has to read.
    3. Manifest File: lists data files, with each file's path, partition values, and column-level statistics.
    4. Data File: the actual Parquet, ORC, or Avro files that hold the table's rows.
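    The walk from the metadata file down to the data files can be sketched as a toy in-memory model, assuming a table partitioned by an integer date column. In real tables the metadata file is JSON and the manifest lists and manifests are Avro files; the class names here are hypothetical. The manifest list's partition summaries let the planner skip manifests without opening them, and manifest entries let it skip individual data files.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class ManifestFile:
    path: str
    # Each entry: (data file path, partition value of the date column)
    entries: Tuple[Tuple[str, int], ...]


@dataclass(frozen=True)
class ManifestListEntry:
    manifest: ManifestFile
    # Partition summary kept in the manifest list: lowest and highest date in the manifest
    lower: int
    upper: int


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifest_list: Tuple[ManifestListEntry, ...]


@dataclass(frozen=True)
class TableMetadata:
    schema: Tuple[str, ...]
    current_snapshot: Snapshot


def plan_scan(meta: TableMetadata, date_value: int) -> Tuple[List[str], List[str]]:
    """Return (data files to read, manifests opened) for the filter date = date_value."""
    data_files: List[str] = []
    opened: List[str] = []
    for entry in meta.current_snapshot.manifest_list:
        # Manifest list level: skip manifests whose summary range cannot contain the value.
        if not (entry.lower <= date_value <= entry.upper):
            continue
        opened.append(entry.manifest.path)
        # Manifest level: keep only data files whose partition value matches.
        data_files.extend(path for path, value in entry.manifest.entries if value == date_value)
    return data_files, opened


m1 = ManifestFile("m1.avro", (("d1.parquet", 20240101), ("d2.parquet", 20240102)))
m2 = ManifestFile("m2.avro", (("d3.parquet", 20240201),))
snapshot = Snapshot(1, (ManifestListEntry(m1, 20240101, 20240102), ManifestListEntry(m2, 20240201, 20240201)))
meta = TableMetadata(schema=("id", "date"), current_snapshot=snapshot)
print(plan_scan(meta, 20240102))  # (['d2.parquet'], ['m1.avro'])
```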

    Features of Iceberg:

    Some Iceberg features include:

    • Schema Evolution: Iceberg allows you to evolve your schema without having to rewrite your data. This means you can easily add, drop, or rename columns, providing flexibility to adapt to changing data requirements without impacting existing queries.
    • Partition Evolution: Iceberg supports partition evolution, enabling you to modify the partitioning scheme as your data and query patterns evolve. This feature helps maintain query performance and optimize data layout over time.
    • Time Travel: Iceberg’s time travel feature allows you to query historical versions of your data. This is particularly useful for debugging, auditing, and recreating analyses based on past data states.
    • Multiple Query Engine Support: Iceberg supports multiple query engines, including Trino, Presto, Hive, and Amazon Athena. This interoperability ensures that you can read and write data across different tools seamlessly, facilitating a more versatile and integrated data ecosystem.
    • AWS Support: Iceberg is well-integrated with AWS services, making it easy to use with Amazon S3 for storage and other AWS analytics services. This integration helps leverage the scalability and reliability of AWS infrastructure for your data lake.
    • ACID Compliance: Iceberg ensures ACID (Atomicity, Consistency, Isolation, Durability) transactions, providing reliable data consistency and integrity. This makes it suitable for complex data operations and concurrent workloads, ensuring data reliability and accuracy.
    • Hidden Partitioning: Iceberg derives partition values from column values using transforms (such as day, month, bucket, or truncate), so users filter on the original columns and still get partition pruning without having to know about or maintain separate partition columns.
    • Snapshot Isolation: Iceberg supports snapshot isolation, enabling concurrent read and write operations without conflicts. This isolation ensures that users can work with consistent views of the data, even as it is being updated.
    • Support for Large Tables: Designed for high scalability, Iceberg can efficiently handle petabyte-scale tables, making it ideal for large datasets typical in big data environments.
    • Compatibility with Modern Data Lakes: Iceberg’s design is tailored for modern data lake architectures, supporting efficient data organization, metadata management, and performance optimization, aligning well with contemporary data management practices.
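    Hidden partitioning can be illustrated with a toy version of the day transform (plain Python; the function names are hypothetical). Rows are partitioned by the day derived from a timestamp, and a query that filters only on the timestamp is turned into a range of day partitions, so the user never references a partition column.

```python
from datetime import date, datetime, timezone
from typing import Dict, List

EPOCH = date(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Days since 1970-01-01 in UTC, like Iceberg's day transform (expects an aware datetime)."""
    return (ts.astimezone(timezone.utc).date() - EPOCH).days


def write_partitioned(rows: List[datetime]) -> Dict[int, List[datetime]]:
    # The writer derives the partition value from ts; nobody maintains a separate day column.
    partitions: Dict[int, List[datetime]] = {}
    for ts in rows:
        partitions.setdefault(day_transform(ts), []).append(ts)
    return partitions


def scan_range(partitions: Dict[int, List[datetime]], start: datetime, end: datetime) -> List[datetime]:
    """Answer start <= ts < end: turn the predicate on ts into a range of day partitions."""
    first, last = day_transform(start), day_transform(end)
    wanted = [p for p in partitions if first <= p <= last]  # other partitions are never read
    # Rows inside the kept partitions are still checked against the exact predicate.
    return sorted(ts for p in wanted for ts in partitions[p] if start <= ts < end)


utc = timezone.utc
rows = [
    datetime(2024, 1, 1, 9, tzinfo=utc),
    datetime(2024, 1, 2, 15, tzinfo=utc),
    datetime(2024, 1, 5, 8, tzinfo=utc),
]
parts = write_partitioned(rows)
print(sorted(parts))  # [19723, 19724, 19727]
print(scan_range(parts, datetime(2024, 1, 2, tzinfo=utc), datetime(2024, 1, 3, tzinfo=utc)))
```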

    These features make Iceberg a powerful and flexible table format for managing data lakes, ensuring efficient data processing, robust performance, and seamless integration with various tools and platforms. By leveraging Iceberg, organizations can achieve greater data agility, reliability, and efficiency, enhancing their data analytics capabilities and driving better business outcomes.

    Prerequisites:

    • PySpark: Ensure that you have PySpark installed and properly configured. PySpark provides the Python API for Spark, enabling you to harness the power of distributed computing with Spark using Python.
    • Python: Make sure you have Python installed on your system. Python is essential for writing and running your PySpark scripts. It’s recommended to use a virtual environment to manage your dependencies effectively.
    • Iceberg-Spark JAR: Download the appropriate Iceberg-Spark JAR file that corresponds to your Spark version. This JAR file is necessary to integrate Iceberg with Spark, allowing you to utilize Iceberg’s advanced table format capabilities within your Spark jobs.
    • Jars to Configure Cloud Storage: Obtain and configure the necessary JAR files for your specific cloud storage provider. For example, if you are using Amazon S3, you will need the hadoop-aws JAR and its dependencies. For Google Cloud Storage, you need the gcs-connector JAR. These JARs enable Spark to read from and write to cloud storage systems.
    • Spark and Hadoop Configuration: Ensure your Spark and Hadoop configurations are correctly set up to integrate with your cloud storage. This might include setting the appropriate access keys, secret keys, and endpoint configurations in your spark-defaults.conf and core-site.xml.
    • Iceberg Configuration: Configure Iceberg settings specific to your environment. This might include catalog configurations (e.g., Hive, Hadoop, AWS Glue) and other Iceberg properties that optimize performance and compatibility.
    • Development Environment: Set up a development environment with an IDE or text editor that supports Python and Spark development, such as IntelliJ IDEA with the Python plugin, PyCharm, Visual Studio Code, or Jupyter Notebooks.
    • Data Source Access: Ensure you have access to the data sources you will be working with, whether they are in cloud storage, relational databases, or other data repositories. Proper permissions and network configurations are necessary for seamless data integration.
    • Basic Understanding of Data Lakes: A foundational understanding of data lake concepts and architectures will help effectively utilize Iceberg. Knowledge of how data lakes differ from traditional data warehouses and their benefits will also be helpful.
    • Version Control System: Use a version control system like Git to manage your codebase. This helps in tracking changes, collaborating with team members, and maintaining code quality.
    • Documentation and Resources: Familiarize yourself with Iceberg documentation and other relevant resources. This will help you troubleshoot issues, understand best practices, and leverage advanced features effectively.

    Download the Iceberg Spark runtime JAR that matches the Spark (and Scala) version installed on your machine or cluster; the process is the same as in the Delta Lake setup. You can either download the JAR files to your machine or cluster and pass them to spark-submit, or let Spark download them when the session starts by listing them, with their versions, in the spark.jars.packages setting.
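    The runtime artifact name encodes the Spark major.minor version and the Scala version. A small helper (hypothetical, following the naming pattern of recent releases such as iceberg-spark-runtime-3.3_2.12) can build the coordinate to put in spark.jars.packages; check the Iceberg release notes to confirm that your Iceberg version ships a runtime for your Spark version.

```python
def iceberg_runtime_package(spark_version: str, iceberg_version: str, scala_version: str = "2.12") -> str:
    """Maven coordinate of the Iceberg Spark runtime for a Spark release.

    Only Spark's major.minor version is part of the artifact name (3.3.2 -> 3.3).
    """
    major, minor = spark_version.split(".")[:2]
    return f"org.apache.iceberg:iceberg-spark-runtime-{major}.{minor}_{scala_version}:{iceberg_version}"


print(iceberg_runtime_package("3.3.2", "1.1.0"))
# org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0
```

    In a PySpark script, you can pass pyspark.__version__ as the Spark version.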

    To read and write Iceberg tables in an S3 bucket, we also add the AWS SDK and hadoop-aws packages. Here is a basic example of a Spark session:

    import os
    
    import pyspark
    
    # Read the credentials from the environment instead of hard-coding them in the script
    AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
    AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
    
    spark_jars_packages = "com.amazonaws:aws-java-sdk:1.12.246,org.apache.hadoop:hadoop-aws:3.2.2,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0"
    
    spark = (
        pyspark.sql.SparkSession.builder
        .config("spark.jars.packages", spark_jars_packages)
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # Hadoop catalog named "demo" whose warehouse is an S3 prefix
        .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.demo.warehouse", "s3a://abhishek-test-01012023/iceberg-sample-data/")
        .config("spark.sql.catalog.demo.type", "hadoop")
        # S3A authenticates with the access/secret key pair set below
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
        .config("spark.driver.memory", "20g")
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "8g")
        .getOrCreate()
    )

    Iceberg Setup Using Docker

    You can set AWS credentials, as well as other service-specific settings (such as the catalog warehouse and the S3 endpoint), as environment variables in the docker-compose file.

    version: "3"
    
    services:
      spark-iceberg:
        image: tabulario/spark-iceberg
        container_name: spark-iceberg
        build: spark/
        depends_on:
          - rest
          - minio
        volumes:
          - ./warehouse:/home/iceberg/warehouse
          - ./notebooks:/home/iceberg/notebooks/notebooks
          - ./data:/home/iceberg/data
        environment:
          - AWS_ACCESS_KEY_ID=admin
          - AWS_SECRET_ACCESS_KEY=password
          - AWS_REGION=us-east-1
        ports:
          - 8888:8888
          - 8080:8080
        links:
          - rest:rest
          - minio:minio
      rest:
        image: tabulario/iceberg-rest:0.1.0
        ports:
          - 8181:8181
        environment:
          - AWS_ACCESS_KEY_ID=admin
          - AWS_SECRET_ACCESS_KEY=password
          - AWS_REGION=us-east-1
          - CATALOG_WAREHOUSE=s3a://warehouse/wh/
          - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
          - CATALOG_S3_ENDPOINT=http://minio:9000
      minio:
        image: minio/minio
        container_name: minio
        environment:
          - MINIO_ROOT_USER=admin
          - MINIO_ROOT_PASSWORD=password
        ports:
          - 9001:9001
          - 9000:9000
        command: ["server", "/data", "--console-address", ":9001"]
      mc:
        depends_on:
          - minio
        image: minio/mc
        container_name: mc
        environment:
          - AWS_ACCESS_KEY_ID=admin
          - AWS_SECRET_ACCESS_KEY=password
          - AWS_REGION=us-east-1
        entrypoint: >
          /bin/sh -c "
          until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
          /usr/bin/mc rm -r --force minio/warehouse;
          /usr/bin/mc mb minio/warehouse;
          /usr/bin/mc policy set public minio/warehouse;
          exit 0;
          " 

    Save this file as docker-compose.yaml and run docker compose up. Then open a shell in the Spark container (named spark-iceberg in the compose file) with:

    docker exec -it spark-iceberg bash

    You can mount the sample data directory into the container or copy it from your local machine. To copy the data into the container, use docker cp:

    docker cp input-data spark-iceberg:/home/iceberg/data

    Set Up S3 as the Iceberg Warehouse: Read Data from S3 and Write Iceberg Tables Back to S3 from an EC2 Instance

    For this example, we generated 90 GB of data with a Spark job and stored it in the S3 bucket.

    We use the same Spark session configuration as in the S3 example above: the S3 and Iceberg packages, the Iceberg SQL extensions, and the demo Hadoop catalog with its warehouse in the S3 bucket.

    Step 1

    We read the data in Spark, create an Iceberg table from it, and store the Iceberg table in the same S3 bucket.

    Some Iceberg functionality will not work unless you use the Iceberg runtime JAR built for your Spark version. If the versions do not match, some features (partition-related operations, for example) fail with a java.lang.NoSuchMethodError. Check this carefully when setting up Iceberg on EC2 or EMR.
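    One way to catch such mismatches before starting a job is to compare the runtime package listed in spark.jars.packages with the running Spark version. The sketch below is a hypothetical helper that only checks the Spark major.minor part of the artifact name, not the Scala version or whether the Iceberg release itself supports that Spark version.

```python
import re
from typing import List

# Naming pattern of recent Iceberg Spark runtime artifacts, e.g. iceberg-spark-runtime-3.3_2.12
RUNTIME = re.compile(r"org\.apache\.iceberg:iceberg-spark-runtime-(\d+\.\d+)_([\d.]+):(\S+)")


def check_iceberg_runtime(spark_version: str, packages: str) -> List[str]:
    """Return problems found in a spark.jars.packages string (an empty list means OK)."""
    spark_minor = ".".join(spark_version.split(".")[:2])
    problems: List[str] = []
    found = False
    for coordinate in (c.strip() for c in packages.split(",")):
        match = RUNTIME.fullmatch(coordinate)
        if not match:
            continue
        found = True
        if match.group(1) != spark_minor:
            problems.append(
                f"{coordinate} targets Spark {match.group(1)}, but Spark {spark_version} is running"
            )
    if not found:
        problems.append("no iceberg-spark-runtime package listed")
    return problems


packages = (
    "com.amazonaws:aws-java-sdk:1.12.246,"
    "org.apache.hadoop:hadoop-aws:3.2.2,"
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0"
)
print(check_iceberg_runtime("3.3.1", packages))  # []
print(check_iceberg_runtime("3.2.0", packages))
```

    For example, call check_iceberg_runtime(pyspark.__version__, spark_jars_packages) before building the session and stop if it returns any problems.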

    Next, we create an Iceberg table on S3 and write data into it. The sample data was generated by a Spark job for Delta tables; we reuse the same data and schema, as follows.

    Step 2

    We create the Iceberg table in the S3 bucket and write the data partitioned by the date column.

    import logging
    
    logging.basicConfig(level=logging.INFO)
    
    # Option 1: create an empty, partitioned Iceberg table explicitly.
    # With a Hadoop catalog, the table lives under the catalog warehouse
    # (<warehouse>/db/iceberg_data_2); custom locations are rejected for path-based tables.
    spark.sql("""CREATE TABLE IF NOT EXISTS demo.db.iceberg_data_2 (id INT, first_name STRING,
    last_name STRING, address STRING, pincode INT, net_income INT, source_of_income STRING,
    state STRING, email_id STRING, description STRING, population INT, population_1 STRING,
    population_2 STRING, population_3 STRING, population_4 STRING, population_5 STRING, population_6 STRING,
    population_7 STRING, `date` INT)
    USING iceberg
    PARTITIONED BY (`date`)
    TBLPROPERTIES ('write.format.default' = 'parquet', 'format-version' = '2')""")
    
    # Read the sample data generated for the Delta Lake blog (read directly as Parquet files)
    df = spark.read.parquet("s3a://abhishek-test-01012023/delta-lake-sample-data/")
    
    logging.info("Starting writing the data")
    
    # Option 2: let writeTo() create (or replace) the table from the DataFrame's schema.
    # Sorting by the partition column within each Spark partition keeps rows clustered
    # by partition value, which Iceberg's writer expects.
    df.sortWithinPartitions("date").writeTo("demo.db.iceberg_data").partitionedBy("date").createOrReplace()
    
    logging.info("Writing has been finished")
    
    logging.info("Query the data from Iceberg using Spark SQL")
    
    spark.sql("DESCRIBE TABLE demo.db.iceberg_data").show()
    spark.sql("SELECT * FROM demo.db.iceberg_data LIMIT 10").show()

    This is how we can use Iceberg on S3. Another option is to create Iceberg tables in the AWS Glue catalog. With Delta Lake, tables registered in the Glue catalog through Athena are usually external tables that can only be queried after generating manifest files.

    Step 3

    We print the Iceberg table’s data along with the table descriptions. 

    With Iceberg, we can create tables directly in the Glue catalog using Athena, and Athena supports both read and write operations on them. These are the Spark configurations needed to use the Glue catalog:

    {
        "conf": {
            "spark.sql.catalog.glue_catalog1": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.glue_catalog1.warehouse": "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/",
            "spark.sql.catalog.glue_catalog1.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            "spark.sql.catalog.glue_catalog1.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            "spark.sql.catalog.glue_catalog1.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
            "spark.sql.catalog.glue_catalog1.lock.table": "myGlueLockTable",
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        }
    }
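    To apply the same settings in a plain PySpark script, they can be passed to the SparkSession builder. The helpers below are hypothetical: glue_catalog_conf rebuilds the settings above for a given catalog name and warehouse path (class names copied from the JSON), and apply_conf chains one .config() call per entry. Using the Glue catalog assumes the Iceberg AWS integration and the AWS SDK are available on the classpath.

```python
from typing import Dict, Optional


def glue_catalog_conf(catalog: str, warehouse: str, lock_table: Optional[str] = None) -> Dict[str, str]:
    """Spark settings for an Iceberg catalog backed by AWS Glue (class names as in the JSON above)."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }
    if lock_table is not None:
        # Optional DynamoDB-based lock settings, as in the JSON above
        conf[f"{prefix}.lock-impl"] = "org.apache.iceberg.aws.glue.DynamoLockManager"
        conf[f"{prefix}.lock.table"] = lock_table
    return conf


def apply_conf(builder, conf: Dict[str, str]):
    """Chain one .config(key, value) call per setting onto a SparkSession builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Usage with PySpark:
# from pyspark.sql import SparkSession
# spark = apply_conf(
#     SparkSession.builder,
#     glue_catalog_conf("glue_catalog1", "s3://YOUR-BUCKET-NAME/iceberg/glue_catalog1/tables/", "myGlueLockTable"),
# ).getOrCreate()
```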

    Now we can create Iceberg tables with either Spark or Athena, and they are accessible from both. We can perform upserts on them, too.

    Conclusion

    We’ve learned the basics of the Iceberg table format, its features, and the reasons for choosing Iceberg. We discussed how Iceberg provides significant advantages such as schema evolution, partition evolution, hidden partitioning, and ACID compliance, making it a robust choice for managing large-scale data. We also covered the setup required to use this table format, including its configuration with Apache Spark, a Docker-based environment, Amazon S3, and the AWS Glue catalog with Athena. With this knowledge, you are well-equipped to start using Iceberg for your data lake needs and to build a more organized, scalable, and efficient data infrastructure.