Tag: data engineering

  • Data Engineering: Beyond Big Data

    When a data project comes to mind, the end goal is to enhance the data. It’s about building systems that curate the data in a way that helps the business.

    At the dawn of their data engineering journey, people tend to familiarize themselves with the terms “extract,” “transform,” and “load.” These terms, along with traditional data engineering, spark the image that data engineering is about the processing and movement of large amounts of data. And why not! We’ve witnessed a tremendous evolution in these technologies, from storing information in simple spreadsheets to managing massive data warehouses and data lakes, supported by advanced infrastructure capable of ingesting and processing huge data volumes.

    However, this doesn’t limit data engineering to ETL; rather, it opens up many opportunities to introduce new technologies and concepts that are needed to support big data processing. The expectations from a modern data system extend well beyond mere data movement. There’s a strong emphasis on privacy, especially with the vast amounts of sensitive data that need protection. Speed is crucial, particularly in real-world scenarios like satellite data processing, financial trading, and healthcare data processing, where minimizing latency is key.

    With technologies like AI and machine learning driving analysis on massive datasets, data volumes will inevitably continue to grow. We’ve seen this trend before, just as we once spoke of megabytes and now regularly discuss gigabytes. In the future, we’ll likely talk about terabytes and petabytes with the same familiarity.

    These growing expectations have made data engineering a sphere with numerous supporting components, and in this article, we’ll delve into some of those components.

    • Data governance
    • Metadata management
    • Data observability
    • Data quality
    • Orchestration
    • Visualization

    Data Governance

    With huge amounts of confidential business and user data moving around, handling it safely is a delicate process. We must ensure trust in data processes, and the data itself cannot be compromised. It is essential for a business onboarding users to show that their data is in safe hands. Today, when a business asks you for sensitive information, you’re bound to ask questions such as:

    • What if my data is compromised?
    • Are we putting it to the right use?
    • Who’s in control of this data? Are the right personnel using it?
    • Is it compliant with the rules and regulations for data practices?

    So, to answer these questions satisfactorily, data governance comes into the picture. The basic idea of data governance is that it’s a set of rules, policies, principles, or processes to maintain data integrity. It’s about how we can supervise our data and keep it safe. Think of data governance as a protective blanket that takes care of all the security risks, creates a habitable environment for data, and builds trust in data processing.

    Data governance is a powerful tool in the data engineering arsenal. These rules and principles are consistently applied throughout all data processing activities. Wherever data flows, data governance ensures that it adheres to these established protocols. By adding a sense of trust to the activities involving data, you gain the freedom to focus on your data solution without worrying about external or internal risks. This helps in reaching the ultimate goal: to foster a culture that prioritizes and emphasizes data responsibility.

    Understanding the extensive application of data governance in data engineering clearly illustrates its significance and where it needs to be implemented in real-world scenarios. In numerous entities, such as government organizations or large corporations, data sensitivity is a top priority. Misuse of this data can have widespread negative impacts. To ensure that it doesn’t happen, we can use tools to ensure oversight and compliance. Let’s briefly explore one of those tools.

    Microsoft Purview

    Microsoft Purview comes with a range of solutions to protect your data. Let’s look at some of its offerings.

    • Insider risk management
      • Microsoft Purview takes care of data security risks from people inside your organization by identifying high-risk individuals.
      • It helps you classify data breaches into different sections and take appropriate action to prevent them.
    • Data loss prevention
      • It makes applying data loss prevention policies straightforward.
      • It secures data by preventing important and sensitive data from being deleted, and it blocks unusual activities, like sharing sensitive data outside your organization.
    • Compliance adherence
      • Microsoft Purview can help you make sure that your data processes are compliant with data regulatory bodies and organizational standards.
    • Information protection
      • It provides granular control over data, allowing you to define strict accessibility rules.
      • When you need to manage what data can be shared with specific individuals, this control restricts the data visible to others.
    • Know your sensitive data
      • It simplifies the process of understanding and learning about your data.
      • MS Purview features ML-based classifiers that label and categorize your sensitive data, helping you identify its specific category.

    Metadata Management

    Another essential aspect of big data movement is metadata management. 

    Metadata, simply put, is data about data. This component of data engineering makes a base for huge improvements in data systems.

    You might have come across the widely reported story of Instagram’s servers struggling whenever a celebrity posted; it made headlines about a decade ago and has resurfaced recently.

    This story tells us about metadata’s longevity and how it became a base for greater things.

    At the time, Instagram showed the number of likes by running a count function on the database and storing it in a cache. This method was fine because the number wouldn’t change frequently, so the request would hit the cache and get the result. Even if the number changed, the request would query the data, and because the number was small, it wouldn’t scan a lot of rows, saving the data system from being overloaded.

    However, when a celebrity posted something, it’d receive so many likes that the count would be enormous and change so frequently that looking into the cache became just an extra step.

    The request would trigger a query that would repeatedly scan many rows in the database, overloading the system and causing frequent crashes.

    To deal with this, Instagram came up with the idea of denormalizing the tables and storing the number of likes for each post. So, the request would result in a query where the database needs to look at only one cell to get the number of likes. To handle the issue of frequent changes in the number of likes, Instagram began updating the value at small intervals. This story tells how Instagram solved this problem with a simple tweak of using metadata. 
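    The pattern can be sketched with a toy SQLite example (the schema and numbers are illustrative, not Instagram’s actual design): instead of counting rows on every read, the count is folded into a single denormalized column that is refreshed at intervals.

```python
import sqlite3

# Hypothetical schema: a likes table plus a denormalized like_count on posts.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE posts (post_id INTEGER PRIMARY KEY, like_count INTEGER DEFAULT 0);
    CREATE TABLE likes (post_id INTEGER, user_id INTEGER);
    INSERT INTO posts (post_id) VALUES (1);
""")
conn.executemany("INSERT INTO likes VALUES (1, ?)", [(u,) for u in range(5)])

# Expensive path: scan the likes table on every read.
count_via_scan = conn.execute(
    "SELECT COUNT(*) FROM likes WHERE post_id = 1").fetchone()[0]

# Denormalized path: refresh the stored count at intervals, so reads
# touch a single cell instead of scanning many rows.
conn.execute("""
    UPDATE posts SET like_count =
        (SELECT COUNT(*) FROM likes WHERE likes.post_id = posts.post_id)
""")
count_via_metadata = conn.execute(
    "SELECT like_count FROM posts WHERE post_id = 1").fetchone()[0]

print(count_via_scan, count_via_metadata)  # both 5; reads now hit one cell
```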

    Metadata in data engineering has evolved to solve even more significant problems by adding a layer on top of the data flow that works as an interface to communicate with data. Metadata management has become a foundation of multiple data features such as:

    • Data lineage: Stakeholders are interested in the results we get from data processes. Sometimes, in order to check the authenticity of data and get answers to questions like where the data originated from, we need to track back to the data source. Data lineage is a property that makes use of metadata to help with this scenario. Many data products like Atlan and data warehouses like Snowflake extensively use metadata for their services.
    • Schema information: With a clear understanding of your data’s structure, including column details and data types, we can efficiently troubleshoot and resolve data modeling challenges.
    • Data contracts: Metadata helps honor data contracts by keeping a common data profile, which maintains a common data structure across all data usages.
    • Stats: Managing metadata can help us easily access data statistics while also giving us quick answers to questions like what the total count of a table is, how many distinct records there are, how much space it takes, and many more.
    • Access control: Metadata management also includes having information about data accessibility. As we saw with the MS Purview features, we can associate a table with vital information and restrict the visibility of a table, or even a column, to the right people.
    • Audit: Keeping track of information, like who accessed the data, who modified it, or who deleted it, is another important feature that a product with multiple users can benefit from.
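    As a rough illustration of how several of these features hang together, here is a minimal in-memory metadata record for a single table; the table name, fields, roles, and numbers are all hypothetical.

```python
from datetime import datetime, timezone

# Hypothetical metadata record combining schema information, stats,
# access control, and an audit trail for one table.
catalog = {
    "orders": {
        "schema": {"order_id": "int", "total_amount": "numeric"},
        "stats": {"row_count": 120000, "distinct_order_ids": 120000},
        "allowed_roles": {"analyst", "admin"},
        "audit_log": [],
    }
}

def read_table(table: str, user: str, role: str) -> dict:
    """Answer stats from metadata alone and record the access for auditing."""
    entry = catalog[table]
    if role not in entry["allowed_roles"]:
        raise PermissionError(f"{user} ({role}) may not read {table}")
    entry["audit_log"].append((user, datetime.now(timezone.utc).isoformat()))
    return entry["stats"]  # e.g. row counts without scanning the table

stats = read_table("orders", "rita", "analyst")
print(stats["row_count"])  # 120000, served straight from metadata
```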

    There are many other use cases of metadata that enhance data engineering. It’s positively impacting the current landscape and shaping the future trajectory of data engineering. A very good example is a data catalog. Data catalogs focus on enriching datasets with information about data. Table formats, such as Iceberg and Delta, use catalogs to provide integration with multiple data sources, handle schema evolution, etc. Popular cloud services like AWS Glue also use metadata for features like data discovery. Tech giants like Snowflake and Databricks rely heavily on metadata for features like faster querying, time travel, and many more. 

    With the introduction of AI in the data domain, metadata management has a huge effect on the future trajectory of data engineering. Services such as Cortex and Fabric have integrated AI systems that use metadata for easy questioning and answering. When AI gets to know the context of data, the application of metadata becomes limitless.

    Data Observability

    We know how important metadata can be, and while it’s important to know your data, it’s just as important to know about the processes working on it. That’s where observability enters the discussion. It is another crucial aspect of data engineering and a component we can’t leave out of our data project.

    Data observability is about setting up systems that can give us visibility over the different services that are working on the data. Whether it’s ingestion, processing, or load operations, having visibility into data movement is essential. This not only ensures that these services remain reliable and fully operational, but it also keeps us informed about the ongoing processes. The ultimate goal is to proactively manage and optimize these operations, ensuring efficiency and smooth performance. We need to achieve this goal because whenever we create a data system, it’s very likely that issues, errors, and bugs will start popping up out of nowhere.

    So, how do we keep an eye on these services to see whether they are performing as expected? The answer to that is setting up monitoring and alerting systems.

    Monitoring

    Monitoring is the continuous tracking and measurement of key metrics and indicators that tell us about the system’s performance. Many cloud services offer comprehensive performance metrics, presented through interactive visuals. These tools provide valuable insights, such as throughput, which measures the volume of data processed per second, and latency, which indicates how long it takes to process the data. They also track errors and error rates, detailing the types of errors and how frequently they happen.
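    These metrics are easy to derive from raw processing samples. A minimal sketch, with made-up numbers:

```python
# Derive throughput, average latency, and error rate from a batch of
# (records_processed, seconds_taken, had_error) samples. All numbers
# here are illustrative.
samples = [
    (1000, 2.0, False),
    (1500, 2.5, False),
    (500, 4.0, True),
]

total_records = sum(r for r, _, _ in samples)
total_seconds = sum(s for _, s, _ in samples)

throughput = total_records / total_seconds      # records per second
avg_latency = total_seconds / len(samples)      # seconds per batch
error_rate = sum(e for _, _, e in samples) / len(samples)

print(f"{throughput:.1f} rec/s, {avg_latency:.2f}s avg, {error_rate:.0%} errors")
```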

    Tools like Prometheus and Datadog lay the base for monitoring, indicating the performance of data systems and their underlying infrastructure. We also have Graylog, which offers multiple features for monitoring a system’s logs in real time.

    Now that we have a system that gives us visibility into the performance of our processes, we need a setup that can notify us when anything goes sideways.

    Alerting

    Setting up alerting systems allows us to receive notifications directly within the applications we use regularly, eliminating the need for someone to constantly monitor metrics on a UI or watch graphs all day, which would be a waste of time and resources. This is why alerting systems are designed to trigger notifications based on predefined thresholds, such as throughput dropping below a certain level, latency exceeding a specific duration, or the occurrence of specific errors. These alerts can be sent to channels like email or Slack, ensuring that users are immediately aware of any unusual conditions in their data processes.
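    A threshold-based alert check can be sketched in a few lines; the metric names and limits below are illustrative, not taken from any particular tool:

```python
# Compare current metrics against predefined thresholds and emit messages
# that could be forwarded to a channel like email or Slack.
THRESHOLDS = {
    "throughput_rps": ("min", 200.0),   # alert if throughput drops below this
    "latency_s": ("max", 5.0),          # alert if latency exceeds this
    "error_rate": ("max", 0.05),        # alert if error rate exceeds this
}

def check_alerts(metrics: dict) -> list[str]:
    alerts = []
    for name, (kind, limit) in THRESHOLDS.items():
        value = metrics.get(name)
        if value is None:
            continue  # metric not reported this cycle
        if (kind == "min" and value < limit) or (kind == "max" and value > limit):
            alerts.append(f"ALERT: {name}={value} breaches {kind} limit {limit}")
    return alerts

print(check_alerts({"throughput_rps": 150.0, "latency_s": 2.0, "error_rate": 0.10}))
```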

    Implementing observability will significantly impact data systems. By setting up monitoring and alerting, we can quickly identify issues as they arise and gain context about the nature of the errors. This insight allows us to pinpoint the source of problems, effectively debug and rectify them, and ultimately reduce downtime and service disruptions, saving valuable time and resources.

    Data Quality

    Knowing the data and its processes is undoubtedly important, but all this knowledge is futile if the data itself is of poor quality. That’s where the other essential component of data engineering, data quality, comes into play because data processing is one thing; preparing the data for processing is another.

    In a data project involving multiple sources and formats, various discrepancies are likely to arise. These can include missing values, where essential data points are absent; outdated data, which no longer reflects current information; poorly formatted data that doesn’t conform to expected standards; incorrect data types that lead to processing errors; and duplicate rows that skew results and analyses. Addressing these issues will ensure the accuracy and reliability of the data used in the project.
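    A first-pass scan for these discrepancies might look like the following sketch, with a hypothetical row layout:

```python
# Flag missing values, wrong data types, and duplicate rows in raw data.
rows = [
    {"id": 1, "price": 25.0},
    {"id": 2, "price": None},      # missing value
    {"id": 3, "price": "cheap"},   # incorrect data type
    {"id": 1, "price": 25.0},      # duplicate row
]

issues = []
seen = set()
for i, row in enumerate(rows):
    if row["price"] is None:
        issues.append((i, "missing value"))
    elif not isinstance(row["price"], (int, float)):
        issues.append((i, "wrong type"))
    key = (row["id"], str(row["price"]))
    if key in seen:
        issues.append((i, "duplicate"))
    seen.add(key)

print(issues)  # [(1, 'missing value'), (2, 'wrong type'), (3, 'duplicate')]
```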

    Data quality involves enhancing data with key attributes. For instance, accuracy measures how closely the data reflects reality, validity ensures that the data accurately represents what we aim to measure, and completeness guarantees that no critical data is missing. Additionally, attributes like timeliness ensure the data is up to date. Ultimately, data quality is about embedding attributes that build trust in the data. For a deeper dive into this, check out Rita’s blog on Data QA: The Need of the Hour.

    Data quality plays a crucial role in elevating other processes in data engineering. In a data engineering project, there are often multiple entry points for data processing, with data being refined at different stages to achieve a better state each time. Assessing data at the source of each processing stage and addressing issues early on is vital. This approach ensures that data standards are maintained throughout the data flow. As a result, by making data consistent at every step, we gain improved control over the entire data lifecycle. 

    Data tools like Great Expectations and data unit test libraries such as Deequ play a crucial role in safeguarding data pipelines by implementing data quality checks and validations. To gain more context on this, you might want to read Unit Testing Data at Scale using Deequ and Apache Spark by Nishant. These tools ensure that data meets predefined standards, allowing for early detection of issues and maintaining the integrity of data as it moves through the pipeline.

    Orchestration

    With so many processes in place, it’s essential to ensure everything happens at the right time and in the right way. Relying on someone to manually trigger processes at scheduled times every day is an inefficient use of resources. For that individual, performing the same repetitive tasks can quickly become monotonous. Beyond that, manual execution increases the risk of missing schedules or running tasks out of order, disrupting the entire workflow.

    This is where orchestration comes to the rescue, automating tedious, repetitive tasks and ensuring precision in the timing of data flows. Data pipelines can be complex, involving many interconnected components that must work together seamlessly. Orchestration ensures that each component follows a defined set of rules, dictating when to start, what to do, and how to contribute to the overall process of handling data, thus maintaining smooth and efficient operations.

    This automation helps reduce errors that could occur with manual execution, ensuring that data processes remain consistent by streamlining repetitive tasks. With a number of different orchestration tools and services in place, we can now monitor and manage everything from a single platform. Tools like Airflow, an open-source orchestrator, Prefect, which offers a user-friendly drag-and-drop interface, and cloud services such as Azure Data Factory, Google Cloud Composer, and AWS Step Functions, enhance our visibility and control over the entire process lifecycle, making data management more efficient and reliable. Don’t miss Shreyash’s excellent blog on Mage: Your New Go-To Tool for Data Orchestration.

    Orchestration is built on a foundation of multiple concepts and technologies that make it robust and fail-safe. These underlying principles ensure that orchestration not only automates processes but also maintains reliability and resilience, even in complex and demanding data environments.

    • Workflow definition: This defines how tasks in the pipeline are organized and executed. It lays out the sequence of tasks—telling it what needs to be finished before other tasks can start—and takes care of other conditions for pipeline execution. Think of it like a roadmap that guides the flow of tasks.
    • Task scheduling: This determines when and how tasks are executed. Tasks might run at specific times, in response to events, or based on the completion of other tasks. It’s like scheduling appointments for tasks to ensure they happen at the right time and with the right resources.
    • Dependency management: Since tasks often rely on each other, with the concepts of dependency management, we can ensure that tasks run in the correct order. It ensures that each process starts only when its prerequisites are met, like waiting for a green light before proceeding.
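    The workflow-definition and dependency-management ideas above can be sketched with Python’s standard-library graphlib: the workflow is a dependency graph, and a topological sort yields a valid execution order. The task names are illustrative.

```python
from graphlib import TopologicalSorter

# Workflow definition: each task maps to the set of tasks that must
# finish before it can start.
workflow = {
    "extract": set(),
    "validate": {"extract"},
    "transform": {"validate"},
    "load": {"transform"},
    "report": {"load"},
}

# Dependency management: the sorter guarantees prerequisites run first.
order = list(TopologicalSorter(workflow).static_order())
print(order)  # ['extract', 'validate', 'transform', 'load', 'report']
```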

    With these concepts, orchestration tools provide powerful features for workflow design and management, enabling the definition of complex, multi-step processes. They support parallel, sequential, and conditional execution of tasks, allowing for flexibility in how workflows are executed. Not just that, they also offer event-driven and real-time orchestration, enabling systems to respond to dynamic changes and triggers as they occur. These tools also include robust error handling and exception management, ensuring that workflows are resilient and fault-tolerant.

    Visualization

    The true value lies not just in collecting vast amounts of data but in interpreting it in ways that generate real business value. This makes data visualization a vital component: it provides a clear and accurate representation of data that decision-makers can easily understand and use. Presenting data in the right way enables businesses to extract intelligence from it, which is what makes data engineering worth the investment; it guides strategic decisions, optimizes operations, and powers innovation.

    Visualizations allow us to see patterns, trends, and anomalies that might not be apparent in raw data. Whether it’s spotting a sudden drop in sales, detecting anomalies in customer behavior, or forecasting future performance, data visualization can provide the clear context needed to make well-informed decisions. When numbers and graphs are presented effectively, it feels as though we are directly communicating with the data, and this language of communication bridges the gap between technical experts and business leaders.

    Visualization Within ETL Processes

    Visualization isn’t just a final output. It can also be a valuable tool within the data engineering process itself. Intermediate visualization during the ETL workflow can be a game-changer. In collaborative teams, as we go through the transformation process, visualizing it at various stages helps ensure the accuracy and relevance of the result. We can understand the datasets better, identify issues or anomalies between different stages, and make more informed decisions about the transformations needed.

    Technologies like Fabric and Mage enable seamless integration of visualizations into ETL pipelines. These tools empower team members at all levels to actively engage with data, ask insightful questions, and contribute to the decision-making process. Visualizing datasets at key points provides the flexibility to verify that data is being processed correctly, develop accurate analytical formulas, and ensure that the final outputs are meaningful.

    Depending on the industry and domain, there are various visualization tools suited to different use cases. For example, 

    • For real-time insights, which are crucial in industries like healthcare, financial trading, and air travel, tools such as Tableau and Striim are invaluable. These tools allow for immediate visualization of live data, enabling quick and informed decision-making.
    • For broad data source integrations and dynamic dashboard querying, often demanded in the technology sector, tools like Power BI, Metabase, and Grafana are highly effective. These platforms support a wide range of data sources and offer flexible, interactive dashboards that facilitate deep analysis and exploration of data.

    It’s Limitless

    We are seeing many advancements in this domain, which are helping businesses, data science, AI and ML, and many other sectors because the potential of data is huge. If a business knows how to use data, it can be a major factor in its success. And for that reason, we have constantly seen the rise of different components in data engineering. All with one goal: to make data useful.

    Recently, we’ve witnessed the introduction of numerous technologies poised to revolutionize the data engineering domain. Concepts like data mesh are enhancing data discovery, improving data ownership, and streamlining data workflows. AI-driven data engineering is rapidly advancing, with expectations to automate key processes such as data cleansing, pipeline optimization, and data validation. We’re already seeing how cloud data services have evolved to embrace AI and machine learning, ensuring seamless integration with data initiatives. The rise of real-time data processing brings new use cases and advancements, while practices like DataOps foster better collaboration among teams. Take a closer look at the modern data stack in Shivam’s detailed article, Modern Data Stack: The What, Why, and How?

    These developments are accompanied by a wide array of technologies designed to support infrastructure, analytics, AI, and machine learning, alongside enterprise tools that lay the foundation for this ongoing evolution. All these elements collectively set the stage for a broader discussion on data engineering and what lies beyond big data. Big data, supported by these satellite activities, aims to extract maximum value from data, unlocking its full potential.

    References:

    1. Velotio – Data Engineering Blogs
    2. Firstmark
    3. MS Purview Data Security
    4. Tech Target – Article on data quality
    5. Splunk – Data Observability: The Complete Introduction
    6. Instagram crash story – WIRED

  • Data QA: The Need of the Hour

    Have you ever encountered vague or misleading data analytics reports? Are you struggling to provide accurate data values to your end users? Have you ever experienced being misdirected by a geographical map application, leading you to the wrong destination? Imagine Amazon customers expressing dissatisfaction due to receiving the wrong product at their doorstep.

    These issues stem from the use of incorrect or vague data by application/service providers. The need of the hour is to address these challenges by enhancing data quality processes and implementing robust data quality solutions. Through effective data management and validation, organizations can unlock valuable insights and make informed decisions.

    “Harnessing the potential of clean data is like painting a masterpiece with accurate brushstrokes.”

    Introduction

    Data quality assurance (QA) is the systematic approach organizations use to ensure they have reliable, correct, consistent, and relevant data. It involves various methods, approaches, and tools to maintain good data quality throughout the data’s lifecycle.

    What is Data Quality?

    Data quality refers to the overall utility of a dataset and its ability to be easily processed and analyzed for other uses. It is an integral part of data governance that ensures your organization’s data is fit for purpose. 

    How can I measure Data Quality?

    Data quality is commonly measured along dimensions such as accuracy, completeness, consistency, timeliness, validity, and uniqueness. Scoring your datasets against these dimensions gives a concrete picture of where quality stands.

    What is the critical importance of Data Quality?

    Good data is the foundation of reliable analytics and sound business decisions, while poor data leads to wasted effort and misplaced trust. So, invest in good data: it’s the secret sauce for business success!

    What are the Data Quality Challenges?

    1. Data quality issues in production:

    Production-specific data quality issues are primarily caused by unexpected changes in the data and infrastructure failures.

    A. Source and third-party data changes:

    External data sources, like websites or companies, may introduce errors or inconsistencies, making it challenging to use the data accurately. These issues can lead to system errors or missing values, which might go unnoticed without proper monitoring.

    Example:

    • File formats change without warning:

    Imagine we’re using an API to get data in CSV format, and we’ve made a pipeline that handles it well.

    import csv
    
    def process_csv_data(csv_file):
        with open(csv_file, 'r') as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                print(row)
    
    csv_file = 'data.csv'
    process_csv_data(csv_file)

    The data source switched to using the JSON format, breaking our pipeline. This inconsistency can cause errors or missing data if our system can’t adapt. Monitoring and adjustments will ensure the accuracy of data analysis or applications.
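    One defensive option, sketched below, is to detect the payload format before parsing, so a silent switch from CSV to JSON degrades gracefully instead of crashing the pipeline. This is an illustrative approach, not the only fix:

```python
import csv
import io
import json

def parse_payload(text: str) -> list[dict]:
    """Parse a payload as JSON if possible, otherwise fall back to CSV."""
    try:
        data = json.loads(text)  # did the source switch to JSON?
        return data if isinstance(data, list) else [data]
    except json.JSONDecodeError:
        return list(csv.DictReader(io.StringIO(text)))  # original CSV path

print(parse_payload('[{"id": "1", "name": "a"}]'))  # JSON payload
print(parse_payload("id,name\n1,a\n"))              # CSV payload
```

    Both calls yield the same row shape, so downstream code keeps working regardless of which format the source sends.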

    • Malformed data values and schema changes:

    Suppose we’re handling inventory data for an e-commerce site. The starting schema for the inventory dataset might have fields like a quantity column and a last_updated_at timestamp.

    Now, imagine that the inventory file’s schema changed suddenly. A “quantity” column has been renamed to “qty,” and the last_updated_at timestamp format switches to epoch timestamp.

    This change might not be communicated in advance, leaving our data pipeline unprepared to handle the new field and time format.
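    A normalization step can absorb such changes once they’re discovered. The sketch below maps the renamed qty field back to quantity and converts an epoch timestamp to ISO format; the record layout follows the example above:

```python
from datetime import datetime, timezone

RENAMES = {"qty": "quantity"}  # renamed upstream columns we know about

def normalize(record: dict) -> dict:
    """Undo known column renames and normalize epoch timestamps."""
    out = {RENAMES.get(k, k): v for k, v in record.items()}
    ts = out.get("last_updated_at")
    if isinstance(ts, (int, float)):  # epoch seconds -> ISO 8601 string
        out["last_updated_at"] = datetime.fromtimestamp(
            ts, tz=timezone.utc).isoformat()
    return out

print(normalize({"sku": "A1", "qty": 3, "last_updated_at": 1715299200}))
# quantity is restored and the timestamp is back in ISO format
```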

    B. Infrastructure failures:

    Reliable software is crucial for processing large data volumes, but even the best tools can encounter issues. Infrastructure failures, like glitches or overloads, can disrupt data processing regardless of the software used.

    Solution: 

    Data observability tools such as Monte Carlo, Bigeye, and Great Expectations help detect these issues by monitoring for changes in data quality and infrastructure performance. These tools are essential for identifying the root causes of data problems and alerting teams, ensuring data reliability in production environments.

    2. Data quality issues during development:

    Development-specific data quality issues are primarily caused by untested code changes.

    A. Incorrect parsing of data:

    Data transformation bugs can occur due to mistakes in code or parsing, leading to data type mismatches or schema inaccuracies.

    Example:

    Imagine we’re converting a date string to a Unix epoch timestamp using Python, but the incoming data arrives as year-day-month (“YYYY-DD-MM”) while the code assumes year-month-day. Misunderstanding the strptime() function’s format specifier leads to unexpected outcomes.

    from datetime import datetime
    
    timestamp_str = "2024-05-10"  # incoming data is actually year-day-month ("%Y-%d-%m")
    
    # Bug: parsing as year-month-day swaps the day and month fields
    format_date = "%Y-%m-%d"
    timestamp_dt = datetime.strptime(timestamp_str, format_date)
    
    epoch_seconds = int(timestamp_dt.timestamp())

    This error makes strptime() interpret “2024” as the year, “05” as the month (instead of the day), and “10” as the day (instead of the month), leaving an incorrect date in the timestamp_dt variable.
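    The fix is to make the format string mirror the data’s actual layout. A corrected sketch, assuming the upstream data really is year-day-month as stated above:

```python
from datetime import datetime

timestamp_str = "2024-05-10"  # year-day-month from the upstream source

# Correct: the format specifier matches the data's actual layout.
timestamp_dt = datetime.strptime(timestamp_str, "%Y-%d-%m")

print(timestamp_dt.month, timestamp_dt.day)  # 10 5 -> October 5th, 2024
```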

    B. Misapplied or misunderstood requirements:

    Even with the right code, data quality problems can still occur if requirements are misunderstood, resulting in logic errors and data quality issues.

    Example:
    Imagine we’re assigned to validate product prices in a dataset, ensuring they fall between $10 and $100.

    product_prices = [10, 5, 25, 50, 75, 110]
    valid_prices = []
    
    for price in product_prices:
        # Bug: inclusive comparison accepts the boundary values themselves
        if price >= 10 and price <= 100:
            valid_prices.append(price)
    
    print("Valid prices:", valid_prices)

    The requirement states that prices should fall between $10 and $100, meaning strictly between the two, so the boundary values themselves are invalid. But a misinterpretation leads the code to check >= $10 and <= $100, which makes $10 pass as valid, causing a data quality problem.
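    Under that stricter reading, the corrected check uses strict inequalities so the boundary values are rejected:

```python
product_prices = [10, 5, 25, 50, 75, 110]

# Strictly between $10 and $100: the boundaries themselves are invalid.
valid_prices = [p for p in product_prices if 10 < p < 100]

print("Valid prices:", valid_prices)  # Valid prices: [25, 50, 75]
```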

    C. Unaccounted downstream dependencies:

    Despite careful planning and logic, data quality incidents can occur due to overlooked dependencies. Understanding data lineage and communicating effectively across all users is crucial to preventing such incidents.

    Example:

    Suppose we’re working on a database schema migration project for an e-commerce system. In the process, we rename the order_date column to purchase_date in the orders table. Despite careful planning and testing, a data quality issue arises due to an overlooked downstream dependency. The marketing team’s reporting dashboard relies on a SQL query referencing the order_date column, now renamed purchase_date, resulting in inaccurate reporting and potentially misinformed business decisions.

    Here’s an example SQL query that represents the overlooked downstream dependency:

    -- SQL query used by the marketing team's reporting dashboard
    SELECT 
        DATE_TRUNC('month', order_date) AS month,
        SUM(total_amount) AS total_sales
    FROM 
        orders
    GROUP BY 
        DATE_TRUNC('month', order_date)

    This SQL query relies on the order_date column to calculate monthly sales metrics. After the schema migration, this column no longer exists, causing query failure and inaccurate reporting.
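    One lightweight safeguard, sketched below, is to scan saved query definitions for references to a column before renaming it, so downstream consumers can be updated first. The query texts here are illustrative:

```python
# Hypothetical registry of saved queries that downstream teams rely on.
saved_queries = {
    "marketing_dashboard": "SELECT DATE_TRUNC('month', order_date) FROM orders",
    "finance_rollup": "SELECT SUM(total_amount) FROM orders",
}

def find_dependents(column: str) -> list[str]:
    """Return the names of saved queries that reference the given column."""
    return [name for name, sql in saved_queries.items() if column in sql]

print(find_dependents("order_date"))  # ['marketing_dashboard']
```

    A real implementation would parse SQL properly rather than matching substrings, but even this crude check would have flagged the marketing dashboard before the migration.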

    Solutions:

    Data Quality tools like Great Expectations and Deequ proactively catch data quality issues by testing changes introduced from data-processing code, preventing issues from reaching production.

    a. Testing assertions: Assertions validate data against expectations, ensuring data integrity. While useful, they require careful maintenance and should be selectively applied.

    Example:
    Suppose we have an “orders” table in our dbt project and need to ensure the “total_amount” column contains only numeric values. We can write a dbt test to validate this data quality rule.

    version: 2
    
    models:
      - name: orders
        columns:
          - name: total_amount
            tests:
              - data_type: numeric

    In this dbt test code:

    • We specify the schema file version (version: 2), the model named “orders,” and its “total_amount” column.
    • Within the “total_amount” column definition, we add a test named “data_type” with the value “numeric,” ensuring the column contains only numeric data.
    • Running the dbt test command will execute this test, checking if the “total_amount” column adheres to the numeric data type. Any failure indicates a data quality issue.

    b. Comparing staging and production data: Data Diff is a CLI tool that compares datasets within or across databases, highlighting changes in data much as git diff highlights changes in source code, aiding in the detection of data quality issues early in the development process.

    Here’s a data-diff example between staging and production databases for the payment_table.

    data-diff \
      staging_db_connection staging_payment_table \
      production_db_connection production_payment_table \
      -k primary_key \
      -c "payment_amount, payment_type, payment_currency" \
      -w filter_condition   # optional

    Source: https://docs.datafold.com/data_diff/what_is_data_diff
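    Under the hood, a diff of this kind amounts to joining the two tables on the primary key and comparing the selected columns. A minimal, hypothetical in-memory sketch (not the data-diff implementation, which works at scale inside the databases):

```python
def diff_tables(staging, production, key, columns):
    """Join two tables (lists of dicts) on `key` and report added, removed, and changed rows."""
    stage = {row[key]: row for row in staging}
    prod = {row[key]: row for row in production}
    return {
        "added": sorted(stage.keys() - prod.keys()),    # rows only in staging
        "removed": sorted(prod.keys() - stage.keys()),  # rows only in production
        "changed": sorted(
            k for k in stage.keys() & prod.keys()
            if any(stage[k][c] != prod[k][c] for c in columns)
        ),
    }

staging = [{"id": 1, "payment_amount": 10.0}, {"id": 3, "payment_amount": 7.0}]
production = [{"id": 1, "payment_amount": 12.0}, {"id": 2, "payment_amount": 5.0}]
print(diff_tables(staging, production, "id", ["payment_amount"]))
# {'added': [3], 'removed': [2], 'changed': [1]}
```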

    What are some best practices for maintaining high-quality data?

    1. Establish Data Standards: Define clear data standards and guidelines for data collection, storage, and usage to ensure consistency and accuracy across the organization.
    2. Data Validation: Implement validation checks to ensure data conforms to predefined rules and standards, identifying and correcting errors early in the data lifecycle.
    3. Regular Data Cleansing: Schedule regular data cleansing activities to identify and correct inaccuracies, inconsistencies, and duplicates in the data, ensuring its reliability and integrity over time.
    4. Data Governance: Establish data governance policies and procedures to manage data assets effectively, including roles and responsibilities, data ownership, access controls, and compliance with regulations.
    5. Metadata Management: Maintain comprehensive metadata to document data lineage, definitions, and usage, providing transparency and context for data consumers and stakeholders.
    6. Data Security: Implement robust data security measures to protect sensitive information from unauthorized access, ensuring data confidentiality, integrity, and availability.
    7. Data Quality Monitoring: Continuously monitor data quality metrics and KPIs to track performance, detect anomalies, and identify areas for improvement, enabling proactive data quality management.
    8. Data Training and Awareness: Provide data training and awareness programs for employees to enhance their understanding of data quality principles, practices, and tools, fostering a data-driven culture within the organization.
    9. Collaboration and Communication: Encourage collaboration and communication among stakeholders, data stewards, and IT teams to address data quality issues effectively and promote accountability and ownership of data quality initiatives.
    10. Continuous Improvement: Establish a culture of continuous improvement by regularly reviewing and refining data quality processes, tools, and strategies based on feedback, lessons learned, and evolving business needs.
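    As an illustration of practice 2 (data validation), a rule-driven check can be as simple as a mapping from field names to predicates. The fields and rules below are hypothetical:

```python
# Hypothetical validation rules: field name -> predicate the value must satisfy.
RULES = {
    "email": lambda v: isinstance(v, str) and "@" in v,
    "quantity": lambda v: isinstance(v, int) and v >= 0,
}

def validate(record, rules=RULES):
    """Return the names of the fields that violate their rule."""
    return [field for field, rule in rules.items() if not rule(record.get(field))]

print(validate({"email": "a@example.com", "quantity": 3}))   # []
print(validate({"email": "not-an-email", "quantity": -2}))   # ['email', 'quantity']
```

    Applying such checks at ingestion time catches bad records before they propagate downstream.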

    Can you recommend any tools for improving data quality?

    1. AWS Deequ: AWS Deequ is an open-source data quality library built on top of Apache Spark. It provides tools for defining data quality rules and validating large-scale datasets in Spark-based data processing pipelines.
    2. Great Expectations: Great Expectations is an open-source Python framework for validating, documenting, and profiling data; GX Cloud, its fully managed SaaS offering, simplifies deployment, scaling, and collaboration so you can focus on data validation.
    3. Soda: Soda allows data engineers to test data quality early and often in pipelines to catch data quality issues before they have a downstream impact.
    4. Datafold: Datafold is a cloud-based data quality platform that automates and simplifies the process of monitoring and validating data pipelines. It offers features such as automated data comparison, anomaly detection, and integration with popular data processing tools like dbt.

    Considerations for Selecting a Data QA Tool:

    Selecting a data QA (Quality Assurance) tool hinges on your specific needs and requirements. Consider factors such as: 

    1. Scalability and Performance: Ensure the tool can handle current and future data volumes efficiently, with real-time processing capabilities.

    Example: Great Expectations helps validate data in big data environments by providing a scalable, customizable way to define and monitor data quality across different sources.

    2. Data Profiling and Cleansing Capabilities: Look for comprehensive data profiling and cleansing features to detect anomalies and improve data quality.

    Example: AWS Glue DataBrew offers data profiling, cleaning, and normalization, along with data lineage mapping and the automation of routine data preparation tasks.

    3. Data Monitoring Features: Choose tools with continuous monitoring capabilities, allowing you to track metrics and establish data lineage.

    Example: Datafold’s monitoring feature allows data engineers to write SQL commands to find anomalies and create automated alerts.

    4. Seamless Integration with Existing Systems: Select a tool compatible with your existing systems to minimize disruption and facilitate seamless integration.

    Example: dbt offers seamless integration with existing data infrastructure, including data warehouses and BI tools. It allows users to define data transformation pipelines using SQL, making it compatible with a wide range of data systems.

    5. User-Friendly Interface: Prioritize tools with intuitive interfaces for quick adoption and minimal training requirements.

    Example: Soda SQL is an open-source tool with a simple command line interface (CLI) and Python library to test your data through metric collection.

    6. Flexibility and Customization Options: Seek tools that offer flexibility to adapt to changing data requirements and allow customization of rules and workflows.

    Example: dbt offers flexibility and customization options for defining data transformation workflows. 

    7. Vendor Support and Community: Evaluate vendors based on their support reputation and active user communities for shared knowledge and resources.

    Example: AWS Deequ is supported by Amazon Web Services (AWS) and has an active community of users. It provides comprehensive documentation, tutorials, and forums for users to seek assistance and share knowledge about data quality best practices.

    8. Pricing and Licensing Options: Consider pricing models that align with your budget and expected data usage, such as subscription-based or volume-based pricing.

    Example: Great Expectations offers flexible pricing and licensing options, including a freely available open-source edition and a subscription-based cloud offering.

    Ultimately, the right tool should effectively address your data quality challenges and seamlessly fit into your data infrastructure and workflows.

    Conclusion: The Vital Role of Data Quality

    In conclusion, data quality is paramount in today’s digital age. It underpins informed decisions, strategic planning, and business success. Without it, organizations risk flawed judgments, inefficiencies, and a loss of competitiveness. Recognizing its vital role empowers businesses to drive innovation, enhance customer experiences, and achieve sustainable growth. Investing in robust data management, embracing technology, and fostering data integrity are essential. Prioritizing data quality is key to seizing new opportunities and staying ahead in the data-driven landscape.

    References:

    https://docs.getdbt.com/docs/build/data-tests

    https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ

    https://www.soda.io/resources/introducing-soda-sql

  • Mage: Your New Go-To Tool for Data Orchestration

    In our journey to automate data pipelines, we’ve used tools like Apache Airflow, Dagster, and Prefect to manage complex workflows. However, as data automation continues to change, we’ve added a new tool to our toolkit: Mage AI.

    Mage AI isn’t just another tool; it’s a solution to the evolving demands of data automation. This blog aims to explain how Mage AI is changing the way we automate data pipelines by addressing challenges and introducing innovative features. Let’s explore this evolution, understand the problems we face, and see why we’ve adopted Mage AI.

    What is Mage AI?

    Mage is a user-friendly open-source framework created for transforming and merging data. It’s a valuable tool for developers handling substantial data volumes efficiently. At its heart, Mage relies on “data pipelines,” made up of code blocks. These blocks can run independently or as part of a larger pipeline. Together, these blocks form a structure known as a directed acyclic graph (DAG), which helps manage dependencies. For example, you can use Mage for tasks like loading data, transforming it, or exporting it.

    Mage Architecture:

    Before we delve into Mage’s features, let’s take a look at how it works.

    When you use Mage, your request begins its journey in the Mage Server Container, which serves as the central hub for handling requests, processing data, and validation. Here, tasks like data processing and real-time interactions occur. The Scheduler Process ensures tasks are scheduled with precision, while Executor Containers, designed for specific tasks like Python or AWS, carry out the instructions.

    Mage’s scalability is impressive, allowing it to handle growing workloads effectively. It can expand both vertically and horizontally to maintain top-notch performance. Mage efficiently manages project assets, including code, data, and logs, and takes security seriously when handling databases and sensitive information. This well-coordinated system, combined with Mage’s scalability, guarantees reliable data pipelines, blending technical precision with seamless orchestration.

    Scaling Mage:

    To enhance Mage’s performance and reliability as your workload expands, it’s crucial to scale its architecture effectively. In this concise guide, we’ll concentrate on four key strategies for optimizing Mage’s scalability:

    1. Horizontal Scaling: Ensure responsiveness by running multiple Mage Server and Scheduler instances. This approach keeps the system running smoothly, even during peak usage.
    2. Multiple Executor Containers: Deploy several Executor Containers to handle concurrent task execution. Customize them for specific executors (e.g., Python, PySpark, or AWS) to scale task processing horizontally as needed.
    3. External Load Balancers: Utilize external load balancers to distribute client requests across Mage instances. This not only boosts performance but also ensures high availability by preventing overloading of a single server.
    4. Scaling for Larger Datasets: To efficiently handle larger datasets, consider:

    a. Allocating more resources to executors, empowering them to tackle complex data transformations.

    b. Leveraging Mage’s direct data warehouse transformations and its native Spark integration for massive datasets.

    Features: 

    1) Interactive Coding Experience

    Mage offers an interactive coding experience tailored for data preparation. Each block in the editor is a modular file that can be tested, reused, and chained together to create an executable data pipeline. This means you can build your data pipeline piece by piece, ensuring reliability and efficiency.

    2) UI/IDE for Building and Managing Data Pipelines

    Mage takes data pipeline development to the next level with a user-friendly integrated development environment (IDE). You can build and manage your data pipelines through an intuitive user interface, making the process efficient and accessible to both data scientists and engineers.

    3) Multiple Languages Support

    Mage supports writing pipelines in multiple languages such as Python, SQL, and R. This language versatility means you can work with the languages you’re most comfortable with, making your data preparation process more efficient.

    4) Multiple Types of Pipelines

    Mage caters to diverse data pipeline needs. Whether you require standard batch pipelines, data integration pipelines, streaming pipelines, Spark pipelines, or DBT pipelines, Mage has you covered.

    5) Built-In Engineering Best Practices

    Mage is not just a tool; it’s a promoter of good coding practices. It enables reusable code, data validation in each block, and operationalizes data pipelines with built-in observability, data quality monitoring, and lineage. This ensures that your data pipelines are not only efficient but also maintainable and reliable.

    6) Dynamic Blocks

    Dynamic blocks in Mage allow the output of a block to dynamically create additional blocks. These blocks are spawned at runtime, with the total number of blocks created being equal to the number of items in the output data of the dynamic block multiplied by the number of its downstream blocks.
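    The block-count arithmetic can be made concrete with a small sketch (the region items and downstream block names are hypothetical, and this only models the counting, not Mage’s runtime):

```python
def spawn_dynamic_blocks(output_items, downstream_blocks):
    """Each item in the dynamic block's output gets its own runtime copy of every downstream block."""
    return [(item, block) for item in output_items for block in downstream_blocks]

spawned = spawn_dynamic_blocks(["us", "eu", "apac"], ["clean", "export"])
print(len(spawned))  # 6 = 3 output items x 2 downstream blocks
```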

    7) Triggers

    • Schedule Triggers: These triggers allow you to set specific start dates and intervals for pipeline runs. Choose from daily, weekly, or monthly, or even define custom schedules using Cron syntax. Mage’s Schedule Triggers put you in control of when your pipelines execute.
    • Event Triggers: With Event Triggers, your pipelines respond instantly to specific events, such as the completion of a database query or the creation of a new object in cloud storage services like Amazon S3 or Google Cloud Storage. Real-time automation at your fingertips.
    • API Triggers: API Triggers enable your pipelines to run in response to specific API calls. Whether it’s customer requests or external system interactions, these triggers ensure your data workflows stay synchronized with the digital world.

    Different types of Block: 

    Data Loader: Within Mage, Data Loaders are ready-made templates designed to seamlessly link up with a multitude of data sources. These sources span from Postgres, BigQuery, Redshift, and S3 to various others. Additionally, Mage allows for the creation of custom data loaders, enabling connections to APIs. The primary role of Data Loaders is to facilitate the retrieval of data from these designated sources.

    Data Transformer: Much like Data Loaders, Data Transformers provide predefined functions such as handling duplicates, managing missing data, and excluding specific columns. Alternatively, you can craft your own data transformations or merge outputs from multiple data loaders to preprocess and sanitize the data before it advances through the pipeline.

    Data Exporter: Data Exporters within Mage empower you to dispatch data to a diverse array of destinations, including databases, data lakes, data warehouses, or local storage. You can opt for predefined export templates or craft custom exporters tailored to your precise requirements.

    Custom Blocks: Custom blocks in the Mage framework are incredibly flexible and serve various purposes. They can store configuration data and facilitate its transmission across different pipeline stages. Additionally, they prove invaluable for logging purposes, allowing you to categorize and visually distinguish log entries for enhanced organization.

    Sensor: A Sensor, a specialized block within Mage, continuously assesses a condition until it’s met or until a specified time duration has passed. When a block depends on a sensor, it remains inactive until the sensor confirms that its condition has been satisfied. Sensors are especially valuable when there’s a need to wait for external dependencies or handle delayed data before proceeding with downstream tasks.
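    At its core, a sensor is a poll-until-true loop with a timeout. A language-level sketch (this is not Mage’s actual implementation, and the file-landing condition is simulated):

```python
import time

def wait_for(condition, timeout_s=5.0, poll_s=0.05):
    """Poll `condition` until it returns True or the timeout elapses; return whether it was met."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_s)
    return False

# Simulated external dependency: "the file lands" on the third poll.
polls = {"n": 0}
def file_has_landed():
    polls["n"] += 1
    return polls["n"] >= 3

print(wait_for(file_has_landed, timeout_s=1.0))  # True
```

    Downstream blocks stay inactive until the loop returns True, exactly as described above.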

    Getting Started with Mage

    There are two ways to run Mage: using Docker or using pip.
    Docker Command

    Create a new working directory where all the mage files will be stored.

    Then, in that working directory, execute this command:

    Windows CMD: 

    docker run -it -p 6789:6789 -v %cd%:/home/src mageai/mageai /app/run_app.sh mage start [project_name]

    Linux CMD:

    docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai /app/run_app.sh mage start [project_name]

    Using Pip (Working directory):

    pip install mage-ai

    mage start [project_name]

    You can browse to http://localhost:6789/overview to get to the Mage UI.

    Let’s build our first pipeline: load invoice data from a CSV file, apply some useful transformations, and export that data to our local database.

    The dataset is an invoices CSV file stored in the current working directory, with the following columns:

    (1) First Name; (2) Last Name; (3) E-mail; (4) Product ID; (5) Quantity; (6) Amount; (7) Invoice Date; (8) Address; (9) City; (10) Stock Code

    Create a new pipeline and select a standard batch (we’ll be implementing a batch pipeline) from the dashboard and give it a unique ID.

    Project structure:

    ├── mage_data

    └── [project_name]

        ├── charts

        ├── custom

        ├── data_exporters

        ├── data_loaders

        ├── dbt

        ├── extensions

        ├── pipelines

        │   └── [pipeline_name]

        │       ├── __init__.py

        │       └── metadata.yaml

        ├── scratchpads

        ├── transformers

        ├── utils

        ├── __init__.py

        ├── io_config.yaml

        ├── metadata.yaml

        └── requirements.txt

    This pipeline consists of all the block files (data loaders, transformers, charts) along with the pipeline’s configuration files, io_config.yaml and metadata.yaml. Each block file contains a decorated function (e.g., @data_loader, @transformer) in which we write our code.

    1. We begin by loading a CSV file from our local directory, specifically located at /home/src/invoice.csv. To achieve this, we select the “Local File” option from the Templates dropdown and configure the Data Loader block accordingly. Running this configuration will allow us to confirm if the CSV file loads successfully.

    2. In the next step, we introduce a Transformer block using a generic template. On the right side of the user interface, we can observe the directed acyclic graph (DAG) tree. To establish the data flow, we edit the parent of the Transformer block, linking it either directly to the Data Loader block or via the user interface.

    The Transformer block operates on the data frame received from the upstream Data Loader block, which is passed as the first argument to the Transformer function.

    3. Our final step involves exporting the DataFrame to a locally hosted PostgreSQL database. We incorporate a Data Export block and connect it to the Transformer block.

    To establish a connection with the PostgreSQL database, it is imperative to configure the database credentials in the io_config.yaml file. Alternatively, these credentials can be added to environmental variables.

    With these steps completed, we have successfully constructed a foundational batch pipeline. This pipeline efficiently loads, transforms, and exports data, serving as a fundamental building block for more advanced data processing tasks.
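    Stripped of the Mage UI, the three steps above amount to a loader → transformer → exporter chain. Here is a self-contained sketch of that shape using only the standard library; the column names follow the invoice dataset, while the sample data, the transformation, and the in-memory SQLite destination are illustrative stand-ins:

```python
import csv
import io
import sqlite3

def load_data(csv_text):
    """Data Loader: parse CSV rows into dicts (stands in for reading the invoice CSV)."""
    return list(csv.DictReader(io.StringIO(csv_text)))

def transform(rows):
    """Transformer: cast Amount to float and drop rows with a missing e-mail."""
    return [{**row, "Amount": float(row["Amount"])} for row in rows if row["E-mail"]]

def export(rows, conn):
    """Data Exporter: write the transformed rows to a database (in-memory SQLite here)."""
    conn.execute("CREATE TABLE invoices (first_name TEXT, email TEXT, amount REAL)")
    conn.executemany(
        "INSERT INTO invoices VALUES (?, ?, ?)",
        [(r["First Name"], r["E-mail"], r["Amount"]) for r in rows],
    )

raw = "First Name,E-mail,Amount\nAda,ada@example.com,12.5\nBob,,3.0\n"
conn = sqlite3.connect(":memory:")
export(transform(load_data(raw)), conn)
print(conn.execute("SELECT COUNT(*), SUM(amount) FROM invoices").fetchone())  # (1, 12.5)
```

    In Mage, each of these functions would live in its own block file, with the DAG wiring the outputs to the inputs.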

    Mage vs Other tools:

    Consistency Across Environments: Some orchestration tools may exhibit inconsistencies between local development and production environments due to varying configurations. Mage tackles this challenge by providing a consistent and reproducible workflow environment through a single configuration file that can be executed uniformly across different environments.

    Reusability: Achieving reusability in workflows can be complex in some tools. Mage simplifies this by allowing tasks and workflows to be defined as reusable blocks within a Mage project, making it effortless to share them across projects and teams.

    Data Passing: Efficiently passing data between tasks can be a challenge in certain tools, especially when dealing with large datasets. Mage streamlines data passing through straightforward function arguments and returns, enabling seamless data flow and versatile data handling.

    Testing: Some tools lack user-friendly testing utilities, resulting in manual testing and potential coverage gaps. Mage simplifies testing with a built-in testing framework that lets you define test cases, inputs, and expected outputs directly alongside each block’s code.

    Debugging: Debugging failed tasks can be time-consuming with certain tools. Mage enhances debugging with detailed logs and error messages, offering clear insights into the causes of failures and expediting issue resolution.

    Conclusion: 

    Mage offers a streamlined and user-friendly approach to data pipeline orchestration, addressing common challenges with simplicity and efficiency. Its single-container deployment, visual interface, and robust features make it a valuable tool for data professionals seeking an intuitive and consistent solution for managing data workflows.

  • Apache Flink – A Solution for Real-Time Analytics

    In today’s world, data is being generated at an unprecedented rate. Every click, every tap, every swipe, every tweet, every post, every like, every share, every search, and every view generates a trail of data. Businesses are struggling to keep up with the speed and volume of this data, and traditional batch-processing systems cannot handle the scale and complexity of this data in real-time.

    This is where streaming analytics comes into play, providing faster insights and more timely decision-making. Streaming analytics is particularly useful for scenarios that require quick reactions to events, such as financial fraud detection or IoT data processing. It can handle large volumes of data and provide continuous monitoring and alerts in real-time, allowing for immediate action to be taken when necessary.

    Stream processing or real-time analytics is a method of analyzing and processing data as it is generated, rather than in batches. It allows for faster insights and more timely decision-making. Popular open-source stream processing engines include Apache Flink, Apache Spark Streaming, and Apache Kafka Streams. In this blog, we are going to talk about Apache Flink and its fundamentals and how it can be useful for streaming analytics. 

    Introduction

    Apache Flink is an open-source stream processing framework first introduced in 2014. Flink has been designed to process large amounts of data streams in real-time, and it supports both batch and stream processing. It is built on top of the Java Virtual Machine (JVM) and is written in Java and Scala.

    Flink is a distributed system that can run on a cluster of machines, and it has been designed to be highly available, fault-tolerant, and scalable. It supports a wide range of data sources and provides a unified API for batch and stream processing, which makes it easy to build complex data processing applications.

    Advantages of Apache Flink

    Real-time analytics is the process of analyzing data as it is generated. It requires a system that can handle large volumes of data in real-time and provide insights into the data as soon as possible. Apache Flink has been designed to meet these requirements and has several advantages over other real-time data processing systems.

    1. Low Latency: Flink processes data streams in real-time, which means it can provide insights into the data almost immediately. This makes it an ideal solution for applications that require low latency, such as fraud detection and real-time recommendations.
    2. High Throughput: Flink has been designed to handle large volumes of data and can scale horizontally to handle increasing volumes of data. This makes it an ideal solution for applications that require high throughput, such as log processing and IoT applications.
    3. Flexible Windowing: Flink provides a flexible windowing API that enables the creation of complex windows for processing data streams. This enables the creation of windows based on time, count, or custom triggers, which makes it easy to create complex data processing applications.
    4. Fault Tolerance: Flink is designed to be highly available and fault-tolerant. It can recover from failures quickly and can continue processing data even if some of the nodes in the cluster fail.
    5. Compatibility: Flink is compatible with a wide range of data sources, including Kafka, Hadoop, and Elasticsearch. This makes it easy to integrate with existing data processing systems.
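    To make the windowing idea from point 3 concrete, here is a plain-Python sketch of a tumbling count window; Flink’s windowing API is far more general (time windows, custom triggers), and this only illustrates the semantics:

```python
def count_window_sums(stream, size):
    """Tumbling count window: emit the sum of every `size` consecutive elements."""
    sums, window = [], []
    for element in stream:
        window.append(element)
        if len(window) == size:
            sums.append(sum(window))
            window = []
    return sums

print(count_window_sums([1, 2, 3, 4, 5, 6, 7], 3))  # [6, 15] (the trailing 7 waits for a full window)
```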

    Flink Architecture

    Apache Flink processes data streams in a distributed manner. A Flink cluster consists of several nodes, each of which is responsible for processing a portion of the data. The nodes coordinate with each other over the network, while data typically enters and leaves the cluster through a messaging system such as Apache Kafka.

    The Flink cluster processes data streams in parallel by dividing the data into small chunks, or partitions, and processing them independently. Each partition is processed by a task, which is a unit of work that runs on a node in the cluster.

    Flink provides several APIs for building data processing applications, including the DataStream API, the DataSet API, and the Table API. The below diagram illustrates what a Flink cluster looks like.

    Apache Flink Cluster
    • Flink application runs on a cluster.
    • A Flink cluster has a job manager and a bunch of task managers.
    • A job manager is responsible for effective allocation and management of computing resources. 
    • Task managers are responsible for the execution of a job.

    Flink Job Execution

    1. Client system submits job graph to the job manager
    • A client system prepares and sends a dataflow/job graph to the job manager.
    • It can be your Java/Scala/Python Flink application or the CLI.
    • The client is not part of the runtime or of the program execution.
    • After submitting the job, the client can either disconnect and operate in detached mode or remain connected to receive progress reports in attached mode.

    Given below is an illustration of what the job graph generated from the code looks like:

    Job Graph
    2. The job graph is converted to an execution graph by the job manager
    • The execution graph is a parallel version of the job graph. 
    • For each job vertex, it contains an execution vertex per parallel subtask. 
    • An operator that exhibits a parallelism level of 100 will consist of a single job vertex and 100 execution vertices.

    Given below is an illustration of what an execution graph looks like:

    Execution Graph
    3. The job manager submits the parallel instances of the execution graph to the task managers
    • Execution resources in Flink are defined through task slots. 
    • Each task manager will have one or more task slots, each of which can run one pipeline of parallel tasks. 
    • A pipeline consists of multiple successive tasks.
    Parallel instances of execution graph being submitted to task slots

    Flink Program

    Flink programs look like regular programs that transform DataStreams. Each program consists of the same basic parts:

    • Obtain an execution environment 

    The ExecutionEnvironment is the context in which a program is executed. This is how an execution environment is set up in Flink code:

    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); // if program is running on local machine
    ExecutionEnvironment env = new CollectionEnvironment(); // if source is collections
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // will do the right thing based on context

    • Connect to data stream

    We can use an instance of the execution environment to connect to a data source, which can be a file system, a streaming application, or a collection. This is how we can connect to a data source in Flink:

    DataSet<String> data = env.readTextFile("file:///path/to/file"); // batch: read from a file
    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */ ); // batch: read from a collection
    DataStream<User> stream = streamEnv.addSource( /* streaming source, e.g., a Kafka consumer */ ); // streaming: requires a StreamExecutionEnvironment

    • Perform Transformations

    We can perform transformations on the events/data that we receive from the data sources.
    A few of the data transformation operations are map, filter, keyBy, flatMap, etc.

    • Specify where to send the data

    Once we have performed the transformation/analytics on the data that is flowing through the stream, we can specify where we will send the data.
    The destination can be a filesystem, database, or data streams.

     dataStream.sinkTo(/*streaming application or database api */);

    Flink Transformations

    1. Map: Takes one element at a time from the stream, performs some transformation on it, and emits one element (of any type) as output.

      Given below is an example of Flink’s map operator:

    stream.map(new MapFunction<Integer, String>() {
        @Override
        public String map(Integer integer) {
            // numberToWords converts the number's digits to words
            return "input -> " + integer + " : output -> "
                    + numberToWords(integer.toString().toCharArray());
        }
    }).print();

    2. Filter: Evaluates a boolean function for each element and retains those for which the function returns true.

    Given below is an example of Flink’s filter operator:

    stream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer integer) throws Exception {
            return integer % 2 != 0; // keep odd numbers
        }
    }).print();

    3. Reduce: A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

    Given below is an example of Flink’s reduce operator:

    DataStream<Integer> stream = env.fromCollection(data);
    stream.countWindowAll(3)
          .reduce(new ReduceFunction<Integer>() {
              @Override
              public Integer reduce(Integer current, Integer next) throws Exception {
                  return current + next; // rolling sum within each window of three elements
              }
          }).print();


    4. KeyBy: 
    • Logically partitions a stream into disjoint partitions. 
    • All records with the same key are assigned to the same partition. 
    • Internally, keyBy() is implemented with hash partitioning.

    The figure below illustrates how the keyBy operator works in Flink.
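    Conceptually, keyBy assigns each record to a partition by hashing its key, so records with the same key always land together. A plain-Python sketch (the user IDs are hypothetical, and Python’s built-in hash merely stands in for Flink’s key-group hashing):

```python
def key_by(records, key_fn, parallelism):
    """Assign each record to a partition by hashing its key, as keyBy() does internally."""
    partitions = [[] for _ in range(parallelism)]
    for record in records:
        partitions[hash(key_fn(record)) % parallelism].append(record)
    return partitions

events = [("user_a", 1), ("user_b", 2), ("user_a", 3)]
parts = key_by(events, lambda e: e[0], parallelism=4)
# Both "user_a" events are guaranteed to land in the same partition.
```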

    Fault Tolerance

    • Flink combines stream replay and checkpointing to achieve fault tolerance. 
    • At a checkpoint, each operator’s corresponding state and the specific point in each input stream are marked.
    • Whenever checkpointing is done, a snapshot of the state of all the operators is saved in the state backend, which is generally the job manager’s memory or configurable durable storage.
    • Whenever there is a failure, operators are reset to the most recent state in the state backend, and event processing is resumed.
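    The snapshot-and-reset cycle can be simulated with a toy stateful operator; this mimics the mechanism only, since real Flink checkpoints are coordinated by barriers across many parallel operators:

```python
class CountingOperator:
    """Toy stateful operator: counts events and can snapshot/restore its state."""
    def __init__(self):
        self.count = 0

    def snapshot(self):
        # What Flink persists to the state backend at a checkpoint.
        return {"count": self.count}

    def restore(self, state):
        # What Flink does on recovery: reset to the most recent completed checkpoint.
        self.count = state["count"]

op = CountingOperator()
for event in ["a", "b", "c"]:
    op.count += 1
state_backend = op.snapshot()  # checkpoint: snapshot {"count": 3} saved

op.count += 1                  # one more event arrives, then the node fails...
op.restore(state_backend)      # ...recovery resets the operator to the checkpoint
print(op.count)                # 3; the post-checkpoint event is replayed from the source
```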

    Checkpointing

    • Checkpointing is implemented using stream barriers.
    • Barriers are injected into the data stream at the source, e.g., Kafka, Kinesis, etc.
    • Barriers flow with the records as part of the data stream.

    Refer below diagram to understand how checkpoint barriers flow with the events:

    Checkpoint Barriers
    Saving Snapshots
    • Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams.
    • Once a sink operator (the end of a streaming DAG) has received the barrier n from all of its input streams, it acknowledges that snapshot n to the checkpoint coordinator. 
    • After all sinks have acknowledged a snapshot, it is considered completed.

    The below diagram illustrates how checkpointing is achieved in Flink with the help of barrier events, state backends, and checkpoint table.

    Checkpointing

    Recovery

    • Flink selects the latest completed checkpoint upon failure. 
    • The system then re-deploys the entire distributed dataflow.
    • Gives each operator the state that was snapshotted as part of the checkpoint.
    • The sources are set to start reading the stream from the position given in the snapshot.
    • For example, in Apache Kafka, that means telling the consumer to start fetching messages from an offset given in the snapshot.

    Scalability  

    A Flink job can be scaled up and scaled down as per requirement.

    This can be done manually by:

    1. Triggering a savepoint (manually triggered checkpoint)
    2. Adding/Removing nodes to/from the cluster
    3. Restarting the job from savepoint

    OR 

    Automatically by Reactive Scaling

    • The configuration of a job in Reactive Mode ensures that it utilizes all available resources in the cluster at all times.
    • Adding a Task Manager will scale up your job, and removing resources will scale it down. 
    • Reactive Mode restarts a job on a rescaling event, restoring it from the latest completed checkpoint.
    • The only downside is that it works only in standalone mode.

    Alternatives  

    • Spark Streaming: It is an open-source distributed computing engine that has added streaming capabilities, but Flink is optimized for low-latency processing of real-time data streams and supports more complex processing scenarios.
    • Apache Storm: It is another open-source stream processing system that has a steeper learning curve than Flink and uses a different architecture based on spouts and bolts.
    • Apache Kafka Streams: It is a lightweight stream processing library built on top of Kafka, but it is not as feature-rich as Flink or Spark, and is better suited for simpler stream processing tasks.

    Conclusion  

    In conclusion, Apache Flink is a powerful solution for real-time analytics. With its ability to process data in real-time and support for streaming data sources, it enables businesses to make data-driven decisions with minimal delay. The Flink ecosystem also provides a variety of tools and libraries that make it easy for developers to build scalable and fault-tolerant data processing pipelines.

    One of the key advantages of Apache Flink is its support for event-time processing, which allows it to handle delayed or out-of-order data in a way that accurately reflects the sequence of events. This makes it particularly useful for use cases such as fraud detection, where timely and accurate data processing is critical.

    Additionally, Flink’s support for multiple programming languages, including Java, Scala, and Python, makes it accessible to a broad range of developers. And with its seamless integration with popular big data platforms like Hadoop and Apache Kafka, it is easy to incorporate Flink into existing data infrastructure.

    In summary, Apache Flink is a powerful and flexible solution for real-time analytics, capable of handling a wide range of use cases and delivering timely insights that drive business value.

    References  

  • An Introduction to Stream Processing & Analytics

    What is Stream Processing and Analytics?

    Stream processing is a technology used to process large amounts of data in real time as it is generated, rather than storing it and processing it later.

    Think of it like a conveyor belt in a factory. The conveyor belt constantly moves, bringing in new products that need to be processed. Similarly, stream processing deals with data that is constantly flowing, like a stream of water. Just like the factory worker needs to process each product as it moves along the conveyor belt, stream processing technology processes each piece of data as it arrives.

    Stateful and stateless processing are two different approaches to stream processing, and the right choice depends on the specific requirements and needs of the application. 

    Stateful processing is useful in scenarios where the processing of an event or data point depends on the state of previous events or data points. For example, it can be used to maintain a running total or average across multiple events or data points.

    Stateless processing, on the other hand, is useful in scenarios where the processing of an event or data point does not depend on the state of previous events or data points. For example, in a simple data transformation application, stateless processing can be used to transform each event or data point independently without the need to maintain state.
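The difference can be sketched in a few lines of Python (illustrative functions, not tied to any particular framework):

```python
def stateless_transform(events):
    # Stateless: each event is transformed independently;
    # no memory of previous events is needed.
    return [e * 2 for e in events]

def stateful_running_average(events):
    # Stateful: the output for each event depends on all prior events
    # (the running total and count are the maintained state).
    total, count, averages = 0, 0, []
    for e in events:
        total += e
        count += 1
        averages.append(total / count)
    return averages

events = [10, 20, 30]
print(stateless_transform(events))       # [20, 40, 60]
print(stateful_running_average(events))  # [10.0, 15.0, 20.0]
```

Note that the stateful version must persist `total` and `count` between events; in a real streaming system that state is what has to be checkpointed to survive failures.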

    Streaming analytics refers to the process of analyzing and processing data in real time as it is generated, enabling applications to react to events and make decisions in near real time.

    Why Stream Processing and Analytics?

    Stream processing is important because it allows organizations to make real-time decisions based on the data they are receiving. This is particularly useful in situations where timely information is critical, such as in financial transactions, network security, and real-time monitoring of industrial processes.

    For example, in financial trading, stream processing can be used to analyze stock market data in real time and make split-second decisions to buy or sell stocks. In network security, it can be used to detect and respond to cyber-attacks in real time. And in industrial processes, it can be used to monitor production line efficiency and quickly identify and resolve any issues.

    Stream processing is also important because it can process massive amounts of data, making it ideal for big data applications. With the growth of the Internet of Things (IoT), the amount of data being generated is growing rapidly, and stream processing provides a way to process this data in real time and derive valuable insights.

    Collectively, stream processing provides organizations with the ability to make real-time decisions based on the data they are receiving, allowing them to respond quickly to changing conditions and improve their operations.

    How is it different from Batch Processing?

    Batch Data Processing:

    Batch Data Processing is a method of processing where a group of transactions or data is collected over a period of time and is then processed all at once in a “batch”. The process begins with the extraction of data from its sources, such as IoT devices or web/application logs. This data is then transformed and integrated into a data warehouse. The process is generally called the Extract, Transform, Load (ETL) process. The data warehouse is then used as the foundation for an analytical layer, which is where the data is analyzed, and insights are generated.

    Stream/Real-time Data Processing:

    Real-Time Data Streaming involves the continuous flow of data generated in real time, typically from multiple sources such as IoT devices or web/application logs. A message broker manages the flow of data between the stream processors, the analytical layer, and the data sink, ensuring that the data is delivered in the correct order and is not lost. Stream processors perform data ingestion and processing: they take in the data streams and process them in real time. The processed data is then sent to an analytical layer, where it is analyzed and insights are generated.

    Processes involved in Stream processing and Analytics:

    The process of stream processing can be broken down into the following steps:

    • Data Collection: The first step in stream processing is collecting data from various sources, such as sensors, social media, and transactional systems. The data is then fed into a stream processing system in real time.
    • Data Ingestion: Once the data is collected, it needs to be ingested or taken into the stream processing system. This involves converting the data into a standard format that can be processed by the system.
    • Data Processing: The next step is to process the data as it arrives. This involves applying various processing algorithms and rules to the data, such as filtering, aggregating, and transforming the data. The processing algorithms can be applied to individual events in the stream or to the entire stream of data.
    • Data Storage: After the data has been processed, it is stored in a database or data warehouse for later analysis. The storage can be configured to retain the data for a specific amount of time or to retain all the data.
    • Data Analysis: The final step is to analyze the processed data and derive insights from it. This can be done using data visualization tools or by running reports and queries on the stored data. The insights can be used to make informed decisions or to trigger actions, such as sending notifications or triggering alerts.
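The steps above can be sketched as a small Python pipeline, with generators standing in for the streaming stages (the field names, values, and threshold are all illustrative):

```python
import json

def collect():
    # Data collection: stand-in for sensor or transactional sources.
    yield from ['{"temp": 21}', '{"temp": 35}', '{"temp": 19}']

def ingest(raw):
    # Data ingestion: convert each raw record into a standard format.
    for line in raw:
        yield json.loads(line)

def process(records, threshold=30):
    # Data processing: filter to readings above a threshold.
    for r in records:
        if r["temp"] > threshold:
            yield r

store = []  # data storage: stands in for a database or warehouse
for record in process(ingest(collect())):
    store.append(record)

# Data analysis: derive an insight / trigger an alert from stored data.
alerts = [f"high temp: {r['temp']}" for r in store]
print(alerts)  # ['high temp: 35']
```

In a real system each stage would be a long-running, distributed component rather than a generator, but the shape of the flow is the same: records pass through collection, ingestion, processing, storage, and analysis continuously.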

    It’s important to note that stream processing is an ongoing process, with data constantly being collected, processed, and analyzed in real time. The visual representation of this process can be represented as a continuous cycle of data flowing through the system, being processed and analyzed at each step along the way.

    Stream Processing Platforms & Frameworks:

    Stream Processing Platforms & Tools are software systems that enable the collection, processing, and analysis of real-time data streams.

    Stream Processing Frameworks:

    A stream processing framework is a software library or framework that provides a set of tools and APIs for developers to build custom stream processing applications. Frameworks typically require more development effort and configuration to set up and use. They provide more flexibility and control over the stream processing pipeline but also require more development and maintenance resources. 

    Examples: Apache Spark Streaming, Apache Flink, Apache Beam, Apache Storm, Apache Samza

    Let’s first look into the most commonly used stream processing frameworks: Apache Flink & Apache Spark Streaming.

    Apache Flink:

    Flink is an open-source, unified stream-processing and batch-processing framework. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner, making it ideal for processing huge amounts of data in real-time.

    • Flink provides out-of-the-box checkpointing and state management, two features that make it easy to manage enormous amounts of data with relative ease.
    • The event processing function, the filter function, and the mapping function are other features that make handling a large amount of data easy.

    Flink also comes with real-time indicators and alerts, which make a big difference when it comes to data processing and analysis.

    Note: We have discussed stream processing and analytics in detail in Stream Processing and Analytics with Apache Flink.

    Apache Spark Streaming:

    Apache Spark Streaming is a scalable fault-tolerant streaming processing system that natively supports both batch and streaming workloads. Spark Streaming is an extension of the core Spark API that allows data engineers and data scientists to process real-time data.

    • Great for solving complicated transformative logic
    • Easy to program
    • Runs at blazing speeds
    • Processes large data within a fraction of a second

    Stream Processing Platforms:

    A stream processing platform is an end-to-end solution for processing real-time data streams. Platforms typically require less development effort and maintenance as they provide pre-built tools and functionality for processing, analyzing, and visualizing data. 

    Examples: Apache Kafka, Amazon Kinesis, Google Cloud Pub-Sub

    Let’s look into the most commonly used stream processing platforms: Apache Kafka & AWS Kinesis.

    Apache Kafka: 

    Apache Kafka is an open-source distributed event streaming platform for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

    • Because it’s open source, Kafka generally requires a higher skill set to operate and manage, so it’s typically used for development and testing.
    • APIs allow “producers” to publish data streams to “topics”; a “topic” is a partitioned log of records; a “partition” is ordered and immutable; “consumers” subscribe to “topics.”
    • It can run on a cluster of “brokers” with partitions split across cluster nodes.
    • Messages can be effectively unlimited in size (configurable up to about 2 GB).
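A toy Python model of this structure (not Kafka's actual client API) can make the vocabulary concrete: topics, partitions, producers, and offset-based consumers.

```python
from collections import defaultdict

class ToyTopic:
    """Minimal model of a Kafka topic: a partitioned, append-only log.
    Records with the same key go to the same partition, where order is
    preserved; consumers read each partition by offset."""
    def __init__(self, partitions=3):
        self.partitions = defaultdict(list)
        self.n = partitions

    def publish(self, key, value):        # the "producer" side
        self.partitions[hash(key) % self.n].append((key, value))

    def read(self, partition, offset=0):  # the "consumer" side
        return self.partitions[partition][offset:]

topic = ToyTopic()
for i in range(3):
    topic.publish("clicks", f"event-{i}")

p = hash("clicks") % topic.n
print(topic.read(p))  # all "clicks" records, in publish order
```

Ordering is only guaranteed within a partition, which is why choosing a good partitioning key matters in real Kafka deployments.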

    AWS Kinesis:

    Amazon Kinesis is a cloud-based service on Amazon Web Services (AWS) that allows you to ingest real-time data such as application logs, website clickstreams, and IoT telemetry data for machine learning and analytics, as well as video and audio. 

    • Amazon Kinesis is a SaaS offering, reducing the complexities in the design, build, and manage stages compared to open-source Apache Kafka. It’s ideally suited for building microservices architectures.
    • “Producers” can push data as soon as it is put on the stream. Kinesis breaks the stream across “shards” (which are like partitions).
    • Shards have a hard limit on the number of transactions and data volume per second. If you need more volume, you must subscribe to more shards. You pay for what you use.
    • Most maintenance and configuration are hidden from the user. Scaling is easy (adding shards) compared to Kafka.
    • Maximum message size is 1MB.

    Three Characteristics of an Event Stream Processing Platform:

    Publish and Subscribe:

    In a publish-subscribe model, producers publish events or messages to streams or topics, and consumers subscribe to streams or topics to receive the events or messages. This is similar to a message queue or enterprise messaging system. It allows for the decoupling of the producer and consumer, enabling them to operate independently and asynchronously. 

    Store streams of events in a fault-tolerant way

    This means that the platform is able to store and manage events in a reliable and resilient manner, even in the face of failures or errors. To achieve fault tolerance, event streaming platforms typically use a variety of techniques, such as replicating data across multiple nodes, and implementing data recovery and failover mechanisms.
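A minimal Python sketch of replication, the most common of these techniques (a deliberately simplified stand-in: real replication protocols also handle acknowledgements, leader election, and replica catch-up):

```python
class ReplicatedLog:
    """Toy fault-tolerant event store: every event is written to N
    replicas, so the stream survives the loss of some nodes."""
    def __init__(self, replicas=3):
        self.replicas = [[] for _ in range(replicas)]

    def append(self, event):
        for replica in self.replicas:
            replica.append(event)

    def read(self, failed=()):
        # Serve reads from any surviving replica.
        for i, replica in enumerate(self.replicas):
            if i not in failed:
                return list(replica)
        raise RuntimeError("all replicas lost")

store = ReplicatedLog(replicas=3)
for e in ["e1", "e2", "e3"]:
    store.append(e)

print(store.read(failed={0, 1}))  # ['e1', 'e2', 'e3'] despite two failures
```

With a replication factor of three, the stream remains fully readable even after two node failures, at the cost of storing each event three times.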

    Process streams of events in real-time, as they occur

    This means that the platform can process and analyze data as it is generated rather than waiting for data to be batch-processed or stored for later processing.

    Challenges when designing the stream processing and analytics solution:

    Stream processing is a powerful technology, but there are also several challenges associated with it, including:

    • Late arriving data: Data that is delayed or arrives out of order can disrupt the processing pipeline and lead to inaccurate results. Stream processing systems need to be able to handle out-of-order data and reconcile it with the data that has already been processed.
    • Missing data: If data is missing or lost, it can impact the accuracy of the processing results. Stream processing systems need to be able to identify missing data and handle it appropriately, whether by skipping it, buffering it, or alerting a human operator.
    • Duplicate data: Duplicate data can lead to over-counting and skewed results. Stream processing systems need to be able to identify and de-duplicate data to ensure accurate results.
    • Data skew: data skew occurs when there is a disproportionate amount of data for certain key fields or time periods. This can lead to performance issues, processing delays, and inaccurate results. Stream processing systems need to be able to handle data skew by load balancing and scaling resources appropriately.
    • Fault tolerance: Stream processing systems need to be able to handle hardware and software failures without disrupting the processing pipeline. This requires fault-tolerant design, redundancy, and failover mechanisms.
    • Data security and privacy: Real-time data processing often involves sensitive data, such as personal information, financial data, or intellectual property. Stream processing systems need to ensure that data is securely transmitted, stored, and processed in compliance with regulatory requirements.
    • Latency: Another challenge with stream processing is latency or the amount of time it takes for data to be processed and analyzed. In many applications, the results of the analysis need to be produced in real-time, which puts pressure on the stream processing system to process the data quickly.
    • Scalability: Stream processing systems must be able to scale to handle large amounts of data as the amount of data being generated continues to grow. This can be a challenge because the systems must be designed to handle data in real-time while also ensuring that the results of the analysis are accurate and reliable.
    • Maintenance: Maintaining a stream processing system can also be challenging, as the systems are complex and require specialized knowledge to operate effectively. In addition, the systems must be able to evolve and adapt to changing requirements over time.
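Two of these challenges, duplicate data and late/out-of-order data, can be illustrated with a short Python sketch (the id/timestamp fields and the watermark value are hypothetical):

```python
def deduplicate(events, seen=None):
    """Drop records whose id was already processed (duplicate data)."""
    seen = set() if seen is None else seen
    out = []
    for e in events:
        if e["id"] not in seen:
            seen.add(e["id"])
            out.append(e)
    return out

def reorder(events, watermark):
    """Emit events up to the watermark in timestamp order, buffering
    anything later (late / out-of-order data)."""
    ready = sorted((e for e in events if e["ts"] <= watermark),
                   key=lambda e: e["ts"])
    buffered = [e for e in events if e["ts"] > watermark]
    return ready, buffered

events = [
    {"id": 1, "ts": 10}, {"id": 2, "ts": 12},
    {"id": 1, "ts": 10},   # duplicate
    {"id": 3, "ts": 9},    # arrived out of order
    {"id": 4, "ts": 20},   # beyond the watermark, held back
]
unique = deduplicate(events)
ready, buffered = reorder(unique, watermark=15)
print([e["id"] for e in ready])     # [3, 1, 2]
print([e["id"] for e in buffered])  # [4]
```

Real systems such as Flink implement these ideas with keyed state for dedup and watermarks for ordering, but the trade-off is the same: buffering longer handles more lateness at the cost of latency.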

    Despite these challenges, stream processing remains an important technology for organizations that need to process data in real time and make informed decisions based on that data. By understanding these challenges and designing the systems to overcome them, organizations can realize the full potential of stream processing and improve their operations.

    Key benefits of stream processing and analytics:

    • Real-time processing keeps you in sync all the time:

    For Example: Suppose an online retailer uses a distributed system to process orders. The system might include multiple components, such as a web server, a database server, and an application server. The different components could be kept in sync by real-time processing by processing orders as they are received and updating the database accordingly. As a result, orders would be accurate and processed efficiently by maintaining a consistent view of the data.

    • Real-time data processing is more accurate and timely:

    For example, a financial trading system that processes data in real time can help ensure that trades are executed at the best possible prices, improving the accuracy and timeliness of the trades.

    • Deadlines are met with real-time processing:

    For example: In a control system, it may be necessary to respond to changing conditions within a certain time frame in order to maintain the stability of the system. 

    • Real-time processing is quite reactive:

    For example, a real-time processing system might be used to monitor a manufacturing process and trigger an alert if it detects a problem or to analyze sensor data from a power plant and adjust the plant’s operation in response to changing conditions.

    • Real-time processing involves multitasking:

    For example, consider a real-time monitoring system that is used to track the performance of a large manufacturing plant. The system might receive data from multiple sensors and sources, including machine sensors, temperature sensors, and video cameras. In this case, the system would need to be able to multitask in order to process and analyze data from all of these sources in real time and to trigger alerts or take other actions as needed. 

    • Real-time processing works independently:

    For example, a real-time processing system may rely on a database or message queue to store and retrieve data, or it may rely on external APIs or services to access additional data or functionality.

    Use case studies:

    There are many real-life examples of stream processing in different industries that demonstrate the benefits of this technology. Here are a few examples:

    • Financial Trading: In the financial industry, stream processing is used to analyze stock market data in real time and make split-second decisions to buy or sell stocks. This allows traders to respond to market conditions in real time and improve their chances of making a profit.
    • Network Security: Stream processing is also used in network security to detect and respond to cyber-attacks in real-time. By processing network data in real time, security systems can quickly identify and respond to threats, reducing the risk of a data breach.
    • Industrial Monitoring: In the industrial sector, stream processing is used to monitor production line efficiency and quickly identify and resolve any issues. For example, it can be used to monitor the performance of machinery and identify any potential problems before they cause a production shutdown.
    • Social Media Analysis: Stream processing is also used to analyze social media data in real time. This allows organizations to monitor brand reputation, track customer sentiment, and respond to customer complaints in real time.
    • Healthcare: In the healthcare industry, stream processing is used to monitor patient data in real time and quickly identify any potential health issues. For example, it can be used to monitor vital signs and alert healthcare providers if a patient’s condition worsens.

    These are just a few examples of the many real-life applications of stream processing. Across all industries, stream processing provides organizations with the ability to process data in real time and make informed decisions based on the data they are receiving.

    How to start stream analytics?

    • Our recommendation when building a dedicated platform is to focus on choosing a versatile stream processor to pair with your existing analytical interface.
    • Or, keep an eye on vendors who offer both stream processing and BI as a service.

    Resources:

    Here are some useful resources for learning more about stream processing:

    Videos:

    Tutorials:

    Articles:

    These resources will provide a good starting point for learning more about stream processing and how it can be used to solve real-world problems. 

    Conclusion:

    Real-time data analysis and decision-making require stream processing and analytics in diverse industries, including finance, healthcare, and e-commerce. Organizations can improve operational efficiency, customer satisfaction, and revenue growth by processing data in real time. A robust infrastructure, skilled personnel, and efficient algorithms are required for stream processing and analytics. Businesses need stream processing and analytics to stay competitive and agile in today’s fast-paced world as data volumes and complexity continue to increase.

  • Modern Data Stack: The What, Why and How?

    This post will provide you with a comprehensive overview of the modern data stack (MDS), including its benefits, how its components differ from their predecessors, and what its future holds.

    “Modern” has the connotation of being up-to-date, of being better. This is true for MDS, but how exactly is MDS better than what was before?

    What was the data stack like?…

    A few decades back, the MapReduce technological breakthrough made it possible to efficiently process large amounts of data in parallel across multiple machines.

    It provided the backbone of a standard pipeline that looked like this:

    It was common to see HDFS used for storage, Spark for compute, and Hive to run SQL queries on top.

    To run this, we had people handling the deployment and maintenance of Hadoop on their own.

    This core attribute of the setup eventually became a pain point and made it complex and inefficient in the long run.

    Being on-prem while facing growing heavier loads meant scalability became a huge concern.

    Hence, unlike today, the process was much more manual. Adding more RAM, increasing storage, and rolling out updates by hand reduced productivity.

    Moreover,

    • The pipeline wasn’t modular; components were tightly coupled, causing failures when deciding to shift to something new.
    • Teams committed to specific vendors and found themselves locked in, by design, for years.
    • Setup was complex, and the infrastructure was not resilient. Random surges in data crashed the systems. (This randomness in demand has only increased since the early days of the internet, due to social media-triggered virality.)
    • Self-service was non-existent. If you wanted to do anything with your data, you needed data engineers.
    • Observability was a myth. Your pipeline is failing, but you’re unaware, and then you don’t know why, where, how…Your customers become your testers, knowing more about your system’s issues.
    • Data protection laws weren’t as formalized, and policies within organizations were often lacking.

    These issues made the traditional setup inefficient at solving modern problems, and as we all know…

    For an upgraded, modern setup, we needed something scalable, with a smaller learning curve, and feasible for both a seed-stage startup and a Fortune 500 company.

    Standing on the shoulders of tech innovations from the 2000s, data engineers started building a blueprint for MDS tooling with three core attributes: 

    Cloud Native (or the ocean)

    Arguably the definitive change of the MDS era, the cloud removes the hassle of on-prem and enables horizontal or vertical auto-scaling in an era where virality and traffic spikes are technical requirements.

    Modularity

    The M in MDS could stand for modular.

    You can integrate any MDS tool into your existing stack, like LEGO blocks.

    You can test out multiple tools, whether they’re open source or managed, choose the best fit, and iteratively build out your data infrastructure.

    This mindset helps instill a habit of avoiding vendor lock-in by continuously upgrading your architecture with relative ease.

    By moving away from the ancient, one-size-fits-all model, MDS recognizes the uniqueness of each company’s budget, domain, data types, and maturity—and provides the correct solution for a given use case.

    Ease of Use

    MDS tools are easier to set up. You can start playing with these tools within a day.

    Importantly, the ease of use is not limited to technical engineers.

    Owing to the rise of self-serve and no-code tools like Tableau, data is finally democratized for all kinds of consumers. SQL remains crucial, but for basic metric calculations, PMs, sales, marketing, etc., can use simple drag and drop in the UI (sometimes even simpler than Excel pivot tables).

    MDS also enables one to experiment with different architectural frameworks for their use case. For example, ELT vs. ETL (explained under Data Transformation).

    But, one might think such improvements mean MDS is the v1.1 of Data Stack, a tech upgrade that ultimately uses data to solve similar problems.

    Fortunately, that’s far from the case.

    MDS enables data to solve more human problems across the org—problems that employees have long been facing but could never systematically solve for, helping generate much more value from the data.

    Beyond these, employees want transparency and visibility into how any metric was calculated and which data source in Snowflake was used to build what specific tableau dashboard.

    Critically, with compliance finally being focused on, orgs need solutions for giving the right people the right access at the right time.

    Lastly, as opposed to previous eras, these days, even startups have varied infrastructure components with data; if you’re a PM tasked with bringing insights, how do you know where to start? What data assets the organization has?

    Besides these problem statements being tackled, MDS builds a culture of upskilling employees in various data concepts.

    Data security, governance, and data lineage are important irrespective of department or persona in the organization.

    From designers to support executives, the need for a data-driven culture is a given.

    You’re probably bored of hearing how good the MDS is and want to deconstruct it into its components.

    Let’s dive in.

    SOURCES

    In our modern era, every product is inevitably becoming a tech product.

    From a smart bulb to an orbiting satellite, each generates data in its own unique flavor of frequency of generation, data format, data size, etc.

    Social media, microservices, IoT devices, smart devices, DBs, CRMs, ERPs, flat files, and a lot more…

    INGESTION

    Post creation of data, how does one “ingest” or take in that data for actual usage? (the whole point of investing).

    Roughly, there are three categories to help describe the ingestion solutions:

    Generic tools allow us to connect various data sources with data stores.

    E.g.: we can connect Google Ads or Salesforce to dump data into BigQuery or S3.

    These generic tools highlight the modularity and low or no code barrier aspect in MDS.

    Things are as easy as drag and drop, and one doesn’t need to be fluent in scripting.

    Then we have programmable tools as well, which give us more control over how we ingest data through code.

    For example, we can write Apache Airflow DAGs in Python to load data from S3 and dump it to Redshift.

    Intermediary – these tools cater to a specific use case or are coupled with the source itself.

    E.g., Snowpipe, part of Snowflake itself, allows us to load data from files as soon as they are available at the source.

    DATA STORAGE‍

    Where do you ingest data into?

    Here, we’ve expanded from HDFS & SQL DBs to a wider variety of formats (noSQL, document DB).

    Depending on the use case and the way you interact with data, you can choose from a DW, DB, DL, ObjectStores, etc.

    You might need a standard relational DB for transactions in finance, or you might be collecting logs. You might be experimenting with your product at an early stage and be fine with noSQL without worrying about prescribing schemas.

    One key feature to note is that most of these are cloud-based, so there’s no more worrying about scalability, and we pay only for what we use.

    PS: Do stick around till the end for new concepts of Lake House and reverse ETL (already prevalent in the industry).

    DATA TRANSFORMATION

    The stored raw data must be cleaned and restructured into the shape we deem best for actual usage. This slicing and dicing is different for every kind of data.

    For example, we have tools for the E-T-L way, which can be categorized into SaaS and Frameworks, e.g., Fivetran and Spark respectively.

    Interestingly, the cloud era has given storage computational capability such that we don’t even need an external system for transformation, sometimes.

    With this rise of ELT, we leverage the processing capabilities of cloud data warehouses or lakehouses. Using tools like dbt, we write templated SQL queries to transform our data in the warehouse or lakehouse itself.

    This enables analysts to perform the heavy lifting of traditional data engineering problems.

    We also see stream processing where we work with applications where “micro” data is processed in real time (analyzed as soon as it’s produced, as opposed to large batches).

    DATA VISUALIZATION

    The ability to visually learn from data has only improved in the MDS era with advanced design, methodology, and integration.

    With Embedded analytics, one can integrate analytical capabilities and data visualizations into the software application itself.

    External analytics, on the other hand, is used to build analyses on top of your processed data. You choose your source, create a chart, and let it run.

    DATA SCIENCE, MACHINE LEARNING, MLOps

    Source: https://medium.com/vertexventures/thinking-data-the-modern-data-stack-d7d59e81e8c6

    In the last decade, we have moved beyond ad-hoc insight generation in Jupyter notebooks to production-ready, real-time ML workflows, like recommendation systems and price predictions. Any startup can and does integrate ML into its products.

    Most cloud service providers offer machine learning models and automated model building as a service.

    MDS concepts like data observability are used to build tools for ML practitioners, whether it’s feature stores (a feature store is a central repository that provides entity values as of a certain time) or model monitoring (checking data drift, tracking model performance, and improving model accuracy).

    This is extremely important, as it lets statisticians focus on the business problem, not the infrastructure.

    This is an ever-expanding field where concepts such as MLOps (DevOps for ML pipelines: optimizing workflows, efficient transformations) and synthetic media (using AI to generate content itself) arrive and quickly become mainstream.

    ChatGPT is the current buzz, but by the time you read this, I’m sure there will be a newer model; such is the pace of development.

    DATA ORCHESTRATION

    With a higher number of modularized tools and source systems comes added complexity.

    More steps, processes, connections, settings, and synchronization are required.

    Data orchestration in MDS needs to be Cron on steroids.

    Working across a wide variety of products, MDS orchestration tools help bring the right data to the right place for the right purpose, based on complex dependency logic.
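
    cron alone only fires jobs on a clock; an orchestrator also understands the dependencies between jobs. A minimal sketch of that idea in pure Python, with invented task names:

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Hypothetical pipeline: each task maps to the set of tasks it depends on.
dag = {
    "extract_orders": set(),
    "extract_users": set(),
    "transform": {"extract_orders", "extract_users"},
    "load_warehouse": {"transform"},
    "refresh_dashboard": {"load_warehouse"},
}

def run_pipeline(dag, tasks):
    """Run every task exactly once, only after all its upstreams finished."""
    executed = []
    for name in TopologicalSorter(dag).static_order():
        tasks[name]()  # a real orchestrator adds retries, alerting, backfills
        executed.append(name)
    return executed

order = run_pipeline(dag, {name: (lambda: None) for name in dag})
```

    Tools like Airflow, Dagster, and Prefect build scheduling, retries, backfills, and monitoring around exactly this dependency-graph core.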

     

    DATA OBSERVABILITY

    Data observability is the ability to monitor and understand the state and behavior of data as it flows through an organization’s systems.

    In a traditional data stack, organizations often rely on reactive approaches to data management, only addressing issues as they arise. In contrast, data observability in an MDS involves adopting a proactive mindset, where organizations actively monitor and understand the state of their data pipelines to identify potential issues before they become critical.

    Monitoring – a dashboard that provides an operational view of your pipeline or system

    Alerting – both for expected events and anomalies 

    Tracking – ability to set and track specific events

    Analysis – automated issue detection that adapts to your pipeline and data health

    Logging – a record of an event in a standardized format for faster resolution

    SLA Tracking – Measure data quality against predefined standards (cost, performance, reliability)

    Data Lineage – graph representation of data assets showing upstream/downstream steps.
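
    Lineage is usually modeled as a directed graph. In a minimal sketch (the asset names here are hypothetical), querying an asset’s downstream set tells you the blast radius of a bad upstream table:

```python
# Minimal data lineage graph: edges point from an upstream asset
# to the assets derived from it. (Asset names are invented.)
lineage = {
    "raw_events": ["clean_events"],
    "clean_events": ["daily_agg", "ml_features"],
    "daily_agg": ["revenue_dashboard"],
    "ml_features": [],
    "revenue_dashboard": [],
}

def downstream(asset, graph):
    """Everything that would break if `asset` had a quality issue."""
    impacted, stack = set(), [asset]
    while stack:
        for child in graph[stack.pop()]:
            if child not in impacted:
                impacted.add(child)
                stack.append(child)
    return impacted
```

    The reverse traversal (upstream) answers the other common question: where did this dashboard’s numbers come from?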

    DATA GOVERNANCE & SECURITY

    Data security is a critical consideration for organizations of all sizes and industries and needs to be prioritized to protect sensitive information, ensure compliance, and preserve business continuity. 

    The introduction of stricter data protection regulations, such as the General Data Protection Regulation (GDPR) and the CCPA, created a huge market need for MDS tools that efficiently and painlessly help organizations govern and secure their data.

    DATA CATALOG

    Now that we have all the components of MDS, from ingestion to BI, we have so many sources, along with dashboards, reports, views, and other metadata, that we need a Google-like search engine just to navigate our components.

    This is where a data catalog helps; it allows people to stitch together the metadata (data about your data: the number of rows in your table, the column names, types, etc.) across sources.

    This is necessary to help efficiently discover, understand, trust, and collaborate on data assets.

    We don’t want product managers and go-to-market teams looking at different dashboards for the same adoption data.
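
    A toy sketch of the idea (the assets and fields below are invented): stitch metadata from several sources into one index and search it like a mini search engine:

```python
# Toy data catalog: metadata records from different sources stitched
# into one searchable index. (All names and fields are invented.)
catalog = [
    {"name": "orders", "source": "postgres", "columns": ["id", "amount"], "rows": 120_000},
    {"name": "adoption_dashboard", "source": "tableau", "columns": [], "rows": 0},
    {"name": "orders_daily", "source": "snowflake", "columns": ["day", "total"], "rows": 365},
]

def search(term, catalog):
    """Google-like lookup across asset names and column names."""
    term = term.lower()
    return [a["name"] for a in catalog
            if term in a["name"].lower()
            or any(term in c.lower() for c in a["columns"])]
```

    Production catalogs (Amundsen, DataHub, Atlan) layer ownership, tags, lineage, and popularity ranking on top of this basic search index.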

    Netflix is a good example of this evolution. Previously, the sole purpose of its original data pipeline was to aggregate and upload events to Hadoop/Hive for batch processing. Chukwa collected events and wrote them to S3 in Hadoop sequence file format. In those days, end-to-end latency was up to 10 minutes. That was sufficient for batch jobs, which usually scan data at daily or hourly frequency.

    With the emergence of Kafka and Elasticsearch over the last decade, there has been a growing demand for real-time analytics at Netflix. By real-time, we mean sub-minute latency. Instead of starting from scratch, Netflix was able to iteratively grow its MDS as market requirements changed.

    Source: https://blog.transform.co/data-talks/the-metric-layer-why-you-need-it-examples-and-how-it-fits-into-your-modern-data-stack/

     

    This is a snapshot of the MDS that a data-mature company like Netflix ran some years back, where instead of a few all-in-one tools, each data category was handled by a specialized tool.

    FUTURE COMPONENTS OF MDS?

    DATA MESH

    Source: https://martinfowler.com/articles/data-monolith-to-mesh.html

    The top picture shows how teams currently operate: no matter the feature or product on the Y axis, the data pipeline’s journey remains the same as it moves along the X axis. But in an ideal data mesh world, those who know the data should own its journey.

    As decentralization is the name of the game, data mesh is MDS’s response to this demand for an architecture shift where domain owners use self-service infrastructure to shape how their data is consumed.

    DATA LAKEHOUSE

    Source: https://www.altexsoft.com/blog/data-lakehouse/

    We have talked about data warehouses and data lakes being used for data storage.

    Initially, when we only needed structured data, data warehouses were used. Later, with big data, we started getting all kinds of data, structured and unstructured.

    So, we started using Data Lakes, where we just dumped everything.

    The lakehouse tries to combine the best of both worlds by adding an intelligent metadata layer on top of the data lake. This layer basically classifies and categorizes data such that it can be interpreted in a structured manner.

    Also, all the data in the lakehouse is open, meaning that it can be utilized by all kinds of tools. Lakehouses are generally built on top of open data formats like Parquet so that they can be easily accessed by those tools.

    End users can simply run their SQL queries as if they’re querying a DWH.
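
    As a toy illustration of that metadata layer (all file names and schemas below are invented), a thin catalog over raw files is enough to let a reader treat the lake as structured tables:

```python
# Toy lakehouse metadata layer: raw files in the "lake" plus a catalog
# that assigns each file a table name and a schema, so readers can treat
# the files as structured tables. (All names here are invented.)
lake_files = {
    "s3://lake/events/part-0.parquet": [(1, "click"), (2, "view")],
    "s3://lake/events/part-1.parquet": [(3, "click")],
}
catalog = {
    "events": {
        "schema": ["event_id", "event_type"],
        "files": ["s3://lake/events/part-0.parquet",
                  "s3://lake/events/part-1.parquet"],
    }
}

def read_table(name):
    """Resolve a logical table name to rows via the metadata layer."""
    entry = catalog[name]
    raw = [row for f in entry["files"] for row in lake_files[f]]
    return [dict(zip(entry["schema"], r)) for r in raw]
```

    Real lakehouse formats (Delta Lake, Iceberg, Hudi) store this mapping, plus schemas, partitions, and transaction logs, alongside the data files themselves.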

    REVERSE ETL

    Suppose you’re a salesperson using Salesforce and want to know if a lead you just got is warm or cold (warm indicating a higher chance of conversion).

    The attributes of your lead, like salary and age, are fetched from your OLTP system into a DWH and analyzed, and then the “warm” flag is sent back to the Salesforce UI, ready to be used in live operations.
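
    A minimal sketch of that round trip (the scoring rule and field names are invented; a real setup would query the warehouse and call the Salesforce API):

```python
# Sketch of a reverse-ETL step: a flag derived in the warehouse is
# pushed back into the operational tool. (Fields and rule are invented.)
def score_lead(lead: dict) -> str:
    """Warehouse-side stand-in for the model: flag a lead warm or cold."""
    return "warm" if lead["salary"] >= 80_000 and lead["age"] < 45 else "cold"

def sync_to_crm(leads, crm):
    """Reverse ETL: write the derived flag back onto the CRM record."""
    for lead in leads:
        crm[lead["id"]]["temperature"] = score_lead(lead)
    return crm

crm = {1: {"name": "Ada"}, 2: {"name": "Bob"}}
leads = [{"id": 1, "salary": 95_000, "age": 30},
         {"id": 2, "salary": 40_000, "age": 50}]
crm = sync_to_crm(leads, crm)
```

    Tools like Hightouch and Census industrialize exactly this sync step, with scheduling, diffing, and API rate-limit handling.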

    METRICS LAYER

    The metrics layer is all about consistency, accessibility, and trust in the calculation of metrics.

    Earlier, metrics lived in v1 and v1.1 Excel files, with logic scattered around.

    Currently, in the modern data stack world, each team’s calculations are isolated in the tool it uses. For example, BI would store metrics in Tableau dashboards while DEs would use code.

    A metrics layer exists to ensure global access to these metrics from every other tool in the data stack.

    For example, dbt’s metrics layer helps define these in the warehouse, making them accessible to both BI and engineers. Similarly, Looker, Mode, and others have their own unique approaches.
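
    The core idea can be sketched in a few lines (the metric names, tables, and SQL template below are invented for illustration): define each metric once, then compile it to SQL for whichever tool asks:

```python
# Sketch of a metrics layer: one central definition per metric,
# compiled to SQL on demand so BI tools and code share the same logic.
METRICS = {
    "revenue": {"table": "orders", "expr": "SUM(amount)"},
    "active_users": {"table": "events", "expr": "COUNT(DISTINCT user_id)"},
}

def compile_metric(name: str, grain: str = "day") -> str:
    """Turn a metric definition into a query at the requested time grain."""
    m = METRICS[name]
    return (f"SELECT DATE_TRUNC('{grain}', created_at) AS {grain}, "
            f"{m['expr']} AS {name} FROM {m['table']} GROUP BY 1")
```

    Because every consumer compiles from the same definition, “revenue” means the same thing in the dashboard, the notebook, and the reverse-ETL sync.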

    In summary, this blog post discussed the modern data stack and its advantages over older approaches. We examined the components of the modern data stack, including data sources, ingestion, transformation, and more, and how they work together to create an efficient and effective system for data management and analysis. We also highlighted the benefits of the modern data stack, including increased efficiency, scalability, and flexibility. 

    As technology continues to advance, the modern data stack will evolve and incorporate new components and capabilities.

  • Building an ETL Workflow Using Apache NiFi and Hive

    The objective of this article is to design an ETL workflow using Apache NiFi that will scrape a web page with almost no code to get an endpoint, extract and transform the dataset, and load the transformed data into a Hive table.

    Problem Statement

    One potential use case for a data pipeline is capturing district-level COVID-19 information from the COVID19-India API website, which is updated daily. So, the aim is to create a flow that collates and loads the dataset into a warehouse system used by various downstream applications for further analysis, and the flow should be easily configurable for future changes.

    Prerequisites

    Before we start, we must have a basic understanding of Apache NiFi, and having it installed on a system would be a great start for this article. If you do not have it installed, please follow these quick steps. Apache Hive should be added to this architecture, which also requires a fully functional Hadoop framework. For this article, I am using Hive on a single-node cluster installed locally, but you can use a remote Hive connection as well.

    Basic Terminologies

    Apache NiFi is an ETL tool with flow-based programming that comes with a web UI built to provide an easy way (drag & drop) to handle data flow in real-time. It also supports powerful and scalable means of data routing and transformation, which can be run on a single server or in a clustered mode across many servers.

    A NiFi workflow consists of processors, the rectangular components that can process, verify, filter, join, split, or adjust data. They exchange pieces of information called FlowFiles through queues named connections, and the FlowFile Controller helps manage the resources between those components.

    Web scraping is the process of extracting and collecting structured web data with automation. It involves extracting and processing the underlying HTML code using CSS selectors; the extracted data is then stored in a database.
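
    Outside NiFi, the same tr > td > a idea can be sketched with Python’s standard library (the HTML snippet and URLs below are placeholders, not the real API page):

```python
from html.parser import HTMLParser

class EndpointExtractor(HTMLParser):
    """Collect hrefs of anchors that sit inside table cells (td > a)."""
    def __init__(self):
        super().__init__()
        self.in_td = False
        self.endpoints = []

    def handle_starttag(self, tag, attrs):
        if tag == "td":
            self.in_td = True
        elif tag == "a" and self.in_td:
            self.endpoints.extend(v for k, v in attrs if k == "href")

    def handle_endtag(self, tag):
        if tag == "td":
            self.in_td = False

# Placeholder HTML standing in for the scraped page.
html = """<table><tr><td><a href="https://example.com/districts.csv">districts</a></td></tr>
<tr><td><a href="https://example.com/states.csv">states</a></td></tr></table>"""
parser = EndpointExtractor()
parser.feed(html)
```

    NiFi’s GetHTMLElement processor does this same selection declaratively, without any code.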

    Apache Hive is a warehouse system built on top of Hadoop used for data summarization, query, and ad-hoc analysis.

    Steps for ETL Workflow

    Fig:- End-to-End NiFi WorkFlow

    The above flow comprises multiple processors each performing different tasks at different stages to process data. The different stages are Collect (InvokeHTTP – API Web Page, InvokeHTTP – Download District Data), Filter (GetHTMLElement, ExtractEndPoints, RouteOnAttributeDistrict API, QueryRecord), Transform (ReplaceHeaders, ConvertJSONToSQL), Load (PutHiveQL), and Logging (LogAttribute). Each processor is connected through different relationship connections and gets triggered on success until the data gets loaded into the table. The entire flow is scheduled to run daily.

    So, let’s dig into each step to understand the flow better.

    1. Get the HTML document using the Remote URL

    The flow starts with an InvokeHTTP processor that sends an HTTP GET request to the COVID19-India API URL and returns an HTML page in the response queue for further inspection. The processor can be used to invoke multiple HTTP methods (GET, PUT, POST, or PATCH) as well.

    Fig:- InvokeHTTP – API Web Page Configuration

    2. Extract listed endpoints

    The second step occurs when the GetHTMLElement processor targets the HTML table rows from the response in which all the endpoints are listed inside anchor tags, using the CSS selector tr > td > a, and extracts the data into FlowFiles.

    Fig:- GetHTMLElement Configuration

    After the success of the previous step, the ExtractText processor evaluates regular expressions against the content of the FlowFile to extract the URLs, which are then assigned to a FlowFile attribute named data_url.

    Fig:- ExtractEndPoints Configuration

    Note: The layout of the web page may have changed since this article was written. If so, configure the above processors to match the new layout.

    3. Pick districts API and Download the dataset

    Here, the RouteOnAttribute processor filters out the API for district-level information and ignores the other APIs using the Apache NiFi Expression Language, since we are only interested in district.csv.

    Fig:- RouteOnAttribute – District API Configuration

    And this time, the InvokeHTTP processor downloads the data using the extracted API endpoint assigned to the attribute data_url surrounded with curly braces and the response data will be in the CSV format.

    Fig:- InvokeHTTP – Download District Data Configuration

    4. Transform and Filter the dataset

    In this stage, the header of the response data is changed to lowercase using the ReplaceText processor with Literal Replace strategy, and the first field name is changed from date to recorded_date to avoid using reserved database keywords.

    Since the data is being updated daily on an incremental basis, we will only extract the data from the previous day using the QueryRecord processor. It will also convert the CSV data into JSON FlowFile using the CSVReader and JsonRecordSetWriter controller services.

    Please note that both the CSVReader and JsonRecordSetWriter services can have the default settings for our use. You can check out this blog for more reading on the controller services.

    And as mentioned, QueryRecord evaluates the below query to get data from the previous day out of the FlowFile and passes it to the next processor.

    select * from FlowFile where recorded_date='${now():toNumber():minus(86400000):format('yyyy-MM-dd')}'
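
    For clarity, here is a rough Python equivalent of the date arithmetic in that expression (an illustration of the logic, not NiFi code):

```python
from datetime import datetime, timedelta

def previous_day(now: datetime) -> str:
    """Mirror ${now():toNumber():minus(86400000):format('yyyy-MM-dd')}:
    subtract 86,400,000 ms (one day) and format as yyyy-MM-dd."""
    return (now - timedelta(milliseconds=86_400_000)).strftime("%Y-%m-%d")
```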

    Fig:- ReplaceHeaders Configuration

    Fig:- QueryRecord Configuration

    5. Establish JDBC connection pool for Hive and create a table

    Let’s set up the Hive JDBC driver for the NiFi flow using a HiveConnectionPool with the required local/remote configurations (database connection URL, user, and password). The Hive Configuration Resources property expects the Hive configuration file path, i.e., hive-site.xml.

    Fig:- HiveConnectionPool Setup

    Now, we need an empty table to load the data from the NiFi flow, and to do so, you can use the DDL structure below:

    CREATE TABLE IF NOT EXISTS demo.districts (recorded_date string, state string, district string, confirmed string, recovered string, deceased string, other string, tested string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

    6. Load data into the Hive table

    In this step, the JSON-formatted FlowFile is converted into a SQL statement using ConvertJSONToSQL, which provides a SQL query as the output FlowFile. We can configure the HiveConnectionPool for the JDBC Connection Pool property, along with the table name and statement type, before running the processor. In this case, the statement type would be INSERT, since we need to load the data into the table.

    Also, please note that when preparing a SQL command, the SQL Parameter Attribute Prefix property should be hiveql. Otherwise, the very next processor will not be able to identify it and will throw an error.

    Then, on success, PutHiveQL executes the input SQL command and loads the data into the table. The success of this processor marks the end of the workflow and the data can be verified by fetching the target table.

    Fig:- ConvertJSONToSQL Configurations

     

    Fig:- PutHiveQL Configuration

    7. Schedule the flow for daily updates

    You can schedule the entire flow to run at any given time using different NiFi scheduling strategies. Since the first InvokeHTTP is the initiator of this flow, we can configure it to run daily at 2 AM.

    Fig:- Scheduling Strategy

    8. Log Management

    Almost every processor has been directed to the LogAttribute processor with a failure/success queue, which writes the state and information of all used attributes into the NiFi log file, logs/nifi-app.log. By checking this file, we can debug and fix issues in case of any failure. To extend it even further, we can also set up a flow that captures error logs with Apache Kafka and sends notifications over email.

    9. Consume data for analysis

    You can use various open-source visualization tools to start off with the exploratory data analysis on the data stored in the Hive table.

    You can download the template covid_etl_workflow.xml and run it on your machine for reference.

    Future Scope

    There are different ways to build any workflow, and this was one of them. You can take this further by allowing multiple datasets (state_wise, test_datasets) from the list with different combinations of various processors/controllers as a part of the flow. 

    You can also try scraping data from a product listing page of multiple e-commerce websites for a comparison between goods and price or you can even extract movie reviews and ratings from the IMDb website and use it as a recommendation for users. 

    Conclusion

    In this article, we discussed Apache NiFi and created a workflow to extract, filter, transform, and load data for analysis purposes. If you are more comfortable building logic and want to focus on the architecture with less code, then Apache NiFi is the tool for you.

  • ClickHouse – The Newest Data Store in Your Big Data Arsenal

    ClickHouse

    ClickHouse is an open-source column-oriented data warehouse for online analytical processing (OLAP) of queries. It is fast, scalable, flexible, cost-efficient, and easy to run. It delivers industry-leading query performance while significantly reducing storage requirements through its innovative use of columnar storage and compression.

    ClickHouse’s performance exceeds that of comparable column-oriented database management systems available on the market. ClickHouse is a database management system, not a single database: it allows creating tables and databases at runtime, loading data, and running queries without reconfiguring and restarting the server.

    ClickHouse processes from hundreds of millions to over a billion rows of data across clusters of hundreds of nodes. It utilizes all available hardware to process queries as fast as possible. The peak processing performance for a single query stands at more than two terabytes per second.

    What makes ClickHouse unique?

    • Data Storage & Compression: ClickHouse is designed to work on regular hard drives but uses SSD and additional RAM if available. Data compression in ClickHouse plays a crucial role in achieving excellent performance. It provides general-purpose compression codecs and some specialized codecs for specific kinds of data. These codecs have different CPU consumption and disk space and help ClickHouse outperform other databases.
    • High Performance: Using vectorized computation, data is processed in vectors (chunks of columns), achieving high CPU efficiency. ClickHouse supports parallel processing across multiple cores, so large queries are parallelized naturally. It also supports distributed query processing: data resides across shards, which are used for parallel execution of the query.
    • Primary & Secondary Indexes: Data is physically sorted by the primary key, allowing low-latency extraction of specific values or ranges. Secondary indexes in ClickHouse let the database know that a query’s filtering conditions allow some data parts to be skipped entirely, which is why they are also called data-skipping indexes.
    • Support for Approximate Calculations: ClickHouse trades accuracy for performance via approximate calculations. It provides aggregate functions for approximate estimates of the number of distinct values, medians, and quantiles, and it can run queries on a sample of the data, retrieving proportionally less data from disk to produce approximate results.
    • Data Replication and Data Integrity Support: Data can be written to any available replica, and all the remaining replicas retrieve their copy in the background. The system keeps identical data on several replicas. Most failures are recovered automatically, or semi-automatically in complex scenarios.
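
    This is not ClickHouse’s internal implementation; as a generic sketch of the accuracy-for-speed trade, one can estimate a quantile from a fixed-size reservoir sample instead of sorting the full dataset:

```python
import random

def approx_quantile(stream, q, sample_size=1000, seed=42):
    """Keep a uniform reservoir sample of the stream, then take the
    quantile of the sample: bounded memory, approximate answer."""
    rng = random.Random(seed)          # seeded for reproducibility
    reservoir = []
    for i, x in enumerate(stream):
        if i < sample_size:
            reservoir.append(x)
        else:
            j = rng.randrange(i + 1)   # classic reservoir-sampling step
            if j < sample_size:
                reservoir[j] = x
    reservoir.sort()
    return reservoir[int(q * (len(reservoir) - 1))]
```

    The sample answer is close to, but not exactly, the true quantile; that is precisely the trade functions like ClickHouse’s approximate quantile aggregates make at much larger scale.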

    But it can’t be all good, can it? There are some disadvantages to ClickHouse as well:

    • No full-fledged transactions.
    • Inability to efficiently and precisely change or remove previously input data. For example, to comply with GDPR, data could well be cleaned up or modified using batch deletes and updates.
    • ClickHouse is less efficient for point queries that retrieve individual rows by their keys due to the sparse index.

    ClickHouse against its contemporaries

    So, with all these distinctive features, how does ClickHouse compare with other industry-leading data storage tools? ClickHouse, being general-purpose, has a variety of use cases, each with its pros and cons, so here’s a high-level comparison against the best tools in their respective domains. Each tool has unique traits depending on the use case, and a comparison built around those would not be fair, but what we care about most is performance, scalability, cost, and other key attributes that can be compared irrespective of the domain. So here we go:

    ClickHouse vs Snowflake:

    • With its decoupled storage-and-compute approach, Snowflake is able to segregate workloads and enhance performance. Snowflake’s search optimization service further improves performance for point lookups but has additional costs attached to it. ClickHouse, on the other hand, with its local runtime and inherent support for multiple forms of indexing, drastically improves query performance.
    • Regarding scalability, ClickHouse being on-prem makes it slightly more challenging to scale compared to cloud-based Snowflake. Managing hardware manually by provisioning clusters and migrating is doable but tedious. One possible solution is to deploy ClickHouse in the cloud, which is cheaper and, frankly, the most viable option.

    ClickHouse vs Redshift:

    • Redshift is a managed, scalable cloud data warehouse offering both provisioned and serverless options. Its RA3 nodes scale compute and cache the necessary data. Still, even with that, Redshift does not isolate different workloads operating on the same data, putting it at the lower end of decoupled compute-and-storage cloud architectures. ClickHouse’s local runtime is one of the fastest.
    • Both Redshift and ClickHouse are columnar and sort data, allowing them to read only the specific data needed. But deploying ClickHouse is cheaper, and although Redshift is tailored to be a ready-to-use tool, ClickHouse is the better choice if you are not entirely dependent on Redshift’s features like managed configuration, backup, and monitoring.

    ClickHouse vs InfluxDB:

    • InfluxDB, written in Go, is an open-source NoSQL database and one of the most popular choices for time-series data and analysis. Despite being a general-purpose analytical DB, ClickHouse provides competitive write performance.
    • ClickHouse data structures like AggregatingMergeTree allow real-time data to be stored in a pre-aggregated format, which puts it on par with TSDBs in performance. It is significantly faster for heavy queries and comparable for light queries.

    ClickHouse vs PostgreSQL:

    • Postgres is another very versatile database, widely used for various use cases, just like ClickHouse. Postgres, however, is an OLTP DB, so unlike ClickHouse, analytics is not its primary aim, though it is still used for analytics to a certain extent.
    • For transactional data, ClickHouse’s columnar nature puts it below Postgres, but when it comes to analytical capabilities, even after tuning Postgres to its maximum potential (e.g., using materialized views, indexing, cache sizes, buffers, etc.), ClickHouse is ahead.

    ClickHouse vs Apache Druid:

    • Apache Druid is an open-source data store primarily used for OLAP. Druid and ClickHouse are very similar in their approaches and use cases but differ in architecture. Druid is mainly used for real-time analytics with heavy ingestion and high-uptime requirements.
    • Unlike Druid, ClickHouse has a much simpler deployment: ClickHouse can be deployed on a single server, while a Druid setup needs multiple types of nodes (master, broker, ingestion, etc.). ClickHouse, with its SQL-like query language, also provides better flexibility, and it is more performant when the deployment is small.

    To summarize the differences between ClickHouse and other data warehouses:

    ClickHouse Engines

    Depending on the type of your table (internal or external), ClickHouse provides an array of engines that help us connect to different data stores and also determine the way data is stored, accessed, and otherwise interacted with.

    These engines are mainly categorized into two types:

    Database Engines:

    These allow us to work with different databases and tables.
    By default, ClickHouse uses the Atomic database engine, which provides configurable table engines; it also offers database engines for external systems, the popular ones being PostgreSQL, MySQL, and so on.

    Table Engines:

    These determine 

    • how and where data is stored
    • where to read/write it from/to
    • which queries it supports
    • use of indexes
    • concurrent data access and so on.

    These engines are further classified into families based on the above parameters:

    MergeTree Engines:

    This is the most universal and functional engine family for high-load tasks. Engines of this family support quick data insertion with subsequent background data processing. They also support data replication, partitioning, secondary data-skipping indexes, and some other features. The following are some of the popular engines in this family:

    • MergeTree
    • SummingMergeTree
    • AggregatingMergeTree

    MergeTree engines, with their indexing and partitioning support, allow data to be processed at tremendous speed. They can also be leveraged to build materialized views that store aggregated data, further improving performance.

    Log Engines:

    These are lightweight engines with minimal functionality. They work best when the requirement is to quickly write many small tables and read them later as a whole. This family consists of:

    • Log
    • StripeLog
    • TinyLog

    These engines append data to the disk in a sequential fashion and support concurrent reading. They do not support indexing, updating, or deleting and hence are only useful when the data is small, sequential, and immutable.

    Integration Engines:

    These are used for communicating with other data storage and processing systems. This family supports:

    • JDBC
    • MongoDB
    • HDFS
    • S3
    • Kafka and so on.

    Using these engines, we can import and export data from external sources. With an engine like Kafka, we can ingest data directly from a topic into a ClickHouse table, and with the S3 engine, we can work directly with S3 objects.

    Special Engines:

    ClickHouse offers some special engines that are specific to the use case. For example:

    • MaterializedView
    • Distributed
    • Merge
    • File and so on.

    These special engines have their own quirks; for example, with the File engine, we can export data to a file, update data in the table by updating the file, and so on.

    Summary

    We learned that ClickHouse is a very powerful and versatile tool: one with stellar performance that is feature-packed, very cost-efficient, and open-source. We saw a high-level comparison of ClickHouse with some of the best choices across an array of use cases. Although it ultimately comes down to how specific and intense your use case is, ClickHouse and its generic nature measure up pretty well on multiple occasions.

    ClickHouse’s applicability in web analytics, network management, log analysis, time-series analysis, asset valuation in financial markets, and security threat identification makes it tremendously versatile. Consistently solving business problems with low-latency responses over petabytes of data, ClickHouse is indeed one of the fastest data warehouses out there.

    Further Readings

  • Unit Testing Data at Scale using Deequ and Apache Spark

    Everyone knows the importance of knowledge and how critical it is to progress. In today’s world, data is knowledge. But that’s only when the data is “good” and correctly interpreted. Let’s focus on the “good” part. What do we really mean by “good data”?

    Its definition can change from use case to use case but, in general terms, good data can be defined by its accuracy, legitimacy, reliability, consistency, completeness, and availability.

    Bad data can lead to failures in production systems, unexpected outputs, and wrong inferences, leading to poor business decisions.

    It’s important to have something in place that can tell us about the quality of the data we have, how close it is to our expectations, and whether we can rely on it.

    This is basically the problem we’re trying to solve.

    The Problem and the Potential Solutions

    A manual approach to data quality testing is definitely one of the solutions and can work well.

    We’d need to write code to compute various statistical measures, run it manually on different columns, maybe draw some plots, and then conduct spot checks to see if anything is off or unexpected. The overall process can get tedious and time-consuming if we need to do it on a daily basis.
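
    To make that concrete, hand-rolling two such checks might look like the sketch below (pure Python over a toy list of rows; the data is invented). The point is that every check is bespoke code you must write, run, and maintain yourself:

```python
# Two hand-rolled data-quality checks over rows represented as dicts.
def completeness(rows, column):
    """Fraction of rows where `column` is not null."""
    non_null = sum(1 for r in rows if r.get(column) is not None)
    return non_null / len(rows)

def is_unique(rows, column):
    """True if every value in `column` appears exactly once."""
    values = [r[column] for r in rows]
    return len(values) == len(set(values))

rows = [
    {"id": 1, "rating": 7.5},
    {"id": 2, "rating": None},
    {"id": 3, "rating": 9.0},
]
```

    Multiply this by dozens of columns, datasets, and daily runs, and the appeal of a framework that declares and schedules such checks becomes obvious.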

    Certain tools can make life easier for us, like:

    In this blog, we’ll be focusing on Amazon Deequ.

    Amazon Deequ

    Amazon Deequ is an open-source tool developed and used at Amazon. It’s built on top of Apache Spark, so it’s great at handling big data. Deequ computes data quality metrics regularly, based on the checks and validations set, and generates relevant reports.

    Deequ provides a lot of interesting features, and we’ll be discussing them in detail. Here’s a look at its main components:

    Source: AWS

    Prerequisites

    Working with Deequ requires having Apache Spark up and running with Deequ as one of the dependencies.

    As of this blog, the latest version of Deequ, 1.1.0, supports Spark 2.2.x to 2.4.x and Spark 3.0.x.

    Sample Dataset

    For learning more about Deequ and its features, we’ll be using an open-source IMDb dataset which has the following schema: 

    root
     |-- tconst: string (nullable = true)
     |-- titleType: string (nullable = true)
     |-- primaryTitle: string (nullable = true)
     |-- originalTitle: string (nullable = true)
     |-- isAdult: integer (nullable = true)
     |-- startYear: string (nullable = true)
     |-- endYear: string (nullable = true)
     |-- runtimeMinutes: string (nullable = true)
     |-- genres: string (nullable = true)
     |-- averageRating: double (nullable = true)
     |-- numVotes: integer (nullable = true)

    Here, tconst is the primary key, and the rest of the columns are pretty much self-explanatory.

    Data Analysis and Validation

    Before we start defining checks on the data, Deequ provides an easy way to compute basic statistics on the dataset; these are called metrics.

    Deequ provides support for the following metrics:

    ApproxCountDistinct, ApproxQuantile, ApproxQuantiles, Completeness, Compliance, Correlation, CountDistinct, DataType, Distance, Distinctness, Entropy, Histogram, Maximum, MaxLength, Mean, Minimum, MinLength, MutualInformation, PatternMatch, Size, StandardDeviation, Sum, UniqueValueRatio, Uniqueness

    Let’s go ahead and apply some metrics to our dataset.

    import com.amazon.deequ.analyzers._
    import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
    import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame

    val runAnalyzer: AnalyzerContext = { AnalysisRunner
      .onData(data)
      .addAnalyzer(Size())
      .addAnalyzer(Completeness("averageRating"))
      .addAnalyzer(Uniqueness("tconst"))
      .addAnalyzer(Mean("averageRating"))
      .addAnalyzer(StandardDeviation("averageRating"))
      .addAnalyzer(Compliance("top rating", "averageRating >= 7.0"))
      .addAnalyzer(Correlation("numVotes", "averageRating"))
      .addAnalyzer(Distinctness("tconst"))
      .addAnalyzer(Maximum("averageRating"))
      .addAnalyzer(Minimum("averageRating"))
      .run()
    }
    
    val metricsResult = successMetricsAsDataFrame(spark, runAnalyzer)
    metricsResult.show(false)

    We get the following output by running the code above:

    +-----------+----------------------+-----------------+--------------------+
    |entity     |instance              |name             |value               |
    +-----------+----------------------+-----------------+--------------------+
    |Mutlicolumn|numVotes,averageRating|Correlation      |0.013454113877394851|
    |Column     |tconst                |Uniqueness       |1.0                 |
    |Column     |tconst                |Distinctness     |1.0                 |
    |Dataset    |*                     |Size             |7339583.0           |
    |Column     |averageRating         |Completeness     |0.14858528066240276 |
    |Column     |averageRating         |Mean             |6.886130810579155   |
    |Column     |averageRating         |StandardDeviation|1.3982924856469208  |
    |Column     |averageRating         |Maximum          |10.0                |
    |Column     |averageRating         |Minimum          |1.0                 |
    |Column     |top rating            |Compliance       |0.080230443609671   |
    +-----------+----------------------+-----------------+--------------------+

    Let’s try to quickly understand what this tells us.

    • The dataset has 7,339,583 rows.
    • The distinctness and uniqueness of the tconst column are both 1.0, meaning all the values in the column are distinct and unique, which is expected since tconst is the primary key column.
    • The averageRating column has a min of 1 and a max of 10 with a mean of 6.88 and a standard deviation of 1.39, which tells us about the variation in the average rating values across the data.
    • The completeness of the averageRating column is 0.148, which tells us that we have an average rating available for around 15% of the dataset’s records.
    • Then, we checked whether there's any correlation between the numVotes and averageRating columns. This metric calculates the Pearson correlation coefficient, which comes out to about 0.01, indicating virtually no linear correlation between the two columns, as expected.

    This feature of Deequ can be really helpful if we want to quickly do some basic analysis on a dataset.
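    To make a couple of these metrics concrete, here's a plain-Scala sketch (not Deequ code, and the sample values are made up) of how Completeness and the Pearson Correlation metric are defined:

    ```scala
    // Illustrative only: a tiny hypothetical sample of (numVotes, averageRating)
    // pairs, where None models a NULL rating.
    val rows: Seq[(Int, Option[Double])] =
      Seq((100, Some(7.1)), (2500, Some(6.4)), (40, None), (900, Some(8.0)))

    // Completeness: the fraction of non-NULL values in the column.
    val completeness = rows.count(_._2.isDefined).toDouble / rows.size

    // Pearson correlation, computed over the rows where the rating is present.
    val (xs, ys) = rows.collect { case (v, Some(r)) => (v.toDouble, r) }.unzip
    val (mx, my) = (xs.sum / xs.size, ys.sum / ys.size)
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val corr = cov / math.sqrt(
      xs.map(x => (x - mx) * (x - mx)).sum * ys.map(y => (y - my) * (y - my)).sum)

    println(f"completeness = $completeness%.2f, correlation = $corr%.3f")
    ```

    On the full dataset, Deequ computes these same quantities for us in a single distributed pass over the data.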

    Let’s move on to defining and running tests and checks on the data.

    Data Validation

    To write tests for our dataset, we use Deequ's VerificationSuite and add checks on attributes of the dataset.

    Deequ offers a long, handy list of built-in checks:

    hasSize, isComplete, hasCompleteness, isUnique, isPrimaryKey, hasUniqueness, hasDistinctness, hasUniqueValueRatio, hasNumberOfDistinctValues, hasHistogramValues, hasEntropy, hasMutualInformation, hasApproxQuantile, hasMinLength, hasMaxLength, hasMin, hasMax, hasMean, hasSum, hasStandardDeviation, hasApproxCountDistinct, hasCorrelation, satisfies, hasPattern, containsCreditCardNumber, containsEmail, containsURL, containsSocialSecurityNumber, hasDataType, isNonNegative, isPositive, isLessThan, isLessThanOrEqualTo, isGreaterThan, isGreaterThanOrEqualTo, isContainedIn

    Let’s apply some checks to our dataset.

    import com.amazon.deequ.{VerificationResult, VerificationSuite}
    import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
    import com.amazon.deequ.checks.{Check, CheckLevel}
    import com.amazon.deequ.constraints.ConstrainableDataTypes

    val validationResult: VerificationResult = { VerificationSuite()
      .onData(data)
      .addCheck(
        Check(CheckLevel.Error, "Review Check")
          .hasSize(_ >= 100000) // the data should have at least 100k records
          .hasMin("averageRating", _ > 0.0) // min rating should be greater than 0
          .hasMax("averageRating", _ < 9.0) // max rating should be less than 9
          .containsURL("titleType") // verify that titleType column has URLs
          .isComplete("primaryTitle") // primaryTitle should never be NULL
          .isNonNegative("numVotes") // should not contain negative values
          .isPrimaryKey("tconst") // verify that tconst is the primary key column
          .hasDataType("isAdult", ConstrainableDataTypes.Integral)
          // column should contain Integer values only; expected, as this col holds 0 or 1
      )
      .run()
    }
    
    val results = checkResultsAsDataFrame(spark, validationResult)
    results.select("constraint","constraint_status","constraint_message").show(false)

    We've added several checks to our dataset; each one is explained in a comment in the code above.

    We expect all checks to pass for our dataset except the containsURL and hasMax ones.

    That’s because the titleType column doesn’t have URLs, and we know that the max rating is 10.0, but we are checking against 9.0.

    We can see the output below:

    +--------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
    |constraint                                                                                  |constraint_status|constraint_message                                   |
    +--------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
    |SizeConstraint(Size(None))                                                                  |Success          |                                                     |
    |MinimumConstraint(Minimum(averageRating,None))                                              |Success          |                                                     |
    |MaximumConstraint(Maximum(averageRating,None))                                              |Failure          |Value: 10.0 does not meet the constraint requirement!|
    |containsURL(titleType)                                                                      |Failure          |Value: 0.0 does not meet the constraint requirement! |
    |CompletenessConstraint(Completeness(primaryTitle,None))                                     |Success          |                                                     |
    |ComplianceConstraint(Compliance(numVotes is non-negative,COALESCE(numVotes, 0.0) >= 0,None))|Success          |                                                     |
    |UniquenessConstraint(Uniqueness(List(tconst),None))                                         |Success          |                                                     |
    |AnalysisBasedConstraint(DataType(isAdult,None),<function1>,Some(<function1>),None)          |Success          |                                                     |
    +--------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+

    Behind the scenes, Deequ computed the same kinds of metrics we saw in the previous section in order to evaluate these checks.

    To look at the metrics Deequ computed for the checks we defined, we can use: 

    VerificationResult.successMetricsAsDataFrame(spark,validationResult)
                      .show(truncate=false)

    Automated Constraint Suggestion

    Automated constraint suggestion is a really interesting and useful feature provided by Deequ.

    Adding validation checks on a dataset with hundreds of columns, or on a large number of datasets, can be challenging. With this feature, Deequ tries to make our task easier: it analyzes the data distribution and, based on that, suggests potentially useful constraints that can be used as validation checks.

    Let’s see how this works.

    This piece of code can automatically generate constraint suggestions for us:

    import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
    import spark.implicits._ // needed for .toDS() below

    val constraintResult = { ConstraintSuggestionRunner()
      .onData(data)
      .addConstraintRules(Rules.DEFAULT)
      .run()
    }
    
    val suggestionsDF = constraintResult.constraintSuggestions.flatMap { 
      case (column, suggestions) => 
        suggestions.map { constraint =>
          (column, constraint.description, constraint.codeForConstraint)
        } 
    }.toSeq.toDS()
    
    suggestionsDF.select("_1","_2").show(false)

    Let’s look at constraint suggestions generated by Deequ:

    • 'runtimeMinutes' has less than 72% missing values
    • 'tconst' is not null
    • 'titleType' is not null
    • 'titleType' has value range 'tvEpisode', 'short', 'movie', 'video', 'tvSeries', 'tvMovie', 'tvMiniSeries', 'tvSpecial', 'videoGame', 'tvShort'
    • 'titleType' has value range 'tvEpisode', 'short', 'movie' for at least 90.0% of values
    • 'averageRating' has no negative values
    • 'originalTitle' is not null
    • 'startYear' has less than 9% missing values
    • 'startYear' has type Integral
    • 'startYear' has no negative values
    • 'endYear' has type Integral
    • 'endYear' has value range '2017', '2018', '2019', '2016', '2015', '2020', '2014', '2013', '2012', '2011', '2010', ......
    • 'endYear' has value range '' for at least 99.0% of values
    • 'endYear' has no negative values
    • 'numVotes' has no negative values
    • 'primaryTitle' is not null
    • 'isAdult' is not null
    • 'isAdult' has no negative values
    • 'genres' has less than 7% missing values

    We shouldn’t expect the constraint suggestions generated by Deequ to always make sense, and they should always be verified before use. That’s because the algorithm generating them works purely on the observed data distribution and isn’t exactly “intelligent.”

    We can see that most of the suggestions generated make sense even though they might be really trivial.

    For the endYear column, one of the suggestions is that endYear should be contained in a fixed list of years, which is indeed true for our dataset. However, it can’t be generalized, since the set of valid values grows with every passing year.

    But on the other hand, the suggestion that titleType can take the following values: ‘tvEpisode,’ ‘short,’ ‘movie,’ ‘video,’ ‘tvSeries,’ ‘tvMovie,’ ‘tvMiniSeries,’ ‘tvSpecial,’ ‘videoGame,’ and ‘tvShort’ makes sense and can be generalized, which makes it a great suggestion.

    And this is why we should not blindly use the constraints suggested by Deequ and always cross-check them.

    Something we can do to improve the constraint suggestions is to use the useTrainTestSplitWithTestsetRatio method in ConstraintSuggestionRunner, which is especially worthwhile on large datasets.

    How does this work? If we use the config useTrainTestSplitWithTestsetRatio(0.1), Deequ computes constraint suggestions on 90% of the data and evaluates them on the remaining 10%, which improves the quality of the suggestions.
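    To build intuition for why a held-out split helps, here's a small plain-Scala sketch (this is not Deequ's internals, and the data is made up): we derive a value-range constraint from 90% of the rows and then evaluate how well it holds on the remaining 10%:

    ```scala
    import scala.util.Random

    // Hypothetical column values: a common category plus a rarer one.
    val values = Seq.fill(90)("movie") ++ Seq.fill(10)("tvSpecial")
    val shuffled = new Random(42).shuffle(values)

    // 90/10 train/test split, mirroring useTrainTestSplitWithTestsetRatio(0.1).
    val (train, test) = shuffled.splitAt((shuffled.size * 0.9).toInt)

    // "Suggest" a value-range constraint from the training portion...
    val allowed = train.toSet

    // ...then evaluate it on the held-out portion.
    val compliance = test.count(allowed.contains).toDouble / test.size
    println(f"held-out compliance: $compliance%.2f")
    // Low compliance flags the suggested range as too narrow to generalize.
    ```

    A suggestion that only fits the training slice scores poorly on the held-out slice, so it can be dropped or weakened before being adopted as a check.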

    Anomaly Detection

    Deequ also supports anomaly detection for data quality metrics.

    The idea behind Deequ’s anomaly detection is that we often have a sense of how much certain metrics of our data can be expected to change. Say we get new data every day, and we know that the number of records we receive daily is around 8 to 12k. If we suddenly get 40k records one day, we know something went wrong with the data ingestion job or some upstream process.

    Deequ regularly stores the metrics of our data in a MetricsRepository. Once that’s done, anomaly detection checks can be run. These compare the current metric values to the historical values stored in the MetricsRepository, helping Deequ detect anomalous changes that are a red flag.

    One of Deequ’s anomaly detection strategies is the RateOfChangeStrategy, which limits the maximum change in the metrics by some numerical factor that can be passed as a parameter.
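    As a rough illustration of the idea (plain Scala, not Deequ's API), a rate-of-change check compares each new metric value to the previous one and flags the change if it exceeds a configured factor:

    ```scala
    // Illustrative sketch of a rate-of-change anomaly check (not Deequ's API).
    // Flags the current metric value if it changed by more than `maxFactor`
    // relative to the previous value, in either direction.
    def isAnomalous(previous: Double, current: Double, maxFactor: Double): Boolean = {
      val ratio = current / previous
      ratio > maxFactor || ratio < 1.0 / maxFactor
    }

    // Daily row counts: ~8-12k is normal, then a sudden 40k spike.
    val dailySizes = Seq(9500.0, 11000.0, 10200.0, 40000.0)
    val flags = dailySizes.sliding(2).map { case Seq(prev, curr) =>
      isAnomalous(prev, curr, maxFactor = 2.0)
    }.toList

    println(flags) // prints List(false, false, true)
    ```

    Only the jump from 10200 to 40000 exceeds the factor of 2, so only that day is flagged; in Deequ, the historical values come from the MetricsRepository instead of an in-memory sequence.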

    Deequ supports several other strategies as well; these, along with code examples for anomaly detection, can be found in Deequ’s documentation.

    Conclusion

    We learned about the main features and capabilities of AWS Labs’ Deequ.

    It might feel a little daunting to people unfamiliar with Scala or Spark, but using Deequ is straightforward. Someone with a basic understanding of Scala or Spark should be able to work with Deequ’s primary features without any friction.

    For someone who rarely deals with data quality checks, manual test runs might be a good enough option. However, for someone who works with new datasets frequently, say several times a day or week, using a tool like Deequ for automated data quality testing makes a lot of sense in terms of time and effort.

    We hope this article gave you a deeper look into data quality testing and a good starting point for using Deequ in your own engineering practice.