Tag: etl

  • Data Engineering: Beyond Big Data

    When a data project comes to mind, the end goal is to enhance the data. It’s about building systems to curate the data in a way that can help the business.

    At the dawn of their data engineering journey, people tend to familiarize themselves with the terms “extract,” “transformation,” and “loading.” These terms, along with traditional data engineering, spark the image that data engineering is about the processing and movement of large amounts of data. And why not! We’ve witnessed a tremendous evolution in these technologies, from storing information in simple spreadsheets to managing massive data warehouses and data lakes, supported by advanced infrastructure capable of ingesting and processing huge data volumes.

    However, this doesn’t limit data engineering to ETL; rather, it opens up many opportunities to introduce the new technologies and concepts needed to support big data processing. The expectations from a modern data system extend well beyond mere data movement. There’s a strong emphasis on privacy, especially with the vast amounts of sensitive data that need protection. Speed is crucial, particularly in real-world scenarios like satellite data processing, financial trading, and data processing in healthcare, where minimizing latency is key.

    With technologies like AI and machine learning driving analysis on massive datasets, data volumes will inevitably continue to grow. We’ve seen this trend before, just as we once spoke of megabytes and now regularly discuss gigabytes. In the future, we’ll likely talk about terabytes and petabytes with the same familiarity.

    These growing expectations have made data engineering a sphere with numerous supporting components, and in this article, we’ll delve into some of those components.

    • Data governance
    • Metadata management
    • Data observability
    • Data quality
    • Orchestration
    • Visualization

    Data Governance

    With huge amounts of confidential business and user data moving around, handling it safely is a delicate process. We must ensure trust in data processes, and the data itself cannot be compromised. It is essential for a business onboarding users to show that their data is in safe hands. Today, when a business needs sensitive information from you, you’re bound to ask questions such as:

    • What if my data is compromised?
    • Are we putting it to the right use?
    • Who’s in control of this data? Are the right personnel using it?
    • Is it compliant with the rules and regulations for data practices?

    So, to answer these questions satisfactorily, data governance comes into the picture. The basic idea of data governance is that it’s a set of rules, policies, principles, or processes to maintain data integrity. It’s about how we can supervise our data and keep it safe. Think of data governance as a protective blanket that takes care of all the security risks, creates a habitable environment for data, and builds trust in data processing.

    Data governance is a powerful tool in the data engineering arsenal. Its rules and principles are applied consistently throughout all data processing activities. Wherever data flows, data governance ensures that it adheres to these established protocols. By adding a sense of trust to the activities involving data, you gain the freedom to focus on your data solution without worrying about external or internal risks. This helps in reaching the ultimate goal: fostering a culture that prioritizes and emphasizes data responsibility.

    Understanding the extensive application of data governance in data engineering clearly illustrates its significance and where it needs to be implemented in real-world scenarios. In numerous entities, such as government organizations or large corporations, data sensitivity is a top priority. Misuse of this data can have widespread negative impacts. To ensure that it doesn’t happen, we can use tools to ensure oversight and compliance. Let’s briefly explore one of those tools.

    Microsoft Purview

    Microsoft Purview comes with a range of solutions to protect your data. Let’s look at some of its offerings.

    • Insider risk management
      • Microsoft Purview takes care of data security risks from people inside your organization by identifying high-risk individuals.
      • It helps you classify data breaches into different sections and take appropriate action to prevent them.
    • Data loss prevention
      • It makes applying data loss prevention policies straightforward.
      • It secures data by restricting important and sensitive data from being deleted and blocks unusual activities, like sharing sensitive data outside your organization.
    • Compliance adherence
      • Microsoft Purview can help you make sure that your data processes are compliant with data regulatory bodies and organizational standards.
    • Information protection
      • It provides granular control over data, allowing you to define strict accessibility rules.
      • When you need to manage what data can be shared with specific individuals, this control restricts the data visible to others.
    • Know your sensitive data
      • It simplifies the process of understanding and learning about your data.
      • MS Purview features ML-based classifiers that label and categorize your sensitive data, helping you identify its specific category.

    Metadata Management

    Another essential aspect of big data movement is metadata management. 

    Metadata, simply put, is data about data. This component of data engineering makes a base for huge improvements in data systems.

    You might have come across this headline a while back, which also reappeared recently.

    This story is from about a decade ago, and it tells us about metadata’s longevity and how it became a base for greater things.

    At the time, Instagram showed the number of likes by running a count function on the database and storing it in a cache. This method was fine because the number wouldn’t change frequently, so the request would hit the cache and get the result. Even if the number changed, the request would query the data, and because the number was small, it wouldn’t scan a lot of rows, saving the data system from being overloaded.

    However, when a celebrity posted something, it’d receive so many likes that the count would be enormous and change so frequently that looking into the cache became just an extra step.

    The request would trigger a query that would repeatedly scan many rows in the database, overloading the system and causing frequent crashes.

    To deal with this, Instagram came up with the idea of denormalizing the tables and storing the number of likes for each post. So, the request would result in a query where the database needs to look at only one cell to get the number of likes. To handle the issue of frequent changes in the number of likes, Instagram began updating the value at small intervals. This story tells how Instagram solved this problem with a simple tweak of using metadata. 
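
    The idea behind that fix can be sketched with an in-memory SQLite database. This is a minimal illustration, not Instagram’s actual schema: the posts and likes tables and the function names are hypothetical, and the sketch bumps the counter on every like rather than at small intervals as described above.

```python
import sqlite3

# Hypothetical schema: a likes table plus a denormalized like_count on posts.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE posts (id INTEGER PRIMARY KEY, like_count INTEGER NOT NULL DEFAULT 0);
    CREATE TABLE likes (post_id INTEGER NOT NULL, user_id INTEGER NOT NULL);
""")
conn.execute("INSERT INTO posts (id) VALUES (1)")


def add_like(post_id: int, user_id: int) -> None:
    """Record the like and bump the stored counter in one transaction."""
    with conn:
        conn.execute(
            "INSERT INTO likes (post_id, user_id) VALUES (?, ?)", (post_id, user_id)
        )
        conn.execute(
            "UPDATE posts SET like_count = like_count + 1 WHERE id = ?", (post_id,)
        )


def count_by_scan(post_id: int) -> int:
    """Original approach: count every matching row in the likes table."""
    query = "SELECT COUNT(*) FROM likes WHERE post_id = ?"
    return conn.execute(query, (post_id,)).fetchone()[0]


def count_from_metadata(post_id: int) -> int:
    """Denormalized approach: read a single stored value."""
    query = "SELECT like_count FROM posts WHERE id = ?"
    return conn.execute(query, (post_id,)).fetchone()[0]


for user in range(1000):
    add_like(1, user)

print(count_by_scan(1), count_from_metadata(1))
```

    Both functions return the same number, but the second one reads a single cell no matter how many likes the post has received.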

    Metadata in data engineering has evolved to solve even more significant problems by adding a layer on top of the data flow that works as an interface to communicate with data. Metadata management has become a foundation of multiple data features such as:

    • Data lineage: Stakeholders are interested in the results we get from data processes. Sometimes, in order to check the authenticity of data and get answers to questions like where the data originated from, we need to track back to the data source. Data lineage is a property that makes use of metadata to help with this scenario. Many data products like Atlan and data warehouses like Snowflake extensively use metadata for their services.
    • Schema information: With a clear understanding of your data’s structure, including column details and data types, we can efficiently troubleshoot and resolve data modeling challenges.
    • Data contracts: Metadata helps honor data contracts by keeping a common data profile, which maintains a consistent data structure across all data usages.
    • Stats: Managing metadata can help us easily access data statistics while also giving us quick answers to questions like what the total count of a table is, how many distinct records there are, how much space it takes, and many more.
    • Access control: Metadata management also includes having information about data accessibility. As we encountered it in the MS Purview features, we can associate a table with vital information and restrict the visibility of a table or even a column to the right people.
    • Audit: Keeping track of information, like who accessed the data, who modified it, or who deleted it, is another important feature that a product with multiple users can benefit from.
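
    To make the data lineage item above more concrete, here is a small sketch of tracing a dataset back to its original sources using nothing but metadata. The dataset names and the LINEAGE mapping are hypothetical; real catalogs store this information in their own formats.

```python
# Hypothetical lineage metadata: each dataset maps to the datasets it was built from.
LINEAGE = {
    "sales_dashboard": ["daily_sales"],
    "daily_sales": ["orders_clean", "customers_clean"],
    "orders_clean": ["orders_raw"],
    "customers_clean": ["customers_raw"],
    "orders_raw": [],
    "customers_raw": [],
}


def trace_sources(dataset: str, lineage: dict[str, list[str]]) -> set[str]:
    """Walk upstream and return the root sources a dataset derives from."""
    parents = lineage.get(dataset, [])
    if not parents:
        # No recorded parents: this dataset is itself an original source.
        return {dataset}
    sources: set[str] = set()
    for parent in parents:
        sources |= trace_sources(parent, lineage)
    return sources


origins = trace_sources("sales_dashboard", LINEAGE)
print(sorted(origins))
```

    The sketch assumes the lineage graph has no cycles, which is the usual case for data pipelines.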

    There are many other use cases of metadata that enhance data engineering. It’s positively impacting the current landscape and shaping the future trajectory of data engineering. A very good example is a data catalog. Data catalogs focus on enriching datasets with information about data. Table formats, such as Iceberg and Delta, use catalogs to provide integration with multiple data sources, handle schema evolution, etc. Popular cloud services like AWS Glue also use metadata for features like data discovery. Tech giants like Snowflake and Databricks rely heavily on metadata for features like faster querying, time travel, and many more. 

    With the introduction of AI in the data domain, metadata management has a huge effect on the future trajectory of data engineering. Services such as Cortex and Fabric have integrated AI systems that use metadata for easy questioning and answering. When AI gets to know the context of data, the application of metadata becomes limitless.

    Data Observability

    We know how important metadata can be, and while it’s important to know your data, it’s just as important to know about the processes working on it. That’s where observability enters the discussion. It is another crucial aspect of data engineering and a component we can’t leave out of our data project.

    Data observability is about setting up systems that can give us visibility over different services that are working on the data. Whether it’s ingestion, processing, or load operations, having visibility into data movement is essential. This not only ensures that these services remain reliable and fully operational, but it also keeps us informed about the ongoing processes. The ultimate goal is to proactively manage and optimize these operations, ensuring efficiency and smooth performance. We need to achieve this goal because it’s very likely that whenever we create a data system, multiple issues, as well as errors and bugs, will start popping out of nowhere.

    So, how do we keep an eye on these services to see whether they are performing as expected? The answer to that is setting up monitoring and alerting systems.

    Monitoring

    Monitoring is the continuous tracking and measurement of key metrics and indicators that tell us about the system’s performance. Many cloud services offer comprehensive performance metrics, presented through interactive visuals. These tools provide valuable insights, such as throughput, which measures the volume of data processed per second, and latency, which indicates how long it takes to process the data. They track errors and error rates, detailing the types and how frequently they happen.
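
    As a rough illustration of these metrics, the sketch below derives throughput, average latency, and error rate from a few pipeline runs. The BatchRun record and its fields are hypothetical; monitoring tools collect and aggregate such numbers in their own ways.

```python
from dataclasses import dataclass


@dataclass
class BatchRun:
    records: int    # records processed in the run
    seconds: float  # wall-clock duration of the run
    failed: bool    # whether the run ended in an error


def summarize(runs: list[BatchRun]) -> dict[str, float]:
    """Aggregate a list of runs into three basic health metrics."""
    total_records = sum(run.records for run in runs)
    total_seconds = sum(run.seconds for run in runs)
    return {
        "throughput_per_sec": total_records / total_seconds,
        "avg_latency_sec": total_seconds / len(runs),
        "error_rate": sum(run.failed for run in runs) / len(runs),
    }


runs = [BatchRun(1000, 2.0, False), BatchRun(500, 1.0, False), BatchRun(0, 1.0, True)]
metrics = summarize(runs)
print(metrics)
```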

    To lay the base for monitoring, tools like Prometheus and Datadog provide these monitoring features, indicating the performance of data systems and their underlying infrastructure. We also have Graylog, which offers multiple features for monitoring a system’s logs in real time.

    Now that we have a system that gives us visibility into the performance of our processes, we need a setup that can notify us when anything goes sideways.

    Alerting

    Setting up alerting systems allows us to receive notifications directly within the applications we use regularly, eliminating the need for someone to constantly monitor metrics on a UI or watch graphs all day, which would be a waste of time and resources. This is why alerting systems are designed to trigger notifications based on predefined thresholds, such as throughput dropping below a certain level, latency exceeding a specific duration, or the occurrence of specific errors. These alerts can be sent to channels like email or Slack, ensuring that users are immediately aware of any unusual conditions in their data processes.
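
    A threshold-based alert check can be sketched in a few lines. The metric names, thresholds, and messages below are assumptions chosen for illustration, and the send function is a stand-in for a real channel such as email or Slack.

```python
from typing import Callable

# Hypothetical rules: (metric name, predicate that signals a breach, message).
RULES: list[tuple[str, Callable[[float], bool], str]] = [
    ("throughput_per_sec", lambda v: v < 100, "throughput dropped below 100 records/s"),
    ("latency_sec", lambda v: v > 5, "latency exceeded 5 seconds"),
    ("error_count", lambda v: v > 0, "errors occurred"),
]


def evaluate(metrics: dict[str, float]) -> list[str]:
    """Return one alert message per rule whose threshold is breached."""
    return [
        message
        for name, breached, message in RULES
        if name in metrics and breached(metrics[name])
    ]


def notify(alerts: list[str], send: Callable[[str], None] = print) -> None:
    """Deliver alerts through a pluggable channel; print is only a placeholder."""
    for alert in alerts:
        send(f"[ALERT] {alert}")


alerts = evaluate({"throughput_per_sec": 80, "latency_sec": 2.5, "error_count": 3})
notify(alerts)
```

    Keeping the rules as data makes it easy to adjust thresholds without touching the evaluation logic.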

    Implementing observability will significantly impact data systems. By setting up monitoring and alerting, we can quickly identify issues as they arise and gain context about the nature of the errors. This insight allows us to pinpoint the source of problems, effectively debug and rectify them, and ultimately reduce downtime and service disruptions, saving valuable time and resources.

    Data Quality

    Knowing the data and its processes is undoubtedly important, but all this knowledge is futile if the data itself is of poor quality. That’s where the other essential component of data engineering, data quality, comes into play because data processing is one thing; preparing the data for processing is another.

    In a data project involving multiple sources and formats, various discrepancies are likely to arise. These can include missing values, where essential data points are absent; outdated data, which no longer reflects current information; poorly formatted data that doesn’t conform to expected standards; incorrect data types that lead to processing errors; and duplicate rows that skew results and analyses. Addressing these issues will ensure the accuracy and reliability of the data used in the project.
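
    The discrepancies listed above can each be detected with a simple check. The following sketch runs over a handful of made-up rows; the column names, the one-year staleness limit, and the report layout are all assumptions for illustration.

```python
from datetime import date

# Made-up sample rows; row 3 duplicates row 0.
rows = [
    {"id": 1, "email": "a@example.com", "amount": 10.5, "updated": date(2024, 5, 1)},
    {"id": 2, "email": None, "amount": 7.0, "updated": date(2024, 5, 2)},
    {"id": 3, "email": "c@example.com", "amount": "12", "updated": date(2020, 1, 1)},
    {"id": 1, "email": "a@example.com", "amount": 10.5, "updated": date(2024, 5, 1)},
]


def quality_report(rows, as_of, max_age_days=365):
    """Return the row indexes that fail each check."""
    seen = set()
    report = {"missing": [], "wrong_type": [], "stale": [], "duplicate": []}
    for index, row in enumerate(rows):
        key = tuple(row[column] for column in sorted(row))
        if key in seen:
            # Exact duplicates are reported once and skipped for other checks.
            report["duplicate"].append(index)
            continue
        seen.add(key)
        if any(value is None for value in row.values()):
            report["missing"].append(index)
        if not isinstance(row["amount"], (int, float)):
            report["wrong_type"].append(index)
        if (as_of - row["updated"]).days > max_age_days:
            report["stale"].append(index)
    return report


report = quality_report(rows, as_of=date(2024, 6, 1))
print(report)
```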

    Data quality involves enhancing data with key attributes. For instance, accuracy measures how closely the data reflects reality, validity ensures that the data accurately represents what we aim to measure, and completeness guarantees that no critical data is missing. Additionally, attributes like timeliness ensure the data is up to date. Ultimately, data quality is about embedding attributes that build trust in the data. For a deeper dive into this, check out Rita’s blog on Data QA: The Need of the Hour.

    Data quality plays a crucial role in elevating other processes in data engineering. In a data engineering project, there are often multiple entry points for data processing, with data being refined at different stages to achieve a better state each time. Assessing data at the source of each processing stage and addressing issues early on is vital. This approach ensures that data standards are maintained throughout the data flow. As a result, by making data consistent at every step, we gain improved control over the entire data lifecycle. 

    Data tools like Great Expectations and data unit test libraries such as Deequ play a crucial role in safeguarding data pipelines by implementing data quality checks and validations. To gain more context on this, you might want to read Unit Testing Data at Scale using Deequ and Apache Spark by Nishant. These tools ensure that data meets predefined standards, allowing for early detection of issues and maintaining the integrity of data as it moves through the pipeline.

    Orchestration

    With so many processes in place, it’s essential to ensure everything happens at the right time and in the right way. Relying on someone to manually trigger processes at scheduled times every day is an inefficient use of resources. For that individual, performing the same repetitive tasks can quickly become monotonous. Beyond that, manual execution increases the risk of missing schedules or running tasks out of order, disrupting the entire workflow.

    This is where orchestration comes to the rescue, automating tedious, repetitive tasks and ensuring precision in the timing of data flows. Data pipelines can be complex, involving many interconnected components that must work together seamlessly. Orchestration ensures that each component follows a defined set of rules, dictating when to start, what to do, and how to contribute to the overall process of handling data, thus maintaining smooth and efficient operations.

    This automation helps reduce errors that could occur with manual execution, ensuring that data processes remain consistent by streamlining repetitive tasks. With a number of different orchestration tools and services in place, we can now monitor and manage everything from a single platform. Tools like Airflow, an open-source orchestrator; Prefect, which offers a user-friendly interface; and cloud services such as Azure Data Factory, Google Cloud Composer, and AWS Step Functions enhance our visibility and control over the entire process lifecycle, making data management more efficient and reliable. Don’t miss Shreyash’s excellent blog on Mage: Your New Go-To Tool for Data Orchestration.

    Orchestration is built on a foundation of multiple concepts and technologies that make it robust and fail-safe. These underlying principles ensure that orchestration not only automates processes but also maintains reliability and resilience, even in complex and demanding data environments.

    • Workflow definition: This defines how tasks in the pipeline are organized and executed. It lays out the sequence of tasks—telling it what needs to be finished before other tasks can start—and takes care of other conditions for pipeline execution. Think of it like a roadmap that guides the flow of tasks.
    • Task scheduling: This determines when and how tasks are executed. Tasks might run at specific times, in response to events, or based on the completion of other tasks. It’s like scheduling appointments for tasks to ensure they happen at the right time and with the right resources.
    • Dependency management: Since tasks often rely on each other, with the concepts of dependency management, we can ensure that tasks run in the correct order. It ensures that each process starts only when its prerequisites are met, like waiting for a green light before proceeding.
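
    The three concepts above can be sketched with Python’s standard graphlib module. The task names are hypothetical, and appending to a list stands in for actually running each task; orchestration tools implement this with far more machinery.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow definition: each task maps to the tasks it depends on.
dependencies = {
    "extract_orders": set(),
    "extract_customers": set(),
    "transform": {"extract_orders", "extract_customers"},
    "quality_check": {"transform"},
    "load": {"quality_check"},
}


def run_pipeline(deps: dict[str, set[str]]) -> list[str]:
    """Run tasks in dependency order and return the order used."""
    order = []
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the dependencies form a cycle
    while sorter.is_active():
        # Every task returned here has all prerequisites done, so a real
        # scheduler could run the whole batch in parallel.
        for task in sorted(sorter.get_ready()):
            order.append(task)  # placeholder for executing the task
            sorter.done(task)
    return order


execution_order = run_pipeline(dependencies)
print(execution_order)
```

    Sorting each ready batch is only there to make the output deterministic; the ordering between independent tasks does not matter.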

    With these concepts, orchestration tools provide powerful features for workflow design and management, enabling the definition of complex, multi-step processes. They support parallel, sequential, and conditional execution of tasks, allowing for flexibility in how workflows are executed. Not just that, they also offer event-driven and real-time orchestration, enabling systems to respond to dynamic changes and triggers as they occur. These tools also include robust error handling and exception management, ensuring that workflows are resilient and fault-tolerant.

    Visualization

    The true value of data lies not just in collecting vast amounts of it but in interpreting it in ways that generate real business value. This makes visualization a vital component: it provides a clear and accurate representation of data that decision-makers can easily understand and use. Presenting data the right way lets businesses draw intelligence from it, which makes data engineering worth the investment. That intelligence guides strategic decisions, optimizes operations, and powers innovation.

    Visualizations allow us to see patterns, trends, and anomalies that might not be apparent in raw data. Whether it’s spotting a sudden drop in sales, detecting anomalies in customer behavior, or forecasting future performance, data visualization can provide the clear context needed to make well-informed decisions. When numbers and graphs are presented effectively, it feels as though we are directly communicating with the data, and this language of communication bridges the gap between technical experts and business leaders.

    Visualization Within ETL Processes

    Visualization isn’t just a final output. It can also be a valuable tool within the data engineering process itself. Intermediate visualization during the ETL workflow can be a game-changer. In collaborative teams, as we go through the transformation process, visualizing it at various stages helps ensure the accuracy and relevance of the result. We can understand the datasets better, identify issues or anomalies between different stages, and make more informed decisions about the transformations needed.

    Technologies like Fabric and Mage enable seamless integration of visualizations into ETL pipelines. These tools empower team members at all levels to actively engage with data, ask insightful questions, and contribute to the decision-making process. Visualizing datasets at key points provides the flexibility to verify that data is being processed correctly, develop accurate analytical formulas, and ensure that the final outputs are meaningful.

    Depending on the industry and domain, there are various visualization tools suited to different use cases. For example, 

    • For real-time insights, which are crucial in industries like healthcare, financial trading, and air travel, tools such as Tableau and Striim are invaluable. These tools allow for immediate visualization of live data, enabling quick and informed decision-making.
    • For broad data source integrations and dynamic dashboard querying, often demanded in the technology sector, tools like Power BI, Metabase, and Grafana are highly effective. These platforms support a wide range of data sources and offer flexible, interactive dashboards that facilitate deep analysis and exploration of data.

    It’s Limitless

    We are seeing many advancements in this domain, which are helping businesses, data science, AI and ML, and many other sectors because the potential of data is huge. If a business knows how to use data, it can be a major factor in its success. And for that reason, we have constantly seen the rise of different components in data engineering. All with one goal: to make data useful.

    Recently, we’ve witnessed the introduction of numerous technologies poised to revolutionize the data engineering domain. Concepts like data mesh are enhancing data discovery, improving data ownership, and streamlining data workflows. AI-driven data engineering is rapidly advancing, with expectations to automate key processes such as data cleansing, pipeline optimization, and data validation. We’re already seeing how cloud data services have evolved to embrace AI and machine learning, ensuring seamless integration with data initiatives. The rise of real-time data processing brings new use cases and advancements, while practices like DataOps foster better collaboration among teams. Take a closer look at the modern data stack in Shivam’s detailed article, Modern Data Stack: The What, Why, and How?

    These developments are accompanied by a wide array of technologies designed to support infrastructure, analytics, AI, and machine learning, alongside enterprise tools that lay the foundation for this ongoing evolution. All these elements collectively set the stage for a broader discussion on data engineering and what lies beyond big data. Big data, supported by these satellite activities, aims to extract maximum value from data, unlocking its full potential.

    References:

    1. Velotio – Data Engineering Blogs
    2. Firstmark
    3. MS Purview Data Security
    4. Tech Target – Article on data quality
    5. Splunk – Data Observability: The Complete Introduction
    6. Instagram crash story – WIRED

  • Mage: Your New Go-To Tool for Data Orchestration

    In our journey to automate data pipelines, we’ve used tools like Apache Airflow, Dagster, and Prefect to manage complex workflows. However, as data automation continues to change, we’ve added a new tool to our toolkit: Mage AI.

    Mage AI isn’t just another tool; it’s a solution to the evolving demands of data automation. This blog aims to explain how Mage AI is changing the way we automate data pipelines by addressing challenges and introducing innovative features. Let’s explore this evolution, understand the problems we face, and see why we’ve adopted Mage AI.

    What is Mage AI?

    Mage is a user-friendly open-source framework created for transforming and merging data. It’s a valuable tool for developers who need to handle substantial data volumes efficiently. At its heart, Mage relies on “data pipelines,” made up of code blocks. These blocks can run independently or as part of a larger pipeline. Together, these blocks form a structure known as a directed acyclic graph (DAG), which helps manage dependencies. For example, you can use Mage for tasks like loading data, transforming it, or exporting it.

    Mage Architecture:

    Before we delve into Mage’s features, let’s take a look at how it works.

    When you use Mage, your request begins its journey in the Mage Server Container, which serves as the central hub for handling requests, processing data, and validation. Here, tasks like data processing and real-time interactions occur. The Scheduler Process ensures tasks are scheduled with precision, while Executor Containers, designed for specific tasks like Python or AWS, carry out the instructions.

    Mage’s scalability is impressive, allowing it to handle growing workloads effectively. It can expand both vertically and horizontally to maintain top-notch performance. Mage efficiently manages data, including code, data, and logs, and takes security seriously when handling databases and sensitive information. This well-coordinated system, combined with Mage’s scalability, guarantees reliable data pipelines, blending technical precision with seamless orchestration.

    Scaling Mage:

    To enhance Mage’s performance and reliability as your workload expands, it’s crucial to scale its architecture effectively. In this concise guide, we’ll concentrate on four key strategies for optimizing Mage’s scalability:

    1. Horizontal Scaling: Ensure responsiveness by running multiple Mage Server and Scheduler instances. This approach keeps the system running smoothly, even during peak usage.
    2. Multiple Executor Containers: Deploy several Executor Containers to handle concurrent task execution. Customize them for specific executors (e.g., Python, PySpark, or AWS) to scale task processing horizontally as needed.
    3. External Load Balancers: Utilize external load balancers to distribute client requests across Mage instances. This not only boosts performance but also ensures high availability by preventing overloading of a single server.
    4. Scaling for Larger Datasets: To efficiently handle larger datasets, consider:

    a. Allocating more resources to executors, empowering them to tackle complex data transformations.

    b. Using Mage’s support for direct data warehouse transformation and native Spark integration for massive datasets.

    Features: 

    1) Interactive Coding Experience

    Mage offers an interactive coding experience tailored for data preparation. Each block in the editor is a modular file that can be tested, reused, and chained together to create an executable data pipeline. This means you can build your data pipeline piece by piece, ensuring reliability and efficiency.

    2) UI/IDE for Building and Managing Data Pipelines

    Mage takes data pipeline development to the next level with a user-friendly integrated development environment (IDE). You can build and manage your data pipelines through an intuitive user interface, making the process efficient and accessible to both data scientists and engineers.

    3) Multiple Languages Support

    Mage supports writing pipelines in multiple languages such as Python, SQL, and R. This language versatility means you can work with the languages you’re most comfortable with, making your data preparation process more efficient.

    4) Multiple Types of Pipelines

    Mage caters to diverse data pipeline needs. Whether you require standard batch pipelines, data integration pipelines, streaming pipelines, Spark pipelines, or DBT pipelines, Mage has you covered.

    5) Built-In Engineering Best Practices

    Mage is not just a tool; it’s a promoter of good coding practices. It enables reusable code, data validation in each block, and operationalizes data pipelines with built-in observability, data quality monitoring, and lineage. This ensures that your data pipelines are not only efficient but also maintainable and reliable.

    6) Dynamic Blocks

    Dynamic blocks in Mage allow the output of a block to dynamically create additional blocks. These blocks are spawned at runtime, with the total number of blocks created being equal to the number of items in the output data of the dynamic block multiplied by the number of its downstream blocks.
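
    The arithmetic is easy to check with a small sketch. The function and the block:index naming below are purely illustrative and are not Mage’s internal naming scheme.

```python
def spawn_blocks(output_items: list, downstream_blocks: list[str]) -> list[str]:
    """Pair every item of the dynamic block's output with every downstream block."""
    return [
        f"{block}:{index}"
        for index, _ in enumerate(output_items)
        for block in downstream_blocks
    ]


# A dynamic block that returns 3 items and has 2 downstream blocks.
spawned = spawn_blocks(["us", "eu", "apac"], ["clean", "export"])
print(len(spawned), spawned)
```

    Three output items multiplied by two downstream blocks gives six spawned blocks.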

    7) Triggers

    • Schedule Triggers: These triggers allow you to set specific start dates and intervals for pipeline runs. Choose from daily, weekly, or monthly, or even define custom schedules using Cron syntax. Mage’s Schedule Triggers put you in control of when your pipelines execute.
    • Event Triggers: With Event Triggers, your pipelines respond instantly to specific events, such as the completion of a database query or the creation of a new object in cloud storage services like Amazon S3 or Google Storage. Real-time automation at your fingertips.
    • API Triggers: API Triggers enable your pipelines to run in response to specific API calls. Whether it’s customer requests or external system interactions, these triggers ensure your data workflows stay synchronized with the digital world.

    Different types of Block: 

    Data Loader: Within Mage, Data Loaders are ready-made templates designed to seamlessly link up with a multitude of data sources. These sources span from Postgres, BigQuery, Redshift, and S3 to various others. Additionally, Mage allows for the creation of custom data loaders, enabling connections to APIs. The primary role of Data Loaders is to facilitate the retrieval of data from these designated sources.

    Data Transformer: Much like Data Loaders, Data Transformers provide predefined functions such as handling duplicates, managing missing data, and excluding specific columns. Alternatively, you can craft your own data transformations or merge outputs from multiple data loaders to preprocess and sanitize the data before it advances through the pipeline.

    Data Exporter: Data Exporters within Mage empower you to dispatch data to a diverse array of destinations, including databases, data lakes, data warehouses, or local storage. You can opt for predefined export templates or craft custom exporters tailored to your precise requirements.

    Custom Blocks: Custom blocks in the Mage framework are incredibly flexible and serve various purposes. They can store configuration data and facilitate its transmission across different pipeline stages. Additionally, they prove invaluable for logging purposes, allowing you to categorize and visually distinguish log entries for enhanced organization.

    Sensor: A Sensor, a specialized block within Mage, continuously assesses a condition until it’s met or until a specified time duration has passed. When a block depends on a sensor, it remains inactive until the sensor confirms that its condition has been satisfied. Sensors are especially valuable when there’s a need to wait for external dependencies or handle delayed data before proceeding with downstream tasks.
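
    The behavior of a sensor can be sketched as a polling loop with a timeout. This is a generic illustration in plain Python rather than Mage’s own sensor implementation; wait_for, file_has_arrived, and the timing values are hypothetical.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    timeout_sec: float,
    poll_interval_sec: float = 0.01,
) -> bool:
    """Poll until the condition holds (True) or the timeout passes (False)."""
    deadline = time.monotonic() + timeout_sec
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_sec)


# Simulated external dependency that becomes available on the third check.
checks = {"count": 0}


def file_has_arrived() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


ready = wait_for(file_has_arrived, timeout_sec=1.0)
print(ready, checks["count"])
```

    A downstream task would start only once wait_for returns True, which mirrors how a block that depends on a sensor stays inactive until the condition is met.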

    Getting Started with Mage

    There are two ways to run Mage: using Docker or using pip.

    Docker Command

    Create a new working directory where all the mage files will be stored.

    Then, in that working directory, execute this command:

    Windows CMD: 

    docker run -it -p 6789:6789 -v %cd%:/home/src mageai/mageai /app/run_app.sh mage start [project_name]

    Linux CMD:

    docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai /app/run_app.sh mage start [project_name]

    Using Pip (Working directory):

    pip install mage-ai

    mage start [project_name]

    You can browse to http://localhost:6789/overview to get to the Mage UI.

    Let’s build our first pipeline to fetch CSV files from the API for data loading, do some useful transformations, and export that data to our local database.

    The dataset is an invoices CSV file stored in the current directory, with the following columns:

     (1) First Name; (2) Last Name; (3) E-mail; (4) Product ID; (5) Quantity; (6) Amount; (7) Invoice Date; (8) Address; (9) City; (10) Stock Code

    From the dashboard, create a new pipeline, select the standard batch type (we’ll be implementing a batch pipeline), and give it a unique ID.

    Project structure:

    ├── mage_data
    └── [project_name]
        ├── charts
        ├── custom
        ├── data_exporters
        ├── data_loaders
        ├── dbt
        ├── extensions
        ├── pipelines
        │   └── [pipeline_name]
        │       ├── __init__.py
        │       └── metadata.yaml
        ├── scratchpads
        ├── transformers
        ├── utils
        ├── __init__.py
        ├── io_config.yaml
        ├── metadata.yaml
        └── requirements.txt

    The project contains all the block files (data loaders, transformers, charts, and so on) along with the configuration files for our pipeline, io_config.yaml and metadata.yaml. Each block file contains a decorated function in which we write our code.

    1. We begin by loading a CSV file from our local directory, specifically located at /home/src/invoice.csv. To achieve this, we select the “Local File” option from the Templates dropdown and configure the Data Loader block accordingly. Running this configuration will allow us to confirm if the CSV file loads successfully.

    2. In the next step, we introduce a Transformer block using a generic template. On the right side of the user interface, we can observe the directed acyclic graph (DAG) tree. To establish the data flow, we set the Data Loader block as the parent of the Transformer block, either by editing the block’s parent directly or by connecting the two blocks in the user interface.

    The Transformer block operates on the data frame received from the upstream Data Loader block, which is passed as the first argument to the Transformer function.
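    The hand-off between the two blocks can be sketched as follows. This is a plain-Python illustration under stated assumptions: the no-op data_loader and transformer decorators stand in for the ones Mage provides inside a block file, rows are kept as a list of dicts rather than a DataFrame so the sketch runs with the standard library only, and the sample data and function names are hypothetical.

```python
import csv
import io


# Stand-ins so the sketch runs outside Mage (assumption: inside a block file
# Mage supplies decorators with these names).
def data_loader(func):
    return func


def transformer(func):
    return func


# Hypothetical sample data with a subset of the invoice columns.
SAMPLE_CSV = """First Name,Last Name,Quantity,Amount
Ada,Lovelace,2,10.50
Ada,Lovelace,2,10.50
Alan,Turing,1,
"""


@data_loader
def load_invoices(*args, **kwargs):
    """Parse the CSV text into a list of row dicts."""
    return list(csv.DictReader(io.StringIO(SAMPLE_CSV)))


@transformer
def clean_invoices(rows, *args, **kwargs):
    """Receive the loader's output as the first argument, then drop
    duplicate rows and rows with a missing Amount."""
    seen = set()
    cleaned = []
    for row in rows:
        key = tuple(row.items())
        if key in seen or not row["Amount"]:
            continue
        seen.add(key)
        cleaned.append(row)
    return cleaned


cleaned = clean_invoices(load_invoices())
```

    In a real pipeline the value passed between blocks would typically be a DataFrame, and Mage, not the caller, wires the loader’s return value into the transformer.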

    3. Our final step involves exporting the DataFrame to a locally hosted PostgreSQL database. We incorporate a Data Exporter block and connect it to the Transformer block.

    To establish a connection with the PostgreSQL database, it is imperative to configure the database credentials in the io_config.yaml file. Alternatively, these credentials can be supplied through environment variables.
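    As a minimal sketch of the environment-variable route, the helper below collects connection settings from a mapping such as os.environ. The function name and the variable names (POSTGRES_HOST and so on) are assumptions for illustration; use whatever names your io_config.yaml or deployment actually defines.

```python
import os


def postgres_settings(env=os.environ):
    """Collect PostgreSQL connection settings from environment variables.

    The variable names are assumptions; required values raise KeyError
    when missing so a misconfigured environment fails early.
    """
    return {
        "host": env.get("POSTGRES_HOST", "localhost"),
        "port": int(env.get("POSTGRES_PORT", "5432")),
        "dbname": env["POSTGRES_DBNAME"],
        "user": env["POSTGRES_USER"],
        "password": env["POSTGRES_PASSWORD"],
    }


# Example with an explicit mapping instead of the real environment.
settings = postgres_settings(
    {
        "POSTGRES_DBNAME": "invoices",
        "POSTGRES_USER": "etl",
        "POSTGRES_PASSWORD": "secret",
    }
)
```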

    With these steps completed, we have successfully constructed a foundational batch pipeline. This pipeline efficiently loads, transforms, and exports data, serving as a fundamental building block for more advanced data processing tasks.

    Mage vs Other tools:

    Consistency Across Environments: Some orchestration tools may exhibit inconsistencies between local development and production environments due to varying configurations. Mage tackles this challenge by providing a consistent and reproducible workflow environment through a single configuration file that can be executed uniformly across different environments.

    Reusability: Achieving reusability in workflows can be complex in some tools. Mage simplifies this by allowing tasks and workflows to be defined as reusable components within a Magefile, making it effortless to share them across projects and teams.

    Data Passing: Efficiently passing data between tasks can be a challenge in certain tools, especially when dealing with large datasets. Mage streamlines data passing through straightforward function arguments and returns, enabling seamless data flow and versatile data handling.

    Testing: Some tools lack user-friendly testing utilities, resulting in manual testing and potential coverage gaps. Mage simplifies testing with a robust testing framework that enables the definition of test cases, inputs, and expected outputs directly within the Mage file.

    Debugging: Debugging failed tasks can be time-consuming with certain tools. Mage enhances debugging with detailed logs and error messages, offering clear insights into the causes of failures and expediting issue resolution.

    Conclusion: 

    Mage offers a streamlined and user-friendly approach to data pipeline orchestration, addressing common challenges with simplicity and efficiency. Its single-container deployment, visual interface, and robust features make it a valuable tool for data professionals seeking an intuitive and consistent solution for managing data workflows.

  • Building an ETL Workflow Using Apache NiFi and Hive

    The objective of this article is to design an ETL workflow using Apache NiFi that will scrape a web page with almost no code to get an endpoint, extract and transform the dataset, and load the transformed data into a Hive table.

    Problem Statement

    One potential use case where we need to create a data pipeline would be to capture the district level COVID-19 information from the COVID19-India API website, which gets updated daily. So, the aim is to create a flow that collates and loads a dataset into a warehouse system used by various downstream applications for further analysis, and the flow should be easily configurable for future changes.

    Prerequisites

    Before we start, we must have a basic understanding of Apache NiFi, and having it installed on a system would be a great start for this article. If you do not have it installed, please follow these quick steps. Apache Hive should be added to this architecture, which also requires a fully functional Hadoop framework. For this article, I am using Hive on a single cluster installed locally, but you can use a remote Hive connection as well.

    Basic Terminologies

    Apache NiFi is an ETL tool with flow-based programming that comes with a web UI built to provide an easy way (drag & drop) to handle data flow in real-time. It also supports powerful and scalable means of data routing and transformation, which can be run on a single server or in a clustered mode across many servers.

    A NiFi workflow consists of processors, the rectangular components that can process, verify, filter, join, split, or adjust data. They exchange pieces of information called FlowFiles through queues named connections, and the Flow Controller helps to manage the resources between those components.

    Web scraping is a process to extract and collect structured web data with automation. It includes extracting and processing underlying HTML code using CSS selectors and the extracted data gets stored into a database.

    Apache Hive is a warehouse system built on top of Hadoop used for data summarization, query, and ad-hoc analysis.

    Steps for ETL Workflow

    Fig:- End-to-End NiFi WorkFlow

    The above flow comprises multiple processors, each performing different tasks at different stages to process data. The different stages are Collect (InvokeHTTP – API Web Page, InvokeHTTP – Download District Data), Filter (GetHTMLElement, ExtractEndPoints, RouteOnAttribute – District API, QueryRecord), Transform (ReplaceHeaders, ConvertJSONToSQL), Load (PutHiveQL), and Logging (LogAttribute). Each processor is connected through different relationship connections and gets triggered on success until the data gets loaded into the table. The entire flow is scheduled to run daily.

    So, let’s dig into each step to understand the flow better.

    1. Get the HTML document using the Remote URL

    The flow starts with an InvokeHTTP processor that sends an HTTP GET request to the COVID19-India API URL and returns an HTML page in the response queue for further inspection. The processor can be used to invoke multiple HTTP methods (GET, PUT, POST, or PATCH) as well.

    Fig:- InvokeHTTP – API Web Page Configuration

    2. Extract listed endpoints

    The second step occurs when the GetHTMLElement processor targets HTML table rows from the response, where all the endpoints are listed inside anchor tags, using the CSS selector tr > td > a, and extracts the data into FlowFiles.

    Fig:- GetHTMLElement Configuration

    After the success of the previous step, the ExtractText processor evaluates regular expressions against the content of the FlowFile to extract the URLs, which are then assigned to a FlowFile attribute named data_url.

    Fig:- ExtractEndPoints Configuration

    Note: The layout of the web page may change over time. If it has changed by the time you read this article, configure the above processors according to the new layout.

    3. Pick districts API and Download the dataset

    Here, the RouteOnAttribute processor uses the NiFi Expression Language to pick out the API for district-level information and ignore the other APIs, since we are only interested in district.csv.

    Fig:- RouteOnAttribute – District API Configuration
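    For readers who think in code, the extraction and routing steps above can be approximated in a few lines of Python. This is a sketch of the logic only, not of how the processors work internally: the HTML fragments, the example.org URLs, and the regular expression are assumptions, while data_url and district.csv come from the flow described above.

```python
import re

# Hypothetical anchor tags, shaped like the elements matched by "tr > td > a".
anchors = [
    '<a href="https://example.org/csv/latest/state_wise.csv">state_wise.csv</a>',
    '<a href="https://example.org/csv/latest/district.csv">district.csv</a>',
]

# Assumed pattern: capture whatever sits inside the href attribute.
HREF_PATTERN = re.compile(r'href="([^"]+)"')


def extract_data_url(anchor_html):
    """Return the URL inside the anchor tag, or None when there is none."""
    match = HREF_PATTERN.search(anchor_html)
    return match.group(1) if match else None


# Equivalent of assigning the data_url attribute to each FlowFile.
data_urls = [extract_data_url(anchor) for anchor in anchors]

# Equivalent of routing: keep only the district-level endpoint.
district_urls = [url for url in data_urls if url and url.endswith("district.csv")]
```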

    And this time, the InvokeHTTP processor downloads the data using the extracted API endpoint, referenced as ${data_url} (the attribute name wrapped in NiFi’s Expression Language syntax), and the response data will be in CSV format.

    Fig:- InvokeHTTP – Download District Data Configuration

    4. Transform and Filter the dataset

    In this stage, the header of the response data is changed to lowercase using the ReplaceText processor with Literal Replace strategy, and the first field name is changed from date to recorded_date to avoid using reserved database keywords.
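    The effect of this header rewrite can be sketched in Python. The flow itself uses ReplaceText with a literal replacement, so the function below is only an equivalent illustration; its name and the sample CSV are hypothetical.

```python
def rewrite_header(csv_text):
    """Lowercase the header row and rename a leading `date` column."""
    header, newline, body = csv_text.partition("\n")
    columns = [name.strip().lower() for name in header.split(",")]
    if columns and columns[0] == "date":
        # `date` is a reserved keyword in many databases, so rename it.
        columns[0] = "recorded_date"
    return ",".join(columns) + newline + body


# Hypothetical sample in the shape of the district dataset.
sample = "Date,State,District,Confirmed\n2021-02-28,Kerala,Idukki,10\n"
rewritten = rewrite_header(sample)
```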

    Since the data is being updated daily on an incremental basis, we will only extract the data from the previous day using the QueryRecord processor. It will also convert the CSV data into JSON FlowFile using the CSVReader and JsonRecordSetWriter controller services.

    Please note that both the CSVReader and JsonRecordSetWriter services can have the default settings for our use. You can check out this blog for more reading on the controller services.

    And as mentioned, QueryRecord evaluates the below query to get data from the previous day out of the FlowFile and passes it to the next processor.

    select * from FlowFile where recorded_date='${now():toNumber():minus(86400000):format('yyyy-MM-dd')}'
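    The expression inside the query takes the current time, subtracts 86,400,000 milliseconds (one day), and formats the result as yyyy-MM-dd. A Python sketch of the same arithmetic, with hypothetical function names, looks like this:

```python
from datetime import datetime, timedelta


def previous_day(now=None):
    """Return yesterday's date as yyyy-MM-dd, mirroring the expression above."""
    now = now or datetime.now()
    return (now - timedelta(milliseconds=86_400_000)).strftime("%Y-%m-%d")


def build_query(now=None):
    """Build the filter query for the previous day's records."""
    return f"select * from FlowFile where recorded_date='{previous_day(now)}'"


# Example with a fixed clock: a run at 2 AM on 1 March 2021.
query = build_query(datetime(2021, 3, 1, 2, 0))
```

    Passing a fixed now makes the behaviour easy to check around month boundaries.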

    Fig:- ReplaceHeaders Configuration

    Fig:- QueryRecord Configuration

    5. Establish JDBC connection pool for Hive and create a table

    Let’s set up the Hive JDBC driver for the NiFi flow using HiveConnectionPool with the required local/remote configurations (database connection URL, user, and password). The Hive Configuration Resources property expects the Hive configuration file path, i.e., hive-site.xml.

    Fig:- HiveConnectionPool Setup

    Now, we need an empty table to load the data from the NiFi flow, and to do so, you can use the DDL structure below:

    CREATE TABLE IF NOT EXISTS demo.districts (recorded_date string, state string, district string, confirmed string, recovered string, deceased string, other string, tested string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

    6. Load data into the Hive table

    In this step, the JSON-formatted FlowFile is converted into an SQL statement using ConvertJSONToSQL to provide a SQL query as the output FlowFile. We can configure the HiveConnectionPool for the JDBC Connection Pool property along with the table name and statement type before running the processor. In this case, the statement would be an insert type since we need to load the data into the table.

    Also, please note that when preparing a SQL command, the SQL Parameter Attribute Prefix property should be hiveql. Otherwise, the very next processor will not be able to identify it and will throw an error.

    Then, on success, PutHiveQL executes the input SQL command and loads the data into the table. The success of this processor marks the end of the workflow and the data can be verified by fetching the target table.
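    To make the conversion concrete, here is a rough Python sketch of turning one flat JSON record into a parameterized INSERT plus its parameter attributes. It is an illustration of the output shape, not NiFi’s implementation: it assumes attributes are named <prefix>.args.N.type and <prefix>.args.N.value, and it treats every column as a string (JDBC type 12, VARCHAR) because the table above declares only string columns.

```python
import json

JDBC_VARCHAR = 12  # value of java.sql.Types.VARCHAR


def json_to_insert(record_json, table, prefix="hiveql"):
    """Build an INSERT with ? placeholders and the matching attributes."""
    record = json.loads(record_json)
    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

    attributes = {}
    for position, column in enumerate(columns, start=1):
        # Assumed naming convention for the parameter attributes.
        attributes[f"{prefix}.args.{position}.type"] = str(JDBC_VARCHAR)
        attributes[f"{prefix}.args.{position}.value"] = str(record[column])
    return sql, attributes


# Hypothetical record with two of the table's columns.
sql, attributes = json_to_insert(
    '{"recorded_date": "2021-02-28", "state": "Kerala"}', "demo.districts"
)
```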

    Fig:- ConvertJSONToSQL Configurations

     

    Fig:- PutHiveQL Configuration

    7. Schedule the flow for daily updates

    You can schedule the entire flow to run at any given time using different NiFi scheduling strategies. Since the first InvokeHTTP is the initiator of this flow, we can configure it to run daily at 2 AM.

    Fig:- Scheduling Strategy

    8. Log Management

    Almost every processor has been directed to the LogAttribute processor with a failure/success queue, which will write the state and information of all used attributes into the NiFi file, logs/nifi-app.log. By checking this file, we can debug and fix the issue in case of any failure. To extend it even further, we can also set up a flow to capture and notify error logs using Apache Kafka over email.

    9. Consume data for analysis

    You can use various open-source visualization tools to start off with the exploratory data analysis on the data stored in the Hive table.

    You can download the template covid_etl_workflow.xml and run it on your machine for reference.

    Future Scope

    There are different ways to build any workflow, and this was one of them. You can take this further by allowing multiple datasets (state_wise, test_datasets) from the list with different combinations of various processors/controllers as a part of the flow. 

    You can also try scraping data from a product listing page of multiple e-commerce websites for a comparison between goods and price or you can even extract movie reviews and ratings from the IMDb website and use it as a recommendation for users. 

    Conclusion

    In this article, we discussed Apache NiFi and created a workflow to extract, filter, transform, and load the data for analysis purposes. If you are more comfortable building logic and want to focus on the architecture with less code, then Apache NiFi is the tool for you.

  • Lessons Learnt While Building an ETL Pipeline for MongoDB & Amazon Redshift Using Apache Airflow

    Recently, I was involved in building an ETL (Extract-Transform-Load) pipeline. It included extracting data from MongoDB collections, performing transformations, and then loading the data into Redshift tables. Many ETL solutions are available in the market that kind of solve the issue, but the key part of an ETL process lies in its ability to transform or process raw data before it is pushed to its destination.

    Each ETL pipeline comes with specific business requirements around processing data, which are hard to achieve using off-the-shelf ETL solutions. This is why a majority of ETL solutions are custom built, from scratch. In this blog, I am going to talk about my learnings from building a custom ETL solution which involved moving data from MongoDB to Redshift using Apache Airflow.

    Background:

    I began by writing a Python-based command line tool which supported different phases of ETL, like extracting data from MongoDB, processing extracted data locally, uploading the processed data to S3, loading data from S3 to Redshift, post-processing and cleanup. I used the PyMongo library to interact with MongoDB and the Boto library for interacting with Redshift and S3.

    I kept each operation atomic so that multiple instances of each operation can run independently of each other, which helps to achieve parallelism. One of the major challenges was to achieve parallelism while running the ETL tasks. One option was to develop our own framework based on threads, or to develop a distributed task scheduler using a task queue like Celery combined with a message broker like RabbitMQ. After doing some research, I settled on Apache Airflow. Airflow is a Python-based scheduler where you can define DAGs (Directed Acyclic Graphs), which run as per the given schedule and run tasks in parallel in each phase of your ETL. You can define a DAG as Python code, and it also enables you to handle the state of your DAG run using environment variables. Features like task retries on failure are a plus.
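    To illustrate why a DAG gives you parallelism for free, the sketch below groups tasks into batches that could run side by side. It uses only Python’s standard graphlib module (Python 3.9+) and is not Airflow code; the task names and the helper parallel_batches are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical ETL tasks. Each key maps to the tasks that must finish first.
dependencies = {
    "extract_users": set(),
    "extract_orders": set(),
    "transform_users": {"extract_users"},
    "transform_orders": {"extract_orders"},
    "load_to_redshift": {"transform_users", "transform_orders"},
}


def parallel_batches(deps):
    """Group tasks into batches; tasks within a batch do not depend on
    each other, so a scheduler could run them in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = parallel_batches(dependencies)
```

    Here both extract tasks land in the first batch, both transform tasks in the second, and the load runs last.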

    We faced several challenges while getting the above ETL workflow to be near real-time and fault tolerant. We discuss the challenges faced and the solutions below:

    Keeping your ETL code changes in sync with Redshift schema

    While you are building the ETL tool, you may end up fetching a new field from MongoDB, but at the same time, you have to add that column to the corresponding Redshift table. If you fail to do so, the ETL pipeline will start failing. In order to tackle this, I created a database migration tool which would become the first step in my ETL workflow.

    The migration tool would:

    • keep the migration status in a Redshift table, and
    • track all migration scripts in a code directory.

    In each ETL run, it would get the most recently run migrations from Redshift and search for any new migration script available in the code directory. If one was found, it would run the new migration script, after which the regular ETL tasks would run. This puts the onus on the developer to add a migration script whenever they make a change such as adding or removing a field fetched from MongoDB.
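    The core check of such a migration step can be sketched as a comparison between the scripts on disk and the scripts already recorded as applied. This is a minimal illustration, not the original tool: the function name and the script file names are hypothetical, and it assumes scripts are named so that sorting gives the intended order.

```python
def pending_migrations(available, applied):
    """Return scripts that exist in the code directory but are not yet
    recorded as applied, in file-name order."""
    applied_set = set(applied)
    return sorted(name for name in available if name not in applied_set)


# Hypothetical script names from the code directory ...
available_scripts = [
    "002_add_email_column.sql",
    "001_create_users.sql",
    "003_drop_legacy_flag.sql",
]
# ... and the ones the Redshift status table says have already run.
applied_scripts = ["001_create_users.sql", "002_add_email_column.sql"]

to_run = pending_migrations(available_scripts, applied_scripts)
```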

    Maintaining data consistency

    While extracting data from MongoDB, one needs to ensure all the collections are extracted at a specific point in time; otherwise, there can be data inconsistency issues. We need to solve this problem at multiple levels:

    • While extracting data from MongoDB, define a parameter such as a modified date and extract data from the different collections with a filter for records less than or equal to that date. This will ensure you fetch point-in-time data from MongoDB.
    • While loading data into Redshift tables, don’t load directly into the master table; instead, load it into a staging table. Once you are done loading data into staging for all related collections, load it into master from staging within a single transaction. This way, data is either updated in all related tables or in none of them.
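    The staging-to-master step can be sketched as a list of SQL statements wrapped in one transaction. This is an illustration under assumptions: the <table>_staging naming, the id key column, and the delete-then-insert merge are hypothetical choices, not necessarily what the original pipeline did.

```python
def staging_to_master_statements(tables, key_column="id"):
    """Build SQL that moves every staging table into its master table
    inside a single transaction (all tables update, or none do)."""
    statements = ["BEGIN;"]
    for table in tables:
        staging = f"{table}_staging"  # assumed naming convention
        # Remove master rows that are about to be replaced ...
        statements.append(
            f"DELETE FROM {table} USING {staging} "
            f"WHERE {table}.{key_column} = {staging}.{key_column};"
        )
        # ... then copy the fresh rows over from staging.
        statements.append(f"INSERT INTO {table} SELECT * FROM {staging};")
    statements.append("COMMIT;")
    return statements


statements = staging_to_master_statements(["users", "orders"])
```

    Executing the statements over one database connection keeps the related tables consistent with each other.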

    A single bad record can break your ETL

    While moving data across the ETL pipeline into Redshift, one needs to take care of field formats. For example, the format of a date field in the incoming data can differ from the one in the Redshift schema design. Another example is incoming data that exceeds the length of the field in the schema. Redshift’s COPY command, which is used to load data from files into Redshift tables, is very sensitive to such changes in data types. Even a single incorrectly formatted record will lead to all your data getting rejected, effectively breaking the ETL pipeline.

    There are multiple ways in which we can solve this problem. We can either handle it in one of the transform jobs in the pipeline or put the onus on Redshift to handle these variances. Redshift’s COPY command has many options which can help you solve these problems. Some of the very useful options are:

    • ACCEPTANYDATE: Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
    • ACCEPTINVCHARS: Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
    • TRUNCATECOLUMNS: Truncates data in columns to the appropriate number of characters so that it fits the column specification.
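    A small helper can assemble a COPY statement with these options. Treat this as a sketch: the table name, S3 prefix, and IAM role ARN are placeholders, and the GZIP and DATEFORMAT 'auto' options are added here as assumptions about how the files are written.

```python
def build_copy_command(table, s3_prefix, iam_role, options=()):
    """Assemble a Redshift COPY statement from its parts."""
    parts = [
        f"COPY {table}",
        f"FROM '{s3_prefix}'",
        f"IAM_ROLE '{iam_role}'",
        *options,
    ]
    return "\n".join(parts) + ";"


# Placeholder values for illustration only.
command = build_copy_command(
    "public.users",
    "s3://my-etl-bucket/users/part",
    "arn:aws:iam::123456789012:role/etl-copy-role",
    options=(
        "GZIP",
        "DATEFORMAT 'auto'",
        "ACCEPTANYDATE",
        "ACCEPTINVCHARS",
        "TRUNCATECOLUMNS",
    ),
)
```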

    Redshift going out of storage

    Redshift is based on PostgreSQL, and one common problem is that deleting records from Redshift tables does not actually free up space. So if your ETL process deletes and creates records frequently, you may run out of Redshift storage space. Redshift’s VACUUM operation is the solution to this problem. Instead of making VACUUM a part of your main ETL flow, define a separate workflow that runs VACUUM on its own schedule. VACUUM reclaims space and re-sorts rows in either a specified table or all tables in the current database. It can be run as FULL, SORT ONLY, DELETE ONLY, or REINDEX. More information on VACUUM can be found here.

    ETL instance going out of storage

    Your ETL will generate a lot of files by extracting data from MongoDB onto your ETL instance. It is very important to delete those files periodically; otherwise, you are very likely to run out of storage on your ETL server. If your data from MongoDB is huge, you might end up creating large files on your ETL server. Again, I would recommend defining a separate workflow that runs the cleanup operation on its own schedule.

    Making ETL Near Real Time

    Processing only the delta rather than doing a full load in each ETL run

    ETL would be faster if you keep track of the already processed data and process only the new data. If you are doing a full load of data in each ETL run, then the solution will not scale as your data grows. As a solution to this, we made it mandatory for every collection in our MongoDB to have a created and a modified date. Our ETL would check the maximum value of the modified date for the given collection in the Redshift table. It would then generate a filter query to fetch only those records from MongoDB whose modified date is greater than that maximum value. It may be difficult for you to make changes in your product, but it’s worth the effort!
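    The filter for such a delta load can be sketched as a small function that returns a MongoDB query document. The field name modified, the function name, and the idea of passing None on the very first run are assumptions for illustration; the upper bound reuses the point-in-time cutoff discussed earlier.

```python
from datetime import datetime


def delta_filter(last_loaded, cutoff, field="modified"):
    """Build a MongoDB filter for records modified after the last load
    and up to the extraction cutoff."""
    bounds = {"$lte": cutoff}
    if last_loaded is not None:
        # Nothing loaded yet means a full load; otherwise fetch only the delta.
        bounds["$gt"] = last_loaded
    return {field: bounds}


# Hypothetical values: max(modified) read from Redshift, and this run's cutoff.
last_loaded = datetime(2021, 2, 27, 23, 59, 59)
cutoff = datetime(2021, 2, 28, 23, 59, 59)

query_filter = delta_filter(last_loaded, cutoff)
first_run_filter = delta_filter(None, cutoff)
```

    The resulting dictionary is what you would pass to a find() call on the collection.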

    Compressing and splitting files while loading

    A good approach is to write files in a compressed format. It saves storage space on the ETL server and also helps when you load data into Redshift. The Redshift COPY documentation recommends providing compressed files as input. Also, instead of a single huge file, you should split your files into parts and give all the files to a single COPY command. This enables Redshift to use its computing resources across the cluster to do the copy in parallel, leading to faster loads.
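    Splitting and compressing can be done in one pass with Python’s standard gzip module. The sketch below is one possible approach; the function name, the part-file naming, and the pipe-delimited sample rows are assumptions.

```python
import gzip
import os
import tempfile


def write_gzip_parts(lines, out_dir, prefix, lines_per_part):
    """Write `lines` into gzip-compressed part files of bounded size."""
    paths = []
    for start in range(0, len(lines), lines_per_part):
        path = os.path.join(out_dir, f"{prefix}.part{len(paths):04d}.gz")
        with gzip.open(path, "wt", encoding="utf-8") as handle:
            chunk = lines[start:start + lines_per_part]
            handle.writelines(line + "\n" for line in chunk)
        paths.append(path)
    return paths


# Example: ten hypothetical rows, four rows per part file.
with tempfile.TemporaryDirectory() as tmp_dir:
    rows = [f"{i}|user{i}" for i in range(10)]
    parts = write_gzip_parts(rows, tmp_dir, "users", lines_per_part=4)
    part_names = [os.path.basename(path) for path in parts]
    with gzip.open(parts[-1], "rt", encoding="utf-8") as handle:
        last_part = handle.read()
```

    Uploading the parts under a common S3 prefix lets a single COPY command pick up all of them.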

    Streaming mongo data directly to S3 instead of writing it to ETL server

    One of the major overheads in the ETL process is writing data to the ETL server first and then uploading it to S3. To reduce disk I/O, avoid storing data on the ETL server. Instead, use MongoDB’s handy stream API. With the MongoDB Node driver, both the collection.find() and the collection.aggregate() functions return cursors. The cursor’s stream method also accepts a transform function as a parameter, so all your custom transform logic can go into that function. The upload() function in AWS S3’s Node library also accepts readable streams. Use the stream from the MongoDB Node stream method, pipe it into zlib to gzip it, then feed the readable stream into AWS S3’s Node library. Simple! You will see a large improvement in your ETL process from this simple but important change.
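    The same streaming idea can be sketched in Python with a generator that transforms and gzips records on the fly, never touching the local disk. This is a Python analogue of the Node pipeline described above, not the original code: the function name and sample records are hypothetical, and the records would come from a database cursor in practice, with the chunks handed to an S3 upload.

```python
import gzip
import json
import zlib


def gzip_stream(records, transform=lambda record: record):
    """Yield gzip-compressed chunks of newline-delimited JSON."""
    compressor = zlib.compressobj(wbits=31)  # wbits=31 selects the gzip container
    for record in records:
        line = json.dumps(transform(record)) + "\n"
        chunk = compressor.compress(line.encode("utf-8"))
        if chunk:  # the compressor buffers, so some calls return nothing
            yield chunk
    yield compressor.flush()


# Hypothetical records standing in for documents read from a cursor.
documents = [{"name": "Ada", "qty": 2}, {"name": "Alan", "qty": 1}]

# The transform hook carries the custom logic (here: uppercase the name).
compressed = b"".join(
    gzip_stream(documents, transform=lambda d: {**d, "name": d["name"].upper()})
)
restored = gzip.decompress(compressed).decode("utf-8")
```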

    Optimizing Redshift Queries

    Optimizing Redshift queries helps make the ETL system highly scalable and efficient, and it also reduces cost. Let’s look at some of the approaches:

    Add a distribution key

    A Redshift database is clustered, meaning your data is stored across cluster nodes. When you query for a certain set of records, Redshift has to search for those records in each node, leading to slow queries. A distribution key is a single column that decides how all data records are distributed across the cluster. If you have a single column which is available for all your data, you can specify it as the distribution key. When loading data into Redshift, all data for a certain value of the distribution key will be placed on a single node of the Redshift cluster. So when you query for certain records, Redshift knows exactly where to search for your data. This is only useful when you also use the distribution key to query the data.
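    The idea of key-based placement can be illustrated with a toy hash function. This is only a conceptual sketch and not Redshift’s actual distribution algorithm; the function name, the CRC32 hash, the node count, and the customer IDs are all assumptions.

```python
import zlib


def node_for(key, node_count):
    """Map a distribution-key value to a node index with a stable hash."""
    return zlib.crc32(str(key).encode("utf-8")) % node_count


# Hypothetical rows keyed by customer_id, spread over a 4-node cluster.
rows = [
    {"customer_id": "customer-42", "amount": 10},
    {"customer_id": "customer-7", "amount": 25},
    {"customer_id": "customer-42", "amount": 5},
]

placement = {}
for row in rows:
    placement.setdefault(node_for(row["customer_id"], 4), []).append(row)
```

    Every row with the same customer_id ends up in the same bucket, which is why a query filtered on the distribution key only needs to look in one place.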

    Source: Slideshare

     

    Generating a numeric primary key for string primary key

    In MongoDB, you can have any type of field as your primary key. If your Mongo collections have a non-numeric primary key and you use those same keys in Redshift, your joins will end up being on string keys, which are slower. Instead, generate numeric keys for your string keys and join on them, which will make queries run much faster. Redshift supports declaring a column as IDENTITY, which auto-generates a unique numeric value for the column that you can use as your primary key.

    Conclusion:

    In this blog, I have covered the best practices around building ETL pipelines for Redshift based on my learnings. There are many more recommended practices which can be easily found in the Redshift and MongoDB documentation.