Category: Services

  • Hacking Your Way Around AWS IAM Roles

    Identity and Access Management (IAM) offers role-based access control (RBAC) for your AWS account's users and resources, and you can fine-tune the permission set by defining policies. Whether you are a beginner with AWS cloud or an experienced user, you know how important IAM is.

    “AWS Identity and Access Management (IAM) is a web service that helps you securely control access to AWS resources. You use IAM to control who is authenticated (signed in) and authorized (has permissions) to use resources.”

    – AWS IAM User Guide

    With the emergence of cloud infrastructure services, the coolest thing you can do is write your infrastructure as code. AWS offers SDKs for various programming/scripting languages, and, like any other API call, you need to sign requests with credentials. The AWS IAM console lets you generate an access_key and secret_access_key pair, which can then be configured with your SDK.

    Alternatively, you can configure the credentials against your user profile via the aws cli. This also means anyone with the access_key and secret_access_key gets whatever permissions the IAM policy grants. Thus, keeping credentials on the disk is insecure. You can implement a key rotation policy to keep the environment compliant, but to avoid the problem altogether, you can use an AWS IAM role for services.

    Let’s say you are working on an AWS EC2 instance that needs access to some other AWS service, like S3. You can create an IAM role for EC2 with a policy that has the appropriate permission to access the S3 bucket. In this case, your SDK doesn’t need a token (at least not on the disk or hardcoded in code). Let’s take a look at the hierarchy of how the AWS SDK looks for credentials when signing requests.

    1. Embedded in your code (very insecure). This is the very first place your SDK looks. Below is a NodeJS example, where access_key and secret_access_key are part of the code itself.

    const {S3} = require("aws-sdk");
    const s3 = new S3({
       accessKeyId : "ABCDEFGHIJKLMNOPQRST",
       secretAccessKey : "7is/HVjA8lm9hRrJyZEPWAs5Bo8KyyvEqjjxIHoO"
      //sessionToken : "options_session_token_if_applicable"
    });

    2. AWS environment variables. If the token is not embedded in your code, your SDK looks for AWS environment variables available to the process. These environment variables are AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and, optionally, AWS_SESSION_TOKEN. For example, you can export these variables in your shell and then run the aws cli command aws s3 ls to list S3 buckets. Note that once credentials are exported, they are available to all child processes, so they are picked up automatically by your AWS SDK.
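
    If you prefer to see this from code, here is a minimal Python sketch using boto3 (boto3 is my assumption here, not part of the original example) showing that the SDK picks up the exported variables without being handed any keys explicitly. The credential values are placeholders.

    import os
    import boto3

    # Placeholder values; in practice these would already be exported in your shell.
    os.environ["AWS_ACCESS_KEY_ID"] = "ABCDEFGHIJKLMNOPQRST"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "your-secret-access-key"

    # No credentials are passed to the client: boto3 finds them in the environment.
    s3 = boto3.client("s3")
    print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])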

    3. The AWS credentials (default profile) file located at ~/.aws/credentials. This is the third place in the lookup. You can generate this file by running the command aws configure, or create it manually with various profiles. If you have multiple profiles, you can export an environment variable called AWS_PROFILE to select one. An example credentials file is given below:

    [default] ; default profile
    aws_access_key_id = <DEFAULT_ACCESS_KEY_ID>
    aws_secret_access_key = <DEFAULT_SECRET_ACCESS_KEY>
      
    [personal-account] ; personal account profile
    aws_access_key_id = <PERSONAL_ACCESS_KEY_ID>
    aws_secret_access_key = <PERSONAL_SECRET_ACCESS_KEY>
      
    [work-account] ; work account profile
    aws_access_key_id = <WORK_ACCESS_KEY_ID>
    aws_secret_access_key = <WORK_SECRET_ACCESS_KEY>

    4. The IAM role attached to your resource. Your resource could be an EC2 instance, Lambda function, AWS Glue job, ECS container, RDS instance, etc. This is a secure way of using credentials, since they are not stored anywhere on the disk, exported via an environment variable, or hardcoded in the code. You need not worry about key rotation at all.

    TL;DR: IAM roles are a secure way of using credentials. However, they are only applicable to resources within AWS; you cannot use them outside of AWS. So an IAM role can only be attached to resources like EC2, Lambda, ECS, etc.

    The problem statement:

    Let’s say a group of developers needs access to a few S3 buckets and DynamoDB. The organization does not want developers to use an access_key and secret_access_key on their local machines (laptops), as those keys can be used from anywhere or can be stolen.

    Since IAM roles are more secure, the organization provisions an EC2 instance running Windows, attaches an IAM role with the appropriate permissions to access the S3 buckets and DynamoDB, and configures the IDE and other essential dev tools. Developers then use RDP to connect to the EC2 instance. However, due to license restrictions, only two users can connect over RDP at a given time, so more similar instances are added, which heavily increases cost. Wouldn’t it be nice if, somehow, IAM roles could be attached to local machines?

    How do IAM roles for resources work?

    Resources like EC2 or Lambda have a link-local address available. The link-local address 169.254.169.254 can be accessed over HTTP port 80 to retrieve instance metadata. For instance, to get the instance-id of an EC2 instance from the host itself, you can issue a GET request: curl -L http://169.254.169.254/latest/meta-data/instance-id/. Similarly, you can retrieve IAM credentials if an IAM role is attached to the EC2 instance. Let’s assume you have created an IAM role for an EC2 instance with the name “iam-role-for-ec2”. Your SDK will then automatically fetch credentials with a GET request to http://169.254.169.254/latest/meta-data/iam/security-credentials/iam-role-for-ec2/

    $ curl -L 169.254.169.254/latest/meta-data/iam/security-credentials/iam-role-for-ec2/
    {
     "Code" : "Success",
     "LastUpdated" : "2021-08-03T09:18:49Z",
     "Type" : "AWS-HMAC",
     "AccessKeyId" : "ASIASP26DFHDIOFNJFFX",
     "SecretAccessKey" : "EK1A7x9dntSzF9LlG7BK08C6zpTS/F6MHYTBo/+U",
     "Token" : "IQoJb3JpZ2luX2VjEPr//////////wEaCXVzLXdlc3QtMiJIMEYCIQCOCqHrHjEkYZUFsRtGXwa8gfGjsBmaU+WrL2Z0ihvA3QIhAIsGhJFiPetOod7IUUC++unWZfoUEgjEU0ULYwZUvGwwKvoDCBIQAhoMMTcxNDU5MTYwNTE4IgxFUXJfE/0cdJs2Gigq1wM8Ww8yAS2i2qUqsQ1t+yd4ATkE5fvIMDtHxzPQ2raVQb+cCgC/eJVQpeNET1SP01HnrN5W1QFID+xOPk3vZt6NrCy48OUf6+cCGrd63Jv/7glAsyQGaGM/Jt5ddi6593dgN7VLFHsEBAwqkZ3j/VjAzYbthP3clmRl++6k+vpiUp2j4uwM4zW/6f8faR6awPbPVmJsyh94pXaQXJU+H0w+9Hp0MlUvP6GRqBiuTwv/+EOiRfth1XGRxxOuR5X+fr0Ve4tede2x0ZvSLeUsUENHlOQnUkSGbu1Hiv1BhDEjhzbHi7PXhW1G9N1FZObE+wdF4hGYbe3LUUIrnp2xnIcxKzmume2YQvFE4DvJvBtF22DsdLP4GPmitofhV2FGcVxP1f5Nv76M6SfOQY65vSZQde4LIwcotRIrMgwEWup2Rplq6s56K93IYXp6QmnUWLgdtcMBTMVQsOFhCdj05P+VYqlKe5xRT4/8BucmIHn7+J4indNoL+3BvYvnpiISdcEhlyswNZOPhVQJjwJfKPPdu9NDEKQ+Jep4wpVvOSh+CAtxKtqwGz1wrKzqlRvzqBFaEQrD4WdPdf9YnTvmKIXgPuk74pZRlarVsREL0KmG6G0zzA2lRYow6JOkiAY6pAHIZGH+UH5RL79drKe86tUnWCORcX9omN2uUK7FemTENwyvholib4jLGY6HcjvDF10jqkcu1KEV20xNsPj87BP7irEH7xH//Jz2+rnSaN5PCqLezSsATPYhHFQjg6Oti+0E33F+F5MA25Pn2+u5TDP1VfFgYExwSor79gNtwbOMs76432ssHYFioYjHttPfVwyNXloLCwgphqJBwiNhMDMcKapK6Q==",
     "Expiration" : "2021-08-03T15:47:26Z"
    }

    Notice that the response payload is JSON with AccessKeyId, SecretAccessKey, and Token. Additionally, there is an Expiration key, which states the validity of the credentials. New credentials are automatically generated once the old ones expire.
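
    For illustration, here is a rough Python sketch of what the SDK does under the hood. It uses only the standard library and assumes the older IMDSv1 behaviour (no session-token header); newer, hardened instances may require IMDSv2. It works from the EC2 instance itself, or from your laptop once the VPN route described later is in place.

    import json
    import urllib.request

    METADATA = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"

    # Discover the name of the role attached to the instance (e.g. "iam-role-for-ec2").
    role = urllib.request.urlopen(METADATA, timeout=2).read().decode().strip()

    # Fetch the temporary credentials issued for that role.
    creds = json.loads(urllib.request.urlopen(METADATA + role, timeout=2).read().decode())
    print(creds["AccessKeyId"], creds["Expiration"])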

    Solution:

    Now that you know how IAM roles work and how important the link-local address is, you have probably guessed what needs to be done so that you can access IAM role credentials from your local machine. The two solutions that pop into my mind are:

    1. Host a lightweight reverse proxy server like Nginx and then write a wrapper around your SDK so that initial calls are made to EC2 and credentials are retrieved.

    2. Route traffic originating from your system, targeting 169.254.169.254. Traffic should reach the EC2 instance and EC2 itself should take care of forwarding packets to the instance metadata server.

    The second solution may sound pretty techy, but it is the ideal one: you don’t need any additional tweaking in your SDK, and the whole setup is transparent to the developer. This blog will focus on implementing the second solution.

    Implementation:

    1. Launch a Linux (Ubuntu 20.04 LTS preferred) EC2 instance from the AWS console and attach the IAM role with appropriate permissions. The instance should be in a public subnet, and make sure to attach an Elastic IP address. In the instance security group, allow incoming UDP port 1194 (open to the world) and TCP port 22 (ssh, open to your IP address only).

    2. Install the OpenVPN and git packages: apt update; apt install git openvpn.

    3. Clone the easy-rsa repository on your server: cd ~; git clone https://github.com/OpenVPN/easy-rsa.git

    4. Generate certificates for OpenVPN server and client using easy-rsa.

    #switch to easy-rsa directory
    cd ~/easy-rsa/easyrsa3
    #copy vars.example to vars
    cp vars.example vars
    #Find below variables in "vars" file and edit them according to your needs
    set_var EASYRSA_REQ_COUNTRY    "US"
    set_var EASYRSA_REQ_PROVINCE   "California"
    set_var EASYRSA_REQ_CITY       "San Francisco"
    set_var EASYRSA_REQ_ORG        "Copyleft Certificate Co"
    set_var EASYRSA_REQ_EMAIL      "me@example.net"
    set_var EASYRSA_REQ_OU         "My Organizational Unit"
    #Also edit below two variables if you plan to run easyrsa in non-interactive mode
    # EASYRSA_REQ_CN should be set to your ElasticIP Address.
    # Note: If you are using openvpn behind a load balancer, or if you plan to map DNS to your server, then this should be set to your DNS name
    set_var EASYRSA_REQ_CN         "Your Instance Elastic IP"
    set_var EASYRSA_BATCH          "NONEMPTY"
    #====================================================
    #Generate certificate and keys for server and client
    ./easyrsa init-pki
    ./easyrsa build-ca nopass
    ./easyrsa gen-dh
    ./easyrsa build-server-full server nopass
    ./easyrsa build-client-full client nopass
    #Copy certificates and keys to server configuration
    cp -p ./pki/ca.crt /etc/openvpn/
    cp -p ./pki/issued/server.crt /etc/openvpn/
    cp -p ./pki/private/server.key /etc/openvpn/
    cp -p ./pki/dh.pem /etc/openvpn/dh2048.pem
    cd /etc/openvpn
    openvpn --genkey --secret myvpn.tlsauth
    echo "net.ipv4.ip_forward = 1" >>/etc/sysctl.conf
    sysctl -p

    5. Configure OpenVPN server.conf file:

    port 1194
    proto udp
    dev tun
    ca ca.crt
    cert server.crt
    key server.key # This file should be kept secret
    dh dh2048.pem
    topology subnet
    server 10.8.0.0 255.255.255.0
    ifconfig-pool-persist ipp.txt
    push "redirect-gateway def1 bypass-dhcp"
    push "dhcp-option DNS 8.8.8.8"
    push "dhcp-option DNS 1.1.1.1"
    push "route 169.254.169.254 255.255.255.255"
    keepalive 10 120
    tls-auth myvpn.tlsauth 0
    cipher AES-256-CBC
    comp-lzo
    user nobody
    group nogroup
    persist-key
    persist-tun
    status openvpn-status.log
    log-append  /var/log/openvpn.log
    verb 4
    explicit-exit-notify 1
    remote-cert-eku "TLS Web Client Authentication"

    In the above configuration file, make sure line number 9 (server 10.8.0.0 255.255.255.0) does not conflict with your AWS VPC CIDR. Line number 14 (push “route 169.254.169.254 255.255.255.255”) does the trick for us and is the heart of this blog post. It ensures that when a client connects via OpenVPN, a route is added on the client machine so that packets targeting 169.254.169.254 are routed through the OpenVPN tunnel. (Note: If you do not add this here, you can manually add a route on the client side once OpenVPN is connected: ip route add 169.254.169.254/32 via YOUR_TUNNEL_IP dev tun0)

    6. Generate an OpenVPN client configuration file:

    #These commands are executed on your EC2 instance (the OpenVPN server)
    cd ~/easy-rsa/easyrsa3
    cat <<EOF >/tmp/client.ovpn
    client
    dev tun
    proto udp
    remote YOUR-ELASTIC-IP-ADDRESS 1194
    resolv-retry infinite
    nobind
    persist-key
    persist-tun
    cipher AES-256-CBC
    comp-lzo
    verb 3
    key-direction 1
    EOF
    #append ca certificate
    echo '<ca>' >>/tmp/client.ovpn
    cat ./pki/ca.crt >>/tmp/client.ovpn
    echo '</ca>' >>/tmp/client.ovpn
    #append client certificate
    echo '<cert>' >>/tmp/client.ovpn
    sed -n '/BEGIN CERTIFICATE/,/END CERTIFICATE/{p;/END CERTIFICATE/q}' ./pki/issued/client.crt >>/tmp/client.ovpn
    echo '</cert>' >>/tmp/client.ovpn
    #append client key
    echo '<key>' >>/tmp/client.ovpn
    cat ./pki/private/client.key >>/tmp/client.ovpn
    echo '</key>' >>/tmp/client.ovpn
    #append TLS auth key
    echo '<tls-auth>' >>/tmp/client.ovpn
    cat /etc/openvpn/myvpn.tlsauth >>/tmp/client.ovpn
    echo '</tls-auth>' >>/tmp/client.ovpn

    In the above configuration file, make sure to update the remote directive (YOUR-ELASTIC-IP-ADDRESS). This should be your EC2 Elastic IP address (or domain, if mapped and configured).

    7. Finally, download the /tmp/client.ovpn file to your local machine. Install the OpenVPN client software, import the client.ovpn file, and connect. If you are using a Linux machine, you can connect using sudo openvpn --config /path/to/client.ovpn.

    Testing:

    Let us say you have configured the IAM role with a permission that lets you list S3 buckets. Once the OpenVPN client is connected, you should be able to access AWS resources: your SDK automatically looks up credentials via the metadata link-local address. You may install the aws-cli utility and run aws s3 ls to list S3 buckets.
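
    If you would rather test from code than from the aws cli, a minimal boto3 sketch (assuming the role policy allows s3:ListAllMyBuckets) looks like this:

    import boto3

    # No keys are configured anywhere on the laptop. With the VPN connected, boto3
    # walks its normal credential chain and ends up at the (tunnelled) metadata
    # endpoint, picking up the temporary IAM role credentials.
    s3 = boto3.client("s3")
    print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])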

    Conclusion:

    IAM roles are meant to be used with AWS resources like EC2, ECS, Lambda, etc., so that you don’t keep credentials hardcoded in code or left unsecured in a configuration file on disk. Our goal was to use an IAM role directly from a local machine (laptop), and we achieved this by using an OpenVPN secure SSL tunnel. The VPN ensures that we are in a private network, thus keeping the environment compliant. This guide is not a complete reference on setting up an OpenVPN server/client, so you must harden the OpenVPN server yourself. You may put the server behind a network load balancer and enforce MAC binding for your clients.

  • Container Security: Let’s Secure Your Enterprise Container Infrastructure!

    Introduction

    Containerized applications are becoming more popular with each passing year. A reason for this rise in popularity could be the pivotal role they play in Continuous Delivery by enabling fast and automated deployment of software services.

    Security still remains a major concern, mainly because of the way container images are used. In the world of VMs, the infra/security team used to validate OS images and installed packages for vulnerabilities. With the adoption of containers, developers build their own container images. Images are rarely built from scratch; they are typically built on a base image, which is itself built on top of other base images. When developers build a container image, they typically grab a base image and other layers from public third-party sources. These images and libraries may contain obsolete or vulnerable packages, thereby putting your infrastructure at risk. An added complexity is that many existing vulnerability-scanning tools may not work with containers, nor do they support container delivery workflows including registries and CI/CD pipelines. In addition, you can’t simply scan for vulnerabilities: you must scan, manage vulnerability fixes, and enforce vulnerability-based policies.

    The Container Security Problem

    The table below shows the number of vulnerabilities found in images available on Docker Hub. Note that (as of April 2016) the worst offending community images contained almost 1,800 vulnerabilities! Official images were much better, but still contained 392 vulnerabilities in the worst case.

    If we look at the distribution of vulnerability severities, we see that pretty much all of them are high severity, for both official and community images. What we’re not told is the underlying distribution of vulnerability severities in the CVE database, so this could simply be a reflection of that distribution.

    Over 80% of the latest versions of official images contained at least one high severity vulnerability!

    • There are so many docker images readily available on dockerhub – are you sure the ones you are using are safe?
    • Do you know where your containers come from?
    • Are your developers downloading container images and libraries from unknown and potentially harmful sources?
    • Do the containers use third party library code that is obsolete or vulnerable?

    In this blog post, I will explain some of the solutions that can help with these challenges: Docker Security Scanning, Twistlock Trust, and Clair, an open-source solution from CoreOS, all of which can help in scanning and fixing vulnerabilities to make your container images secure.

    Clair

    Clair is an open source project for the static analysis of vulnerabilities in application containers. It works as an API that analyzes every container layer to find known vulnerabilities in packages installed by package managers such as Debian (dpkg), Ubuntu (dpkg), and CentOS (rpm). It can also be used from the command line. It provides a list of vulnerabilities that threaten a container, and can notify users when new vulnerabilities that affect existing containers become known. At regular intervals, Clair ingests vulnerability metadata from a configured set of sources and stores it in its database. Clients use the Clair API to index their container images; this parses the list of installed source packages and stores it in the database. Clients then use the Clair API to query the database, and the data is correlated in real time rather than served from a cached result that would need re-scanning.

    Clair identifies security issues that developers introduce in their container images. The vanilla process for using Clair is as follows:

    1. A developer programmatically submits their container image to Clair
    2. Clair analyzes the image, looking for security vulnerabilities
    3. Clair returns a detailed report of security vulnerabilities present in the image
    4. Developer acts based on the report

    How to use Clair

    Docker is required to follow along with this demonstration. Once Docker is installed, use the Dockerfile below to create an Ubuntu image that contains a version of OpenSSL that is susceptible to the Heartbleed attack.

    #Dockerfile
    FROM ubuntu:precise-20160303
    #Install WGet
    RUN apt-get update
    RUN apt-get -f install
    RUN apt-get install -y wget
    #Install an OpenSSL vulnerable to Heartbleed (CVE-2014-0160)
    RUN wget --no-check-certificate https://launchpad.net/~ubuntu-security/+archive/ubuntu/ppa/+build/5436462/+files/openssl_1.0.1-4ubuntu5.11_amd64.deb
    RUN dpkg -i openssl_1.0.1-4ubuntu5.11_amd64.deb

    Build the image using the command below:

    $ docker build . -t madhurnawandar/heartbeat

    After creating the insecure Docker image, the next step is to download and install Clair from here. The installation choice used for this demonstration was the Docker Compose solution. Once Clair is installed, it can be used via querying its API or through the clairctl command line tool. Submit the insecure Docker image created above to Clair for analysis and it will catch the Heartbleed vulnerability.

    $ clairctl analyze --local madhurnawandar/heartbeat
    Image: /madhurnawandar/heartbeat:latest
    9 layers found 
    ➜ Analysis [f3ce93f27451] found 0 vulnerabilities. 
    ➜ Analysis [738d67d10278] found 0 vulnerabilities. 
    ➜ Analysis [14dfb8014dea] found 0 vulnerabilities. 
    ➜ Analysis [2ef560f052c7] found 0 vulnerabilities. 
    ➜ Analysis [69a7b8948d35] found 0 vulnerabilities. 
    ➜ Analysis [a246ec1b6259] found 0 vulnerabilities. 
    ➜ Analysis [fc298ae7d587] found 0 vulnerabilities. 
    ➜ Analysis [7ebd44baf4ff] found 0 vulnerabilities. 
    ➜ Analysis [c7aacca5143d] found 52 vulnerabilities.
    $ clairctl report --local --format json madhurnawandar/heartbeat
    JSON report at reports/json/analysis-madhurnawandar-heartbeat-latest.json

    You can view the detailed report here.

    Docker Security Scanning

    Docker Cloud and Docker Hub can scan images in private repositories to verify that they are free from known security vulnerabilities or exposures, and report the results of the scan for each image tag. Docker Security Scanning is available as an add-on to Docker hosted private repositories on both Docker Cloud and Docker Hub.

    Security scanning is enabled on a per-repository basis and is only available for private repositories. Scans run each time a build pushes a new image to your private repository. They also run when you add a new image or tag. The scan traverses each layer of the image, identifies the software components in each layer, and indexes the SHA of each component.

    The scan compares the SHA of each component against the Common Vulnerabilities and Exposures (CVE®) database. The CVE is a “dictionary” of known information security vulnerabilities. When the CVE database is updated, the service reviews the indexed components for any that match the new vulnerability. If the new vulnerability is detected in an image, the service sends an email alert to the maintainers of the image.

    A single component can contain multiple vulnerabilities or exposures and Docker Security Scanning reports on each one. You can click an individual vulnerability report from the scan results and navigate to the specific CVE report data to learn more about it.

    Twistlock

    Twistlock is a rule-based access control policy system for Docker and Kubernetes containers. Twistlock can be fully integrated with Docker, with out-of-the-box security policies that are ready to use.

    Security policies can set the conditions for users to, say, create new containers but not delete them; or, they can launch containers but aren’t allowed to push code to them. Twistlock features the same policy management rules as those on Kubernetes, wherein a user can modify management policies but cannot delete them.

    Twistlock also handles image scanning. Users can scan an entire container image, including any packaged Docker application. Twistlock has done its due diligence in this area, collaborating with Red Hat and Mirantis to ensure no container is left vulnerable while a scan is running.

    Twistlock also deals with image scanning of containers within the registries themselves. In runtime environments, Twistlock runs a Docker proxy on the same server as an application’s other containers. This is essentially traffic filtering: an application container calling the Docker daemon is re-routed through Twistlock. This approach enforces access control and allows for a safer configuration where no containers run as root. It is also able, for example, to SSH into an instance. To enforce these layers of security, Twistlock applies the policy at runtime.

    When new code is committed to an image, the Twistlock API receives an event and the new image is deposited into the registry along with its unique IDs. It is then pulled by Twistlock and scanned to ensure it complies with the security policies in place. Twistlock feeds the scan result back into the CI process so that developers can view the result for debugging purposes.

    Integrating these vulnerability scanning tools into your CI/CD Pipeline:

    These tools become more interesting when paired with a CI server like Jenkins, TravisCI, etc. Given proper configuration, the process becomes (a minimal sketch of the scan-and-gate steps follows the list):

    1. A developer submits application code to source control
    2. Source control triggers a Jenkins build
    3. Jenkins builds the software containers necessary for the application
    4. Jenkins submits the container images to vulnerability scanning tool
    5. Tool identifies security vulnerabilities in the container
    6. Jenkins receives the security report, identifies a high vulnerability in the report, and stops the build
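
    As a rough illustration of steps 4–6 with Clair, the sketch below shells out to the clairctl command used earlier and fails the build when any vulnerabilities are reported. The image name and the “found N vulnerabilities” output format are taken from the sample output above; treat the parsing and the threshold as illustrative, not as a finished gate.

    import re
    import subprocess
    import sys

    IMAGE = "madhurnawandar/heartbeat"  # the image built earlier in this post

    # Step 4: submit the freshly built image to Clair via clairctl.
    result = subprocess.run(
        ["clairctl", "analyze", "--local", IMAGE],
        capture_output=True, text=True, check=True,
    )
    print(result.stdout)

    # Steps 5 and 6: count the vulnerabilities reported per layer and stop the
    # build if any were found.
    total = sum(int(n) for n in re.findall(r"found (\d+) vulnerabilities", result.stdout))
    if total > 0:
        print(f"Failing the build: {total} vulnerabilities found in {IMAGE}")
        sys.exit(1)
    print("No vulnerabilities found, continuing the pipeline")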

    Conclusion

    There are many solutions, like Docker Security Scanning, Twistlock Trust, Clair, etc., to secure your containers. It’s critical for organizations to adopt such tools in their CI/CD pipelines, but this by itself is not going to make containers secure. There are a lot of guidelines available in the CIS Benchmark for containers, such as tuning kernel parameters, setting proper network configurations for inter-container connectivity, securing access to host-level directories, and others. I will cover these items in the next set of blogs. Stay tuned!

  • Exploring Upgrade Strategies for Stateful Sets in Kubernetes

    Introduction

    In the age of continuous delivery and agility, where software is deployed tens of times per day (and sometimes per hour) using container orchestration platforms, a seamless upgrade mechanism becomes a critical aspect of any technology adoption, and Kubernetes is no exception.

    Kubernetes provides a variety of controllers that define how pods are set up and deployed within the Kubernetes cluster. These controllers can group pods together according to their runtime needs and can be used to define pod replication and pod startup ordering. A Kubernetes controller is essentially an implementation of an application pattern: the controller manages the pods (the smallest unit in Kubernetes), so you don’t need to create, manage, and delete pods yourself. There are a few types of controllers in Kubernetes:

    1. Deployment
    2. StatefulSet
    3. DaemonSet
    4. Job
    5. ReplicaSet

    Each controller represents an application pattern. For example, a Deployment represents the stateless application pattern, in which you don’t store the state of your application. A StatefulSet represents the stateful application pattern, where you store data, for example in databases or message queues. We will be focusing on the StatefulSet controller and its update feature in this blog.

    Statefulset

    The StatefulSet acts as a controller in Kubernetes to deploy applications according to a specified rule set and is aimed at persistent, stateful applications. Deployment and scaling are ordered and graceful. A StatefulSet is generally used with distributed applications that require each node to have a persistent state and the ability to configure an arbitrary number of nodes. StatefulSet pods have a unique identity that is comprised of an ordinal, a stable network identity, and stable storage. The identity sticks to the pod, regardless of which node it’s scheduled on. For more details check here.

    Update Strategies for StatefulSets

    There are a couple of different strategies available for upgrades – Blue/Green and Rolling updates. Let’s review them in detail:

    Blue-Green Deployment

    Blue-green deployment is one of the commonly used update strategies. In this strategy there are two identical environments of your application: the Blue environment, which is running the current deployment, and the Green environment, which is the new deployment we want to upgrade to. The approach is simple:

    1. Switch the load balancer to route traffic to the Green environment (running the new version).
    2. Delete the Blue environment once the Green environment is verified.

    Disadvantages of Blue-Green deployment:

    1. One of the disadvantages of this strategy is that all current transactions and sessions are lost, due to the physical switch from one machine serving the traffic to another.
    2. Implementing blue-green deployment becomes complex when a database is involved, especially if the database schema changes across versions.
    3. Blue-green deployment needs extra cloud setup/hardware, which increases the overall cost.

    Rolling update strategy

    After Blue-Green deployment, let’s take a look at Rolling updates and how it works.

    1. In short, as the name suggests, this strategy replaces currently running instances of the application with new instances, one by one.
    2. In this strategy, health checks play an important role: old instances of the application are removed only if the new instances are healthy. Because of this, the deployment is temporarily heterogeneous while moving from the old version of the application to the new one.
    3. The benefit of this strategy is its incremental approach: the update is rolled out and verified in parallel while traffic gradually shifts to the new version of the application.
    4. A rolling update doesn’t need extra hardware/cloud setup, and hence it’s a cost-effective upgrade technique.

    Statefulset upgrade strategies

    With a basic understanding of upgrade strategies, let’s explore the update strategies available for StatefulSets in Kubernetes. StatefulSets are used for databases and similar applications where the state is a crucial part of the deployment. We will take Cassandra as the example to learn about the StatefulSet upgrade feature, and we will use gce-pd storage to store the data. StatefulSets (since Kubernetes 1.7) use an update strategy to configure and disable automated rolling updates for containers, labels, resource requests/limits, and annotations of their pods. The update strategy is configured using the updateStrategy field.

    The updateStrategy field accepts one of the following values:

    1. OnDelete
    2. RollingUpdate

    OnDelete update strategy

    OnDelete prevents the controller from automatically updating its pods: you need to delete a pod manually for the changes to take effect. It is essentially a manual update process for the StatefulSet application, and this is the main difference between the OnDelete and RollingUpdate strategies. The OnDelete update strategy is important where the user needs to perform some action or verification after each pod is updated. For example, after updating a single pod of Cassandra, the user might need to check that the updated pod joined the Cassandra cluster correctly.

    We will now create a StatefulSet deployment. Let’s take the simple example of Cassandra and deploy it using a StatefulSet controller. Persistent storage is the key point of the StatefulSet controller; you can read more about storage classes here.

    For the purpose of this blog, we will use the Google Kubernetes Engine.

    • First, define the storage class as follows:
    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: fast
    provisioner: kubernetes.io/gce-pd
    parameters:
      type: pd-ssd

    • Then create the Storage class using kubectl:
    $ kubectl create -f storage_class.yaml

    • Here is the YAML file for the Cassandra service and the Statefulset deployment.
    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: cassandra
      name: cassandra
    spec:
      clusterIP: None
      ports:
      - port: 9042
      selector:
        app: cassandra
    ---
    apiVersion: apps/v1beta2
    kind: StatefulSet
    metadata:
      name: cassandra
      labels:
        app: cassandra
    spec:
      serviceName: cassandra
      replicas: 3
      updateStrategy:
        type: OnDelete
      selector:
        matchLabels:
          app: cassandra
      template:
        metadata:
          labels:
            app: cassandra
        spec:
          terminationGracePeriodSeconds: 1800
          containers:
          - name: cassandra
            image: gcr.io/google-samples/cassandra:v12
            imagePullPolicy: Always
            ports:
            - containerPort: 7000
              name: intra-node
            - containerPort: 7001
              name: tls-intra-node
            - containerPort: 7199
              name: jmx
            - containerPort: 9042
              name: cql
            resources:
              limits:
                cpu: "500m"
                memory: 1Gi
              requests:
               cpu: "500m"
               memory: 1Gi
            securityContext:
              capabilities:
                add:
                  - IPC_LOCK
            lifecycle:
              preStop:
                exec:
                  command: 
                  - /bin/sh
                  - -c
                  - nodetool drain
            env:
              - name: MAX_HEAP_SIZE
                value: 512M
              - name: HEAP_NEWSIZE
                value: 100M
              - name: CASSANDRA_SEEDS
                value: "cassandra-0.cassandra.default.svc.cluster.local"
              - name: CASSANDRA_CLUSTER_NAME
                value: "K8Demo"
              - name: CASSANDRA_DC
                value: "DC1-K8Demo"
              - name: CASSANDRA_RACK
                value: "Rack1-K8Demo"
              - name: POD_IP
                valueFrom:
                  fieldRef:
                    fieldPath: status.podIP
            readinessProbe:
              exec:
                command:
                - /bin/bash
                - -c
                - /ready-probe.sh
              initialDelaySeconds: 15
              timeoutSeconds: 5
            volumeMounts:
            - name: cassandra-data
              mountPath: /cassandra_data
      volumeClaimTemplates:
      - metadata:
          name: cassandra-data
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "fast"
          resources:
            requests:
              storage: 5Gi

    • Let’s create the Statefulset now.
    $ kubectl create -f cassandra.yaml

    • After creating the Cassandra StatefulSet, if you check the running pods you will find something like:
    $ kubectl get pods
    NAME        READY  STATUS   RESTARTS  AGE
    cassandra-0 1/1 Running 0 2m
    cassandra-1 1/1 Running 0 2m
    cassandra-2 1/1 Running 0 2m

    • Check that the Cassandra cluster formed correctly using the following command:
    $ kubectl exec -it cassandra-0 -- nodetool status
    Datacenter: DC1-K8Demo
    ======================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    --  Address          Load        Tokens  Owns   Host ID                               Rack
    UN  192.168.4.193    101.15 KiB  32      72.0%  abd9f52d-85ef-44ee-863c-e1b174cd9412  Rack1-K8Demo
    UN  192.168.199.67   187.81 KiB  32      72.8%  c40e89e4-44fe-4fc2-9e8a-863b6a74c90c  Rack1-K8Demo
    UN  192.168.187.196  131.42 KiB  32      55.2%  c235505c-eec5-43bc-a4d9-350858814fe5  Rack1-K8Demo

    • Let’s describe the running pod before updating. Look for the Image field in the output of the following command:
    $ kubectl describe pod cassandra-0

    • The Image field will show gcr.io/google-samples/cassandra:v12. Now, let’s patch the Cassandra StatefulSet with the new image we want to update to. The new image might contain a new Cassandra version or database schema changes. Before upgrading such crucial components, it’s always safer to have a backup of the data.
    $ kubectl patch statefulset cassandra --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/0/image", "value":"gcr.io/google-samples/cassandra:v13"}]'

    You will see the output `statefulset.apps “cassandra” patched`, but the controller won’t update the running pods automatically with this strategy. You need to delete the pods and wait until pods with the new configuration come up. Let’s try deleting the cassandra-0 pod.

    $ kubectl delete pod cassandra-0

    • Wait till cassandra-0 comes back up in the Running state and then check whether cassandra-0 is running with the intended/updated image, i.e. gcr.io/google-samples/cassandra:v13. Now cassandra-0 is running the new image while cassandra-1 and cassandra-2 are still running the old image; you need to delete those pods as well for the new image to take effect with this strategy. If you don’t want to repeat the cycle by hand for every pod, see the sketch below.
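
    A small helper along the following lines could drive the delete-and-verify loop. It simply shells out to kubectl (which must already point at the right cluster) and assumes the three replicas are named cassandra-0, cassandra-1, and cassandra-2; treat it as a sketch, not a hardened tool.

    import subprocess
    import time

    REPLICAS = 3  # cassandra-0, cassandra-1, cassandra-2

    for ordinal in reversed(range(REPLICAS)):
        pod = f"cassandra-{ordinal}"
        # Delete the pod; the StatefulSet controller recreates it with the patched spec.
        subprocess.run(["kubectl", "delete", "pod", pod], check=True)
        time.sleep(10)  # give the controller a moment to recreate the pod
        subprocess.run(
            ["kubectl", "wait", "--for=condition=Ready", f"pod/{pod}", "--timeout=600s"],
            check=True,
        )
        # Manual-style verification: make sure the node rejoined the Cassandra ring.
        subprocess.run(["kubectl", "exec", pod, "--", "nodetool", "status"], check=True)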

    Rolling update strategy

    Rolling update is an automated update process. In this strategy, the controller deletes and then recreates each of its pods, one at a time. While updating, the controller makes sure that an updated pod is running and in the ready state before updating its predecessor. The pods in the StatefulSet are updated in reverse ordinal order (the same as the pod termination order, i.e. from the largest ordinal to the smallest).

    For the rolling update strategy, we will create the Cassandra StatefulSet with the .spec.updateStrategy field set to RollingUpdate:

    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: cassandra
      name: cassandra
    spec:
      clusterIP: None
      ports:
      - port: 9042
      selector:
        app: cassandra
    ---
    apiVersion: apps/v1beta2
    kind: StatefulSet
    metadata:
      name: cassandra
      labels:
        app: cassandra
    spec:
      serviceName: cassandra
      replicas: 3
      updateStrategy:
        type: RollingUpdate
      selector:
        matchLabels:
          app: cassandra
      template:
        metadata:
          labels:
            app: cassandra
        spec:
          terminationGracePeriodSeconds: 1800
          containers:
          - name: cassandra
            image: gcr.io/google-samples/cassandra:v12
            imagePullPolicy: Always
            ports:
            - containerPort: 7000
              name: intra-node
            - containerPort: 7001
              name: tls-intra-node
            - containerPort: 7199
              name: jmx
            - containerPort: 9042
              name: cql
            resources:
              limits:
                cpu: "500m"
                memory: 1Gi
              requests:
               cpu: "500m"
               memory: 1Gi
            securityContext:
              capabilities:
                add:
                  - IPC_LOCK
            lifecycle:
              preStop:
                exec:
                  command: 
                  - /bin/sh
                  - -c
                  - nodetool drain
            env:
              - name: MAX_HEAP_SIZE
                value: 512M
              - name: HEAP_NEWSIZE
                value: 100M
              - name: CASSANDRA_SEEDS
                value: "cassandra-0.cassandra.default.svc.cluster.local"
              - name: CASSANDRA_CLUSTER_NAME
                value: "K8Demo"
              - name: CASSANDRA_DC
                value: "DC1-K8Demo"
              - name: CASSANDRA_RACK
                value: "Rack1-K8Demo"
              - name: POD_IP
                valueFrom:
                  fieldRef:
                    fieldPath: status.podIP
            readinessProbe:
              exec:
                command:
                - /bin/bash
                - -c
                - /ready-probe.sh
              initialDelaySeconds: 15
              timeoutSeconds: 5
            volumeMounts:
            - name: cassandra-data
              mountPath: /cassandra_data
      volumeClaimTemplates:
      - metadata:
          name: cassandra-data
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "fast"
          resources:
            requests:
              storage: 5Gi

    • To try the rolling update feature, we can patch the existing statefulset with the updated image.
    $ kubectl patch statefulset cassandra --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/0/image", "value":"gcr.io/google-samples/cassandra:v13"}]'

    • Once you execute the above command, monitor the output of the following command,
    $ kubectl get pods -w

    In case of a failure in the update process, the controller restores any pod that fails during the update to its current version, i.e. pods that have already received the update will be restored to the updated version, and pods that have not yet received the update will be restored to the previous version.

    Partitioning a RollingUpdate (Staging an Update)

    The updateStrategy contains one more field for partitioning a RollingUpdate. If a partition is specified, all pods with an ordinal greater than or equal to the provided partition will be updated, and pods with an ordinal less than the partition will not be updated. If pods with an ordinal less than the partition are deleted, they will be recreated with the old definition/version. This partitioned rolling update feature plays an important role in scenarios where you want to stage an update, roll out a canary, or perform a phased rollout.

    RollingUpdate supports a partitioning option; you can define the partition parameter in .spec.updateStrategy:

    $ kubectl patch statefulset cassandra -p '{"spec":{"updateStrategy":{"type":"RollingUpdate","rollingUpdate":{"partition":2}}}}'

    In the above command, we set the partition value to 2, which patches the Cassandra StatefulSet in such a way that whenever we update it, only the cassandra-2 pod is updated. Let’s try patching an updated image onto the existing StatefulSet.

    $ kubectl patch statefulset cassandra --type='json' -p='[{"op": "replace", "path": "/spec/template/spec/containers/0/image", "value":"gcr.io/google-samples/cassandra:v14"}]'

    After patching, watch the output of the following command:

    $ kubectl get pods -w

    You can keep decrementing the partition value, and the pods with ordinals greater than or equal to the partition will receive the applied patch. For example, if you patch the StatefulSet with partition=0, then all the pods of the Cassandra StatefulSet will get updated with the provided upgrade configuration.

    Verifying if the upgrade was successful

    Verifying the upgrade of your application is the important final step of the upgrade. This step differs from application to application. In this blog we have taken the Cassandra example, so we will verify that the cluster of Cassandra nodes forms properly.

    Use the `nodetool status` command to verify the cluster. After upgrading all the pods, you might also want to run some post-processing, such as migrating the schema if your upgrade dictates it.

    Depending on the upgrade strategy, verification of your application can be done in the following ways:

    1. With the OnDelete update strategy, you can update pods one by one and keep checking the application status to make sure the upgrade is working fine.
    2. With the RollingUpdate strategy, you can check the application status once all the running pods of your application have been upgraded.

    For a Cassandra-like application, the OnDelete update strategy is preferable to RollingUpdate. With a rolling update, we saw that the Cassandra pods get updated one by one, from the highest to the lowest ordinal index. It might happen that after updating two pods the Cassandra cluster goes into a failed state, but you cannot pause and recover it as you can with the OnDelete strategy; you have to wait until the complete upgrade is done, i.e. until all pods are upgraded to the provided image, before recovering Cassandra. If you do have to use a rolling update, try partitioning it.

    Conclusion

    In this blog, we went through Kubernetes controllers, focusing mainly on StatefulSets. We learnt about the differences between the blue-green deployment and rolling update strategies, then played with the Cassandra StatefulSet example and successfully upgraded it with the OnDelete and RollingUpdate strategies. Do let us know if you have any questions, queries, or additional thoughts in the comments section below.

  • A Quick Introduction to Data Analysis With Pandas

    Python is a great language for doing data analysis, primarily because of the fantastic ecosystem of data-centric Python packages. Pandas is one of those packages and makes importing and analyzing data much easier.

    Pandas aims to integrate the functionality of NumPy and matplotlib to give you a convenient tool for data analytics and visualization. Besides the integration, it also makes these tools far easier to use.

    In this blog, I’ll give you a list of useful pandas snippets that can be reused over and over again. These will definitely save you some time that you may otherwise need to skim through the comprehensive Pandas docs.

    Pandas provides two data structures, both capable of holding elements of any type: Series and DataFrame.

    Series

    A one-dimensional object that can hold any data type, such as integers, floats, and strings.

    A Series object can be created from different kinds of values; you can think of a Series as similar to a Python list.

    In the example below, NaN is NumPy’s nan symbol, a numerical placeholder indicating that an element is not a number. The dtype of the series is object because the series has mixed contents of strings and numbers.

    >>> import pandas as pd
    >>> import numpy as np
    >>> series = pd.Series([12,32,54,2, np.nan, "a string", 6])
    >>> series
    0          12
    1          32
    2          54
    3           2
    4         NaN
    5    a string
    6           6
    dtype: object

    Now if we use only numerical values, we get the basic NumPy dtype, float64, for our series.

    >>> series = pd.Series([1,2,np.nan, 4])
    >>> series
    0    1.0
    1    2.0
    2    NaN
    3    4.0
    dtype: float64

    DataFrame

    A two-dimensional labeled data structure where columns can be of different types.

    Each column in a Pandas DataFrame represents a Series object in memory.

    Converting a Python object (a dictionary, lists, etc.) to a DataFrame is extremely easy. For a Python dictionary, the keys map to column names while the values correspond to lists of column values.

    >>> d = {
        "stats": pd.Series(np.arange(10,15,1.0)),
        "year": pd.Series(["2012","2007","2012","2003"]),
        "intake": pd.Series(["SUMMER","WINTER","WINTER","SUMMER"]),
    }
    >>> df = pd.DataFrame(d)
    >>> df

    Reading CSV files

    Pandas can work with various file types; when reading any file, the only pattern you need to remember is:

    pd.read_filetype()

    You only have to replace “filetype” with the actual type of the file, like csv or excel, and give the path of the file inside the parentheses as the first argument. You can also pass in different arguments that relate to opening the file. (Reading a csv file? See this)

    >>> df = pd.read_csv('companies.csv')
    >>> df.head()

    Accessing Columns and Rows

    A DataFrame comprises three sub-components: the index, the columns, and the data (also known as the values).

    The index represents a sequence of values. In the DataFrame, it is always on the left side. Values in an index are in bold font. Each individual value of the index is called a label. Indexes are like positions, while labels are the values at those positions. Sometimes the index is also referred to as the row labels. In all the examples below, the labels and indexes are the same and are just integers beginning from 0 up to n-1, where n is the number of rows in the table.

    Selecting rows is done using loc and iloc:

    • loc gets rows (or columns) with particular labels from the index. Raises KeyError when the items are not found.
    • iloc gets rows (or columns) at particular positions/index (so it only takes integers). Raises IndexError if a requested indexer is out-of-bounds.
    >>> df.loc[:5]              #similar to df.head()

    Accessing the data using column names

    Pandas takes an extra step and allows us to access data through labels in DataFrames.

    >>> df.loc[:5, ["name","vertical", "url"]]

    In Pandas, selecting data is very easy and similar to accessing an element from a dictionary or a list.

    You can select a column with df[col_name], and it returns the column with label col_name as a Series (because rows and columns are stored as Series in a DataFrame). If you need to access more columns, use df[[col_name_1, col_name_2]], which returns the columns as a new DataFrame.
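
    For a quick check with the companies DataFrame loaded above (the column names are taken from the snippets in this post):

    >>> type(df["name"])                       # a single column comes back as a Series
    <class 'pandas.core.series.Series'>
    >>> type(df[["name", "vertical", "url"]])  # a list of columns comes back as a DataFrame
    <class 'pandas.core.frame.DataFrame'>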

    Filtering DataFrames with Conditional Logic

    Let’s say we want all the companies with the vertical as B2B, the logic would be:

    >>> df[(df['vertical'] == 'B2B')]

    If we want the companies for the year 2009, we would use:

    >>> df[(df['year'] == 2009)]

    Need to combine them both? Here’s how you would do it:

    >>> df[(df['vertical'] == 'B2B') & (df['year'] == 2009)]

    Get all companies with vertical as B2B for the year 2009

    Sort and Groupby

    Sorting

    Sort values by a certain column in ascending order by using:

    >>> df.sort_values(colname)

    >>> df.sort_values(colname,ascending=False)

    Furthermore, it’s also possible to sort values by multiple columns with different orders; here colname_1 is sorted in ascending order and colname_2 in descending order:

    >>> df.sort_values([colname_1,colname_2],ascending=[True,False])

    Grouping

    This operation involves three steps: splitting the data, applying a function to each group, and finally combining the results into a data structure. It can be used to group large amounts of data and compute operations on those groups.

    df.groupby(colname) returns a groupby object for values from one column while df.groupby([col1,col2]) returns a groupby object for values from multiple columns.
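
    For example, with the companies DataFrame used earlier (column names assumed from the snippets above), you could count companies per year, or per year and vertical:

    >>> df.groupby("year")["name"].count()        # number of companies per year
    >>> df.groupby(["year", "vertical"]).size()   # number of companies per year and vertical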

    Data Cleansing

    Data cleaning is a very important step in data analysis.

    Checking missing values in the data

    Check null values in the DataFrame by using:

    >>> df.isnull()

    This returns a boolean DataFrame (true for missing values and false for non-missing values); chaining .sum() gives the count of missing values per column.

    >>> df.isnull().sum()

    Check non-null values in the DataFrame using pd.notnull(). It returns a boolean array, the exact converse of df.isnull().

    Removing Empty Values

    Dropping empty values can be done easily by using:

    >>> df.dropna()

    This drops the rows that have empty values; use df.dropna(axis=1) to drop columns instead.

    Also, if you wish to fill the missing values with other values, use df.fillna(x). This fills all the missing values with the value x (you can put in any value you want), or use s.fillna(s.mean()), which replaces null values with the mean (mean can be replaced with any function from the arithmetic section).

    Operations on Complete Rows, Columns, or Even All Data

    >>> df["url_len"] = df["url"].map(len)

    The .map() operation applies a function to each element of a column.

    .apply() applies a function to each column; use .apply(axis=1) to apply it to each row instead.
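
    For example, using the same DataFrame (with the url_len column created just above, and numpy imported as np as in the earlier snippets):

    >>> df[["url_len"]].apply(np.max)                      # default axis=0: the function sees each column
    >>> df.apply(lambda row: row["name"].upper(), axis=1)  # axis=1: the function sees each row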

    Iterating over rows

    Very similar to iterating any of the python primitive types such as list, tuples, dictionaries.

    >>> for i, row in df.iterrows():
            print("Index {0}".format(i))
            print("Row {0}".format(row))

    .iterrows() yields two variables together, i.e. the index of the row and the row itself: in the code above, variable i is the index and variable row is the row.

    Tips & Tricks

    Using ufuncs (also known as universal functions). Pandas has .apply(), which applies a function to columns/rows, and ufuncs can be used similarly while preprocessing. What is the difference between ufuncs and .apply()?

    Ufuncs come from the NumPy library and are implemented in C, which makes them highly efficient (ufuncs are around 10 times faster).

    A list of common Ufuncs:

    isinf: Element-wise checks for positive or negative infinity.

    isnan: Element-wise checks for NaN and returns result as a boolean array.

    isnat: Element-wise checks for NaT (not time) and returns result as a boolean array.

    trunc: Return the truncated value of the input, element-wise.

    .dt commands: Element-wise processing for date objects.
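
    A tiny demonstration (with pandas and numpy imported as pd and np, as in the earlier snippets):

    >>> s = pd.Series([1.5, np.nan, -3.7])
    >>> np.isnan(s)        # element-wise NaN check, executed in C
    0    False
    1     True
    2    False
    dtype: bool
    >>> np.trunc(s)        # element-wise truncation
    0    1.0
    1    NaN
    2   -3.0
    dtype: float64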

    High-Performance Pandas

    Pandas performs various vectorized/broadcasted operations and grouping-type operations. These operations are efficient and effective.

    As of version 0.13, Pandas includes tools that allow us to directly access C-speed operations without the costly allocation of intermediate arrays: the two functions eval() and query().

    DataFrame.eval() for efficient operations:

    >>> import pandas as pd
    >>> import numpy as np
    >>> nrows, ncols = 100000, 100
    >>> rng = np.random.RandomState(42)
    >>> df1, df2, df3, df4 = (pd.DataFrame(rng.rand(nrows, ncols))
                              for i in range(4))

    To compute the sum of df1, df2, df3, and df4 DataFrames using the typical Pandas approach, we can just write the sum:

    >>> %timeit df1 + df2 + df3 + df4
    
    10 loops, best of 3: 103.1 ms per loop

    A better and optimized approach for the same operation can be computed via pd.eval():

    >>> %timeit pd.eval('df1 + df2 + df3 + df4')
    
    10 loops, best of 3: 53.6 ms per loop

    %timeit — Measure execution time of small code snippets.

    The eval() expression is about 50% faster (it also consumes much less memory).

    And it produces the same result:

    >>> np.allclose(df1 + df2 + df3 + df4, pd.eval('df1 + df2 + df3 + df4'))
    
    True

    np.allclose() is a numpy function which returns True if two arrays are element-wise equal within a tolerance.

    Column-Wise & Assignment Operations Using df.eval()

    A normal expression to take the first character of a column and assign it back to the same column would be:

    >>> df['batch'] = df['batch'].str[0]

    By using df.eval(), same expression can be performed much faster:

    >>> df.eval("batch=batch.str[0]")

    DataFrame.query() for efficient operations:

    Similar to performing filtering operations with conditional logic, to filter rows with vertical as B2B and year as 2009, we do it by using:

    >>> %timeit df[(df['vertical'] == 'B2B') & (df['year'] == 2009)]
    
    1.69 ms ± 57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    With .query() the same filtering can be performed about 50% faster.

    >>> %timeit df.query("vertical == 'B2B' and year == 2009")
    
    875 µs ± 24.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    When to use eval() and query()? 

    Two aspects: computation time and memory usage. 

    Memory usage: Every operation involving NumPy arrays or Pandas DataFrames results in the implicit creation of temporary arrays. If the memory usage of these temporary objects is significant, using eval() and query() is an appropriate choice to reduce memory usage.

    Computation time: The traditional way of performing NumPy/Pandas operations is faster for smaller arrays! The real benefit of eval()/query() comes mainly from the saved memory, and also from the cleaner syntax they offer.

    Conclusion

    Pandas is a powerful and fun library for data manipulation and analysis. It comes with easy syntax and fast operations. This blog highlighted the most commonly used Pandas operations and optimizations. The best way to master Pandas is to work with real datasets, beginning with Kaggle kernels, to learn how to use Pandas for data analysis. Check out more on real-time text classification using Kafka and scikit-learn and explanatory vs. predictive models in machine learning here.

  • MQTT Protocol Overview – Everything You Need To Know

    MQTT is an open protocol used for asynchronous message queuing that has been developed and matured over several years. It is a machine-to-machine protocol and is widely used with embedded devices; Microsoft, for example, offers its own MQTT tooling with broad support. Here, we are going to give an overview of the MQTT protocol and its details.

    MQTT Protocol:

    MQTT is a very simple publish/subscribe protocol. It allows you to send messages on topics (channels), routed through a centralized message broker.

    The MQTT module of the API takes care of the publish/subscribe mechanism along with additional features like authentication, retaining messages, and sending duplicate messages to unreachable clients.

    There are three parts of MQTT architecture –

    • MQTT Broker – All messages passed between clients and the server are sent via the broker.
    • MQTT Server – The API acts as an MQTT server. The MQTT server is responsible for publishing the data to the clients.
    • MQTT Client – Any third-party client who wishes to subscribe to data published by the API is considered an MQTT client.

    Both the MQTT client and the MQTT server need to connect to the broker in order to publish or subscribe to messages.
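
    To make these roles concrete, here is a minimal Python sketch using the paho-mqtt client library. The library, broker address, credentials, and topic are all assumptions for illustration (they are not part of the API described in this post), and the callbacks use the paho-mqtt 1.x signatures.

    import paho.mqtt.client as mqtt

    BROKER = "broker.example.com"  # placeholder for your EMQTT broker
    TOPIC = "project-1/building-a/floor-2/room-5/temperature"  # placeholder topic

    def on_connect(client, userdata, flags, rc):
        print("Connected with result code", rc)
        client.subscribe(TOPIC, qos=1)                # act as a subscriber...
        client.publish(TOPIC, payload="23.5", qos=1)  # ...and publish one sample reading

    def on_message(client, userdata, msg):
        print(msg.topic, msg.payload.decode())

    client = mqtt.Client()
    client.username_pw_set("sensor-user", "sensor-password")  # placeholder credentials
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER, 1883)
    client.loop_forever()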

    MQTT Communication Program

    To get a better idea of MQTT, suppose our API is sending sensor data.
    The API gathers the sensor data through the Monitoring module, and the MQTT module publishes the data on different channels. On successful connection of an external client to the MQTT module of the API, the client receives sensor data on the subscribed channel.

    The diagram below shows the flow of data from the API module to the external clients.

    MQTT Broker – EMQTT:

    EMQTT (Erlang MQTT Broker) is a massively scalable and clusterable MQTT V3.1/V3.1.1 broker, written in Erlang/OTP.

    Main responsibilities of a Broker are-

    • Receive all messages
    • Filter messages
    • Determine which clients are interested
    • Publish messages to all the subscribed clients

    All messages published are passed through the broker. The broker generates the Client ID and Message ID, maintains the message queue, and publishes the message.

    There are several brokers that can be used; the default here is the EMQTT broker, developed in Erlang.

    MQTT Topics:

    A topic is a UTF-8 string. Using this string, the broker filters messages for all connected clients. A topic may consist of one or more topic levels, with a forward slash (the topic level separator) separating each level.

     

    When the API starts, the Monitoring module monitors the sensor data and publishes it under a combination of topics. The third-party client can subscribe to any of those topics, based on its requirements.

    The topics are framed in such a way that it provides options for the user to subscribe at level 1, level 2, level 3, level 4, or individual sensor level data.

    While subscribing to each level of sensor data, the client needs to specify the hierarchy of the IDs. For example, to subscribe to level 4 sensor data, the client needs to specify level 1 id/level 2 id/level 3 id/level 4 id.

    The user can subscribe to any type of sensor by specifying the sensor role as the last part of the topic.

    If the user doesn’t specify the role, the client will be subscribed to all types of sensors on that particular level.

    The user can also specify the sensor id that they wish to subscribe to. In that case, they need to specify the whole hierarchy of the sensor, starting from project id and ending with sensor id.

    Following is the list of topics exposed by API on startup.

     

    Features supported by MQTT:

    1. Authentication:

    EMQTT provides authentication of every user who intends to publish or subscribe to particular data. The user id and password are stored in the API database, in a separate collection called 'mqtt'.

    While connecting to the EMQTT broker, we provide the username and password, and the MQTT broker validates the credentials against the values present in the database.
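
    As a hedged sketch with the Node.js mqtt package, the credentials are simply passed as connect options; the broker URL, username, and password below are placeholders.

    const mqtt = require('mqtt');

    // The broker (EMQTT) validates these credentials against the values stored
    // in the API database; the values shown here are placeholders.
    const client = mqtt.connect('mqtt://broker.example.com:1883', {
      username: 'api-user',
      password: 'api-password'
    });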

    2. Access Control:

    EMQTT determines which user is allowed to access which topics. This information is stored in MongoDB, in the 'mqtt_acl' collection.

    By default, all users are allowed to access all topics; this is done by specifying '#' as the allowed publish and subscribe topic for all users.

    3. QoS:

    The Quality of Service (QoS) level is an agreement between the sender and the receiver of a message that defines the delivery guarantee for that message. There are 3 QoS levels in MQTT:

    • At most once (0) – The message is delivered at most once, or it is not delivered at all.
    • At least once (1) – The message is always delivered at least once.
    • Exactly once (2) – The message is always delivered exactly once.
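
    As an illustration, again assuming the Node.js mqtt package, the QoS level can be chosen per subscription and per publish; the broker URL and topic are placeholders.

    const mqtt = require('mqtt');
    const client = mqtt.connect('mqtt://broker.example.com:1883');

    client.on('connect', () => {
      // 0, 1 and 2 correspond to the three QoS levels listed above.
      client.subscribe('sensors/temperature', { qos: 1 });
      client.publish('sensors/temperature', '22.5', { qos: 2 });
    });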

    4. Last Will Message:

    MQTT uses the Last Will & Testament (LWT) mechanism to notify other clients about the ungraceful disconnection of a client. In this mechanism, when connecting to the broker, each client can specify its last will message, which is a normal MQTT message with a QoS, topic, retained flag, and payload. This message is stored by the broker until it detects that the client has disconnected ungracefully, at which point the broker publishes it on the will topic.
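
    As a hedged sketch with the Node.js mqtt package, the last will message is supplied as part of the connect options; the topic and payload are placeholders.

    const mqtt = require('mqtt');

    // The "will" option registers the last will message with the broker. It is
    // published on the will topic only if this client disconnects ungracefully.
    const client = mqtt.connect('mqtt://broker.example.com:1883', {
      will: {
        topic: 'clients/sensor-gateway/status',
        payload: 'offline',
        qos: 1,
        retain: true
      }
    });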

    5. Retain Message:

    MQTT also has a message retention feature, enabled by setting the retain flag to TRUE on a published message. The broker then retains the last message and its QoS for that topic. When a client subscribes to a topic, the broker matches the topic against its retained messages, and the client immediately receives the retained message if there is a match. The broker stores only one retained message per topic.
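
    Continuing with the same assumed mqtt package, a retained message is published by setting the retain flag; the topic and payload are placeholders.

    const mqtt = require('mqtt');
    const client = mqtt.connect('mqtt://broker.example.com:1883');

    client.on('connect', () => {
      // The broker keeps this as the single retained message for the topic and
      // delivers it immediately to any client that subscribes later.
      client.publish('clients/sensor-gateway/status', 'online', { retain: true });
    });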

    6. Duplicate Message:

    If a publisher doesn’t receive the acknowledgement of a published packet, it will resend the packet with the DUP flag set to true. A duplicate message contains the same Message ID as the original message.

    7. Session:

    In general, when a client connects to a broker for the first time, it needs to create subscriptions for all topics on which it wants to receive data/messages from the broker. If no persistent session is maintained and the client loses its connection to the broker, the user has to resubscribe to all topics after reconnecting. For clients with limited resources, it would be very tedious to subscribe to all topics again. So brokers use a persistent session mechanism, in which the broker saves all information relevant to the client. The 'clientId' provided by the client when it establishes a connection with the broker is used as the session identifier.
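
    As a rough illustration with the Node.js mqtt package, a persistent session is requested by connecting with a fixed clientId and clean set to false; the values below are placeholders.

    const mqtt = require('mqtt');

    // clean: false asks the broker for a persistent session keyed by clientId,
    // so subscriptions (and queued QoS 1/2 messages) survive reconnects.
    const client = mqtt.connect('mqtt://broker.example.com:1883', {
      clientId: 'sensor-dashboard-01',
      clean: false
    });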

    Features not-supported by MQTT:

    1. Not RESTful:

    MQTT does not allow a client to expose RESTful API endpoints. The only way to communicate is through the publish/subscribe mechanism.

    2. Obtaining Subscription List:

    The MQTT broker doesn’t expose the client IDs and the topics subscribed to by the clients. Hence, the API needs to publish all data on all possible combinations of topics, which can lead to network congestion when the volume of data is large.

    MQTT Wildcards:

    MQTT clients can subscribe to one or more topics, but each subscription specifies a single topic filter. The following two wildcards let one topic filter match many topics, so a client can receive data/messages from all of them.

    1. Plus sign(+):

    This is a single-level wildcard, used to match exactly one topic level. We can use this wildcard when we want to match any value at a specific topic level.

    Example: Suppose we want to subscribe to all floor-level 'AL' (ambient light) sensors. We can use the plus (+) single-level wildcard instead of a specific zone level, with the following topic:

    <project_id>/<building_id>/<floor_id>/+/AL

    2. Hash Sign(#):

    This is a multi-level wildcard and can be used only at the end of a topic. The client receives all data/messages on topics that match everything to the left of the '#' wildcard.

    Example: In case we want to receive all the messages related to all sensors on floor 1, we can use the hash sign (#) multi-level wildcard after the floor name and the slash (/). We can use the following topic:

    <level1_id>/<level2_id>/<level3_id>/#
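
    For illustration, assuming the Node.js mqtt package once more, both wildcard subscriptions could look like this; the level ids below are placeholders following the hierarchy above.

    const mqtt = require('mqtt');
    const client = mqtt.connect('mqtt://broker.example.com:1883');

    client.on('connect', () => {
      // '+' matches exactly one level: all 'AL' sensors on this floor, any zone.
      client.subscribe('project1/building1/floor1/+/AL');
      // '#' matches all remaining levels: every sensor message under floor1.
      client.subscribe('project1/building1/floor1/#');
    });

    client.on('message', (topic, payload) => {
      console.log(`${topic}: ${payload.toString()}`);
    });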

    MQTT Test tools:

    Following are some popular open source testing tools for MQTT.

    1. MQTT Lens
    2. MQTT SPY
    3. MQTT FX

    Difference between MQTT & AMQP:

    MQTT is designed for lightweight devices like embedded systems, where bandwidth is costly and minimum overhead is required. MQTT exchanges data and control packets as a byte stream with an optimized 2-byte fixed header, which is preferred for IoT.

    AMQP is designed with more advanced features and uses more system resources. It provides more advanced features related to messaging, topic-based publish & subscribe messaging, reliable queuing, transactions, flexible routing and security.

    Difference between MQTT & HTTP:

    MQTT is data-centric, whereas HTTP is document-centric. HTTP is a request-response protocol for client-server communication; MQTT, on the other hand, uses a publish-subscribe mechanism. The publish/subscribe model lets clients exist independently of one another and enhances the reliability of the whole system: even if one of the clients drops off the network, the system keeps itself up and running.

    Compared to HTTP, MQTT is lightweight (a very short message header, with a minimum packet size of 2 bytes) and does not require composing lengthy headers and messages.

    MQTT Protocol ensures high delivery guarantees compared to HTTP.

    There are 3 levels of Quality of Services:

    at most once: the message is delivered on a best-effort basis; it may not arrive at all.

    at least once: the message is guaranteed to be delivered at least once, but it may also be delivered again.

    exactly once: the message is guaranteed to be delivered one and only one time.

    Last Will & Testament and retained messages are options MQTT provides to users. With Last Will & Testament, in case of an unexpected disconnection of a client, all subscribed clients get a message from the broker. Newly subscribed clients get immediate status updates via retained messages.

    HTTP Protocol has none of these abilities.

    Conclusion:

    MQTT is a one-of-its-kind message queuing protocol, best suited for embedded hardware devices. On the software level, it supports all major operating systems and platforms. It has proven itself as an ISO standard for IoT platforms because of its pragmatic security and message reliability.

  • Building A Scalable API Testing Framework With Jest And SuperTest

    Focus on API testing

    Before starting off, below listed are the reasons why API testing should be encouraged:

    • Identifies bugs before they reach the UI
    • Effective testing at a lower level over high-level broad-stack testing
    • Reduces future efforts to fix defects
    • Time-saving

    Well, QA practices are becoming more automation-centric with evolving requirements, but identifying the appropriate approach is the primary and the most essential step. This implies choosing a framework or a tool to develop a test setup which should be:

    • Scalable 
    • Modular
    • Maintainable
    • Able to provide maximum test coverage
    • Extensible
    • Able to generate test reports
    • Easy to integrate with source control tool and CI pipeline

    To attain this goal, why not develop your own asset rather than relying on ready-made tools like Postman or JMeter? Let’s have a look at why you should choose writing your own code over depending on the API testing tools available in the market:

    1. Customizable
    2. Saves you from the trap of limitations of a ready-made tool
    3. Freedom to add configurations and libraries as required and not really depend on the specific supported plugins of the tool
    4. No limit on the usage and no question of cost
    5. Take Postman, for example. If we go with Newman (the CLI of Postman), considerable effort is needed as requirements grow or change: adding a new test requires editing it in Postman, saving it in the collection, exporting the collection again, and running the entire collection.json through Newman. Isn’t it tedious to repeat the same process every time?

    We can overcome such annoyance and meet our purpose using a self-built Jest framework using SuperTest. Come on, let’s dive in!


    Why Jest?

    Jest is pretty impressive. 

    • High performance
    • Easy and minimal setup
    • Provides in-built assertion library and mocking support
    • Several in-built testing features without any additional configuration
    • Snapshot testing
    • Brilliant test coverage
    • Allows interactive watch mode (jest --watch or jest --watchAll)

    Hold on. Before moving forward, let’s quickly visit Jest configurations, Jest CLI commands, Jest Globals and Javascript async/await for better understanding of the coming content.

    Ready, set, go!

    Create a node project jest-supertest locally and run npm init. In the workspace, we will install Jest, jest-stare for generating custom test reports, and jest-serial-runner to disable parallel execution (since our tests might be interdependent), and save these as dev dependencies.

    npm install jest jest-stare jest-serial-runner --save-dev

    Add these tags to the scripts block in our package.json:

    
    "scripts": {
        "test": "NODE_TLS_REJECT_UNAUTHORIZED=0 jest --reporters default jest-stare --coverage --detectOpenHandles --runInBand --testTimeout=60000",
        "test:watch": "jest --verbose --watchAll"
      }

    npm run test command will invoke the test parameter with the following:

    • NODE_TLS_REJECT_UNAUTHORIZED=0: ignores the SSL certificate
    • jest: runs the framework with the configurations defined under Jest block
    • --reporters default jest-stare: uses the default reporter together with jest-stare
    • --coverage: invokes test coverage
    • --detectOpenHandles: for debugging
    • --runInBand: serial execution of Jest tests
    • --forceExit: to shut down cleanly
    • --testTimeout=60000: custom timeout (the default is 5000 milliseconds)

    Jest configurations:

    [Note: This is customizable as per requirements]

    "jest": {
        "verbose": true,
        "testSequencer": "/home/abc/jest-supertest/testSequencer.js",
        "coverageDirectory": "/home/abc/jest-supertest/coverage/my_reports/",
        "coverageReporters": ["html","text"],
        "coverageThreshold": {
          "global": {
            "branches": 100,
            "functions": 100,
            "lines": 100,
            "statements": 100
          }
        }
      }

    testSequencer: to invoke testSequencer.js in the workspace to customize the order of running our test files

    touch testSequencer.js

    The code below, in testSequencer.js, will run our test files in alphabetical order.

    const Sequencer = require('@jest/test-sequencer').default;
    
    class CustomSequencer extends Sequencer {
      sort(tests) {
        // Test structure information
        // https://github.com/facebook/jest/blob/6b8b1404a1d9254e7d5d90a8934087a9c9899dab/packages/jest-runner/src/types.ts#L17-L21
        const copyTests = Array.from(tests);
        return copyTests.sort((testA, testB) => (testA.path > testB.path ? 1 : -1));
      }
    }
    
    module.exports = CustomSequencer;

    • verbose: to display individual test results
    • coverageDirectory: creates a custom directory for coverage reports
    • coverageReporters: format of reports generated
    • coverageThreshold: minimum and maximum threshold enforcements for coverage results

    Testing endpoints with SuperTest

    SuperTest is a superagent-driven Node library for extensively testing RESTful web services. It hits the HTTP server to send requests (GET, POST, PATCH, PUT, DELETE) and fetch responses.

    Install SuperTest and save it as a dependency.

    npm install supertest --save-dev

    "devDependencies": {
        "jest": "^25.5.4",
        "jest-serial-runner": "^1.1.0",
        "jest-stare": "^2.0.1",
        "supertest": "^4.0.2"
      }

    All the required dependencies are installed and our package.json looks like:

    {
      "name": "supertestjest",
      "version": "1.0.0",
      "description": "",
      "main": "index.js",
      "jest": {
        "verbose": true,
        "testSequencer": "/home/abc/jest-supertest/testSequencer.js",
        "coverageDirectory": "/home/abc/jest-supertest/coverage/my_reports/",
        "coverageReporters": ["html","text"],
        "coverageThreshold": {
          "global": {
            "branches": 100,
            "functions": 100,
            "lines": 100,
            "statements": 100
          }
        }
      },
      "scripts": {
        "test": "NODE_TLS_REJECT_UNAUTHORIZED=0 jest --reporters default jest-stare --coverage --detectOpenHandles --runInBand --testTimeout=60000",
        "test:watch": "jest --verbose --watchAll"
      },
      "author": "",
      "license": "ISC",
      "devDependencies": {
        "jest": "^25.5.4",
        "jest-serial-runner": "^1.1.0",
        "jest-stare": "^2.0.1",
        "supertest": "^4.0.2"
      }
    }

    Now we are ready to create our Jest tests with some defined conventions:

    • describe block assembles multiple tests or it blocks
    • test block – (an alias usually used is ‘it’) holds a single test 
    • expect() –  performs assertions 

    Jest recognizes the test files in the __test__/ folder

    • with .test.js extension
    • with .spec.js extension

    Here is a reference app for API tests.

    Let’s write commonTests.js, which will be required by every test file. It hits the app through SuperTest, logs in (if required), and saves the authorization token. The aliases are exported from here to be used in all the tests. 

    [Note: whether commonTests.js is created, and what it contains, will vary as per the test requirements]

    touch commonTests.js

    var supertest = require('supertest'); //require supertest
    const request = supertest('https://reqres.in/'); //supertest hits the HTTP server (your app)
    
    /*
    This piece of code is for getting the authorization token after login to your app.
    const token;
    test("Login to the application", function(){
        return request.post(``).then((response)=>{
            token = response.body.token  //to save the login token for further requests
        })
    }); 
    */
    
    module.exports = 
    {
        request
            //, token     -- export if token is generated
    }

    Moving forward, let’s write tests for POST, GET, PUT, and DELETE requests to get a basic understanding of the setup. We create two test files so we can also see and understand whether the sequencer works.

    mkdir __test__/
    touch __test__/postAndGet.test.js __test__/putAndDelete.test.js

    As mentioned above, and sticking to Jest conventions, we have our tests written.

    postAndGet.test.js test file:

    • requires commonTests.js into ‘request’ alias
    • POST requests to api/users endpoint, calls supertest.post() 
    • GET requests to api/users endpoint, calls supertest.get()
    • uses file system to write globals and read those across all the tests
    • validates response returned on hitting the HTTP endpoints

    const request = require('../commonTests');
    const fs = require('fs');
    let userID;
    
    //Create a new user
    describe("POST request", () => {
      
      try{
        let userDetails;
        beforeEach(function () {  
            console.log("Input user details!")
            userDetails = {
              "name": "morpheus",
              "job": "leader"
          }; //new user details to be created
          });
        
        afterEach(function () {
          console.log("User is created with ID : ", userID)
        });
    
    	  it("Create user data", async done => {
    
            return request.request.post(`api/users`) //post() of supertest
                    //.set('Authorization', `Token $  {request.token}`) //Authorization token
                    .send(userDetails) //Request header
                    .expect(201) //response to be 201
                    .then((res) => {
                        expect(res.body).toBeDefined(); //test if response body is defined
                        //expect(res.body.status).toBe("success")
                        userID = res.body.id;
                        let jsonContent = JSON.stringify({userId: res.body.id}); // create a json
                        fs.writeFile("data.json", jsonContent, 'utf8', function (err) //write user id into global json file to be used 
                        {
                        if (err) {
                            return console.log(err);
                        }
                        console.log("POST response body : ", res.body)
                        done();
                        });
                      })
                    })
                  }
                  catch(err){
                    console.log("Exception : ", err)
                  }
            });
    
    //GET all users      
    describe("GET all user details", () => {
      
      try{
          beforeEach(function () {
            console.log("GET all users details ")
        });
              
          afterEach(function () {
            console.log("All users' details are retrieved")
        });
    
          test("GET user output", async done =>{
            await request.request.get(`api/users`) //get() of supertest
                                    //.set('Authorization', `Token ${request.token}`) 
                                    .expect(200).then((response) =>{
                                    console.log("GET RESPONSE : ", response.body);
                                    done();
                        })
          })
        }
      catch(err){
        console.log("Exception : ", err)
        }
    });

    putAndDelete.test.js file:

    • requires commonTests.js into ‘request’ alias
    • calls data.json into ‘data’ alias which was created by the file system in our previous test to write global variables into it
    • PUT requests to api/users/${data.userId} endpoint, calls supertest.put() 
    • DELETE requests to api/users/${data.userId} endpoint, calls supertest.delete() 
    • validates response returned by the endpoints
    • removes data.json (similar to unsetting global variables) after all the tests are done

    const request = require('../commonTests');
    const fs = require('fs'); //file system
    const data = require('../data.json'); //data.json containing the global variables
    
    //Update user data
    describe("PUT user details", () => {
    
        try{
            let newDetails;
            beforeEach(function () {
                console.log("Input updated user's details");
                newDetails = {
                    "name": "morpheus",
                    "job": "zion resident"
                }; // details to be updated
      
            });
            afterEach(function () {
                console.log("user details are updated");
            });
      
            test("Update user now", async done =>{
    
                console.log("User to be updated : ", data.userId)
    
                const response = await request.request.put(`api/users/${data.userId}`).send(newDetails) //call put() of supertest
                                    //.set('Authorization', `Token ${request.token}`) 
                                            .expect(200)
                expect(response.body.updatedAt).toBeDefined();
                console.log("UPDATED RESPONSE : ", response.body);
                done();
        })
      }
        catch(err){
            console.log("ERROR : ", err)
        }
    });
    
    //DELETE the user
    describe("DELETE user details", () =>{
        try{
            beforeAll(function (){
                console.log("To delete user : ", data.userId)
            });
    
            test("Delete request", async done =>{
                const response = await request.request.delete(`api/users/${data.userId}`) //invoke delete() of supertest
                                            .expect(204) 
                console.log("DELETE RESPONSE : ", response.body);
                done(); 
            });
    
            afterAll(function (){
                console.log("user is deleted!!")
                fs.unlinkSync('data.json'); //remove data.json after all tests are run
            });
        }
    
        catch(err){
            console.log("EXCEPTION : ", err);
        }
    });

    And we are done setting up a decent framework; running it is just a command away!

    npm test

    Once complete, the test results will be immediately visible on the terminal.

    An HTML report of the test results is also generated as index.html under jest-stare/.

    And test coverage details are created under coverage/my_reports/ in the workspace.

    Similarly, other HTTP methods can also be tested, like OPTIONS – supertest.options() which allows dealing with CORS, PATCH – supertest.patch(), HEAD – supertest.head() and many more.
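
    For instance, a hedged sketch of a PATCH and a HEAD test against the same reqres.in placeholder endpoints could look like this (the endpoint and payload are illustrative, not part of the files above):

    const request = require('../commonTests');

    describe("PATCH and HEAD requests", () => {
      test("Partially update a user", async () => {
        // patch() of supertest; reqres.in echoes the updated field back
        const response = await request.request.patch(`api/users/2`)
                                              .send({ "job": "tester" })
                                              .expect(200);
        expect(response.body.updatedAt).toBeDefined();
      });

      test("HEAD request returns headers only", async () => {
        // head() of supertest; no response body is expected
        await request.request.head(`api/users/2`).expect(200);
      });
    });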

    Wasn’t it a convenient and successful journey?

    Conclusion

    So, wrapping it up with a note that API testing needs attention. As QA engineers, let’s abide by the concept of the testing pyramid, which is nothing but a tester’s mindset for combating issues at a lower level and avoiding chaos at the upper levels, i.e. the UI. 

    Testing Pyramid

    I hope you had a good read. Kindly spread the word. Happy coding!

  • Unit Testing Data at Scale using Deequ and Apache Spark

    Everyone knows the importance of knowledge and how critical it is to progress. In today’s world, data is knowledge. But that’s only when the data is “good” and correctly interpreted. Let’s focus on the “good” part. What do we really mean by “good data”?

    Its definition can change from use case to use case but, in general terms, good data can be defined by its accuracy, legitimacy, reliability, consistency, completeness, and availability.

    Bad data can lead to failures in production systems, unexpected outputs, and wrong inferences, leading to poor business decisions.

    It’s important to have something in place that can tell us about the quality of the data we have, how close it is to our expectations, and whether we can rely on it.

    This is basically the problem we’re trying to solve.

    The Problem and the Potential Solutions

    A manual approach to data quality testing is definitely one of the solutions and can work well.

    We’ll need to write code for computing various statistical measures, running them manually on different columns, maybe draw some plots, and then conduct some spot checks to see if there’s something not right or unexpected. The overall process can get tedious and time-consuming if we need to do it on a daily basis.

    Certain tools can make life easier for us. In this blog, we’ll be focusing on Amazon Deequ.

    Amazon Deequ

    Amazon Deequ is an open-source tool developed and used at Amazon. It’s built on top of Apache Spark, so it’s great at handling big data. Deequ computes data quality metrics regularly, based on the checks and validations set, and generates relevant reports.

    Deequ provides a lot of interesting features, and we’ll be discussing them in detail. Here’s a look at its main components:

    Source: AWS

    Prerequisites

    Working with Deequ requires having Apache Spark up and running with Deequ as one of the dependencies.

    As of this blog, the latest version of Deequ, 1.1.0, supports Spark 2.2.x to 2.4.x and Spark 3.0.x.

    Sample Dataset

    For learning more about Deequ and its features, we’ll be using an open-source IMDb dataset which has the following schema: 

    root
     |-- tconst: string (nullable = true)
     |-- titleType: string (nullable = true)
     |-- primaryTitle: string (nullable = true)
     |-- originalTitle: string (nullable = true)
     |-- isAdult: integer (nullable = true)
     |-- startYear: string (nullable = true)
     |-- endYear: string (nullable = true)
     |-- runtimeMinutes: string (nullable = true)
     |-- genres: string (nullable = true)
     |-- averageRating: double (nullable = true)
     |-- numVotes: integer (nullable = true)

    Here, tconst is the primary key, and the rest of the columns are pretty much self-explanatory.

    Data Analysis and Validation

    Before we start defining checks on the data, if we want to compute some basic stats on the dataset, Deequ provides us with an easy way to do that. They’re called metrics.

    Deequ provides support for the following metrics:

    ApproxCountDistinct, ApproxQuantile, ApproxQuantiles, Completeness, Compliance, Correlation, CountDistinct, DataType, Distance, Distinctness, Entropy, Histogram, Maximum, MaxLength, Mean, Minimum, MinLength, MutualInformation, PatternMatch, Size, StandardDeviation, Sum, UniqueValueRatio, Uniqueness

    Let’s go ahead and apply some metrics to our dataset.

    val runAnalyzer: AnalyzerContext = { AnalysisRunner
      .onData(data)
      .addAnalyzer(Size())
      .addAnalyzer(Completeness("averageRating"))
      .addAnalyzer(Uniqueness("tconst"))
      .addAnalyzer(Mean("averageRating"))
      .addAnalyzer(StandardDeviation("averageRating"))
      .addAnalyzer(Compliance("top rating", "averageRating >= 7.0"))
      .addAnalyzer(Correlation("numVotes", "averageRating"))
      .addAnalyzer(Distinctness("tconst"))
      .addAnalyzer(Maximum("averageRating"))
      .addAnalyzer(Minimum("averageRating"))
      .run()
    }
    
    val metricsResult = successMetricsAsDataFrame(spark, runAnalyzer)
    metricsResult.show(false)

    We get the following output by running the code above:

    +-----------+----------------------+-----------------+--------------------+
    |entity     |instance              |name             |value               |
    +-----------+----------------------+-----------------+--------------------+
    |Mutlicolumn|numVotes,averageRating|Correlation      |0.013454113877394851|
    |Column     |tconst                |Uniqueness       |1.0                 |
    |Column     |tconst                |Distinctness     |1.0                 |
    |Dataset    |*                     |Size             |7339583.0           |
    |Column     |averageRating         |Completeness     |0.14858528066240276 |
    |Column     |averageRating         |Mean             |6.886130810579155   |
    |Column     |averageRating         |StandardDeviation|1.3982924856469208  |
    |Column     |averageRating         |Maximum          |10.0                |
    |Column     |averageRating         |Minimum          |1.0                 |
    |Column     |top rating            |Compliance       |0.080230443609671   |
    +-----------+----------------------+-----------------+--------------------+

    Let’s try to quickly understand what this tells us.

    • The dataset has 7,339,583 rows.
    • The distinctness and uniqueness of the tconst column is 1.0, which means that all the values in the column are distinct and unique, which should be expected as it’s the primary key column.
    • The averageRating column has a min of 1 and a max of 10 with a mean of 6.88 and a standard deviation of 1.39, which tells us about the variation in the average rating values across the data.
    • The completeness of the averageRating column is 0.148, which tells us that we have an average rating available for around 15% of the dataset’s records.
    • Then, we tried to see if there’s any correlation between the numVotes and averageRating column. This metric calculates the Pearson correlation coefficient, which has a value of 0.01, meaning there’s no correlation between the two columns, which is expected.

    This feature of Deequ can be really helpful if we want to quickly do some basic analysis on a dataset.

    Let’s move on to defining and running tests and checks on the data.

    Data Validation

    For writing tests for our dataset, we use Deequ’s VerificationSuite and add checks on attributes of the dataset.

    Deequ has a big handy list of validators available to use, which are:

    hasSize, isComplete, hasCompleteness, isUnique, isPrimaryKey, hasUniqueness, hasDistinctness, hasUniqueValueRatio, hasNumberOfDistinctValues, hasHistogramValues, hasEntropy, hasMutualInformation, hasApproxQuantile, hasMinLength, hasMaxLength, hasMin, hasMax, hasMean, hasSum, hasStandardDeviation, hasApproxCountDistinct, hasCorrelation, satisfies, hasPattern, containsCreditCardNumber, containsEmail, containsURL, containsSocialSecurityNumber, hasDataType, isNonNegative, isPositive, isLessThan, isLessThanOrEqualTo, isGreaterThan, isGreaterThanOrEqualTo, isContainedIn

    Let’s apply some checks to our dataset.

    val validationResult: VerificationResult = { VerificationSuite()
      .onData(data)
      .addCheck(
        Check(CheckLevel.Error, "Review Check") 
          .hasSize(_ >= 100000) // check if the data has at least 100k records
          .hasMin("averageRating", _ > 0.0) // min rating should not be less than 0
          .hasMax("averageRating", _ < 9.0) // max rating should not be greater than 9
          .containsURL("titleType") // verify that titleType column has URLs
          .isComplete("primaryTitle") // primaryTitle should never be NULL
          .isNonNegative("numVotes") // should not contain negative values
          .isPrimaryKey("tconst") // verify that tconst is the primary key column
          .hasDataType("isAdult", ConstrainableDataTypes.Integral) 
          //column contains Integer values only, expected as values this col has are 0 or 1
          )
      .run()
    }
    
    val results = checkResultsAsDataFrame(spark, validationResult)
    results.select("constraint","constraint_status","constraint_message").show(false)

    We have added some checks to our dataset, and the details about the check can be seen as comments in the above code.

    We expect all checks to pass for our dataset except the containsURL and hasMax ones.

    That’s because the titleType column doesn’t have URLs, and we know that the max rating is 10.0, but we are checking against 9.0.

    We can see the output below:

    +--------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
    |constraint                                                                                  |constraint_status|constraint_message                                   |
    +--------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+
    |SizeConstraint(Size(None))                                                                  |Success          |                                                     |
    |MinimumConstraint(Minimum(averageRating,None))                                              |Success          |                                                     |
    |MaximumConstraint(Maximum(averageRating,None))                                              |Failure          |Value: 10.0 does not meet the constraint requirement!|
    |containsURL(titleType)                                                                      |Failure          |Value: 0.0 does not meet the constraint requirement! |
    |CompletenessConstraint(Completeness(primaryTitle,None))                                     |Success          |                                                     |
    |ComplianceConstraint(Compliance(numVotes is non-negative,COALESCE(numVotes, 0.0) >= 0,None))|Success          |                                                     |
    |UniquenessConstraint(Uniqueness(List(tconst),None))                                         |Success          |                                                     |
    |AnalysisBasedConstraint(DataType(isAdult,None),<function1>,Some(<function1>),None)          |Success          |                                                     |
    +--------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------+

    In order to perform these checks, behind the scenes, Deequ calculated metrics that we saw in the previous section.

    To look at the metrics Deequ computed for the checks we defined, we can use: 

    VerificationResult.successMetricsAsDataFrame(spark,validationResult)
                      .show(truncate=false)

    Automated Constraint Suggestion

    Automated constraint suggestion is a really interesting and useful feature provided by Deequ.

    Adding validation checks on a dataset with hundreds of columns, or on a large number of datasets, can be challenging. With this feature, Deequ tries to make our task easier: it analyses the data distribution and, based on that, suggests potentially useful constraints that can be used as validation checks.

    Let’s see how this works.

    This piece of code can automatically generate constraint suggestions for us:

    val constraintResult = { ConstraintSuggestionRunner()
      .onData(data)
      .addConstraintRules(Rules.DEFAULT)
      .run()
    }
    
    val suggestionsDF = constraintResult.constraintSuggestions.flatMap { 
      case (column, suggestions) => 
        suggestions.map { constraint =>
          (column, constraint.description, constraint.codeForConstraint)
        } 
    }.toSeq.toDS()
    
    suggestionsDF.select("_1","_2").show(false)

    Let’s look at constraint suggestions generated by Deequ:

    +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |runtimeMinutes|'runtimeMinutes' has less than 72% missing values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
    |tconst        |'tconst' is not null                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
    |titleType     |'titleType' is not null                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
    |titleType     |'titleType' has value range 'tvEpisode', 'short', 'movie', 'video', 'tvSeries', 'tvMovie', 'tvMiniSeries', 'tvSpecial', 'videoGame', 'tvShort'                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
    |titleType     |'titleType' has value range 'tvEpisode', 'short', 'movie' for at least 90.0% of values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
    |averageRating |'averageRating' has no negative values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
    |originalTitle |'originalTitle' is not null                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
    |startYear     |'startYear' has less than 9% missing values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
    |startYear     |'startYear' has type Integral                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
    |startYear     |'startYear' has no negative values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
    |endYear       |'endYear' has type Integral  
    |endYear       |'endYear' has value range '2017', '2018', '2019', '2016', '2015', '2020', '2014', '2013', '2012', '2011', '2010',......|
    |endYear       |'endYear' has value range '' for at least 99.0% of values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
    |endYear       |'endYear' has no negative values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
    |numVotes      |'numVotes' has no negative values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
    |primaryTitle  |'primaryTitle' is not null                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
    |isAdult       |'isAdult' is not null                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
    |isAdult       |'isAdult' has no negative values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
    |genres        |'genres' has less than 7% missing values                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
    +--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

    We shouldn’t expect the constraint suggestions generated by Deequ to always make sense. They should always be verified before being used.

    This is because the algorithm that generates the constraint suggestions just works on the data distribution and isn’t exactly “intelligent.”

    We can see that most of the suggestions generated make sense even though they might be really trivial.

    For the endYear column, one of the suggestions is that endYear should be contained in a given list of years, which is indeed true for our dataset. However, it can’t be generalized, since new values for endYear keep appearing with every passing year.

    But on the other hand, the suggestion that titleType can take the following values: ‘tvEpisode,’ ‘short,’ ‘movie,’ ‘video,’ ‘tvSeries,’ ‘tvMovie,’ ‘tvMiniSeries,’ ‘tvSpecial,’ ‘videoGame,’ and ‘tvShort’ makes sense and can be generalized, which makes it a great suggestion.

    And this is why we should not blindly use the constraints suggested by Deequ and always cross-check them.

    Something we can do to improve the constraint suggestions is to use the useTrainTestSplitWithTestsetRatio method in ConstraintSuggestionRunner.
    It makes a lot of sense to use this on large datasets.

    How does this work? If we use the config useTrainTestSplitWithTestsetRatio(0.1), Deequ would compute constraint suggestions on 90% of the data and evaluate the suggested constraints on the remaining 10%, which would improve the quality of the suggested constraints.

    Anomaly Detection

    Deequ also supports anomaly detection for data quality metrics.

    The idea behind Deequ’s anomaly detection is that we often have a sense of how much change to expect in certain metrics of our data. Say we are getting new data every day, and we know that the number of records we get on a daily basis is around 8 to 12k. On a random day, if we get 40k records, we know something went wrong with the data ingestion job or some other job didn’t go right.

    Deequ will regularly store the metrics of our data in a MetricsRepository. Once that’s done, anomaly detection checks can be run. These compare the current values of the metrics to the historical values stored in the MetricsRepository, and that helps Deequ to detect anomalous changes that are a red flag.

    One of Deequ’s anomaly detection strategies is the RateOfChangeStrategy, which limits the maximum change in the metrics by some numerical factor that can be passed as a parameter.

    Deequ supports other strategies that can be found here. And code examples for anomaly detection can be found here.

    Conclusion

    We learned about the main features and capabilities of AWS Labs’ Deequ.

    It might feel a little daunting to people unfamiliar with Scala or Spark, but using Deequ is very easy and straightforward. Someone with a basic understanding of Scala or Spark should be able to work with Deequ’s primary features without any friction.

    For someone who rarely deals with data quality checks, manual test runs might be a good enough option. However, for someone dealing with new datasets frequently, as in multiple times in a day or a week, using a tool like Deequ to perform automated data quality testing makes a lot of sense in terms of time and effort.

    We hope this article helped you get a deep dive into data quality testing and using Deequ for these types of engineering practices.

  • How to Make Asynchronous Calls in Redux Without Middlewares

    Redux has greatly helped in reducing the complexities of state management. Its one-way data flow is easier to reason about, and it also provides a powerful mechanism to include middlewares, which can be chained together to do our bidding. One of the most common use cases for middleware is making async calls in the application. Middlewares like redux-thunk, redux-saga, and redux-observable are a few examples. All of these come with their own learning curve and are best suited for tackling different scenarios.

    But what if our use-case is simple enough and we don’t want to have the added complexities that implementing a middleware brings? Can we somehow implement the most common use-case of making async API calls using only redux and javascript?

    The answer is yes! This blog will try to explain how to implement async action calls in Redux without the use of any middleware.

    So let us start by creating a simple React project using create-react-app:

    npx create-react-app async-redux-without-middlewares
    cd async-redux-without-middlewares
    npm start

    Also, we will be using react-redux in addition to redux to make our life a little easier. And to mock the APIs, we will be using https://jsonplaceholder.typicode.com/

    We will implement just two API calls so as not to overcomplicate things.

    Create a new file called api.js. This is the file in which we will keep the fetch calls to the endpoints.

    export const getPostsById = id => fetch(`https://jsonplaceholder.typicode.com/posts/${id}`);
     
    export const getPostsBulk = () => fetch("https://jsonplaceholder.typicode.com/posts");

    Each API call has three base actions associated with it: REQUEST, SUCCESS, and FAIL. Each of our APIs will be in one of these three states at any given time, and depending on these states we can decide how to render our UI. For example, when an API is in the REQUEST state we can have the UI show a loader, and when it is in the FAIL state we can show a custom UI telling the user that something has gone wrong.

    So we create REQUEST, SUCCESS and FAIL constants for each API call we will be making. In our case, the constants.js file looks something like this:

    export const GET_POSTS_BY_ID_REQUEST = "getpostsbyidrequest";
    export const GET_POSTS_BY_ID_SUCCESS = "getpostsbyidsuccess";
    export const GET_POSTS_BY_ID_FAIL = "getpostsbyidfail";
     
    export const GET_POSTS_BULK_REQUEST = "getpostsbulkrequest";
    export const GET_POSTS_BULK_SUCCESS = "getpostsbulksuccess";
    export const GET_POSTS_BULK_FAIL = "getpostsbulkfail";

    The store.js file and the initialState of our application is as follows:

    import { createStore } from 'redux'
    import reducer from './reducers';
     
    const initialState = {
        byId: {
            isLoading: null,
            error: null,
            data: null
        },
        byBulk: {
            isLoading: null,
            error: null,
            data: null
        }
    };
     
    const store = createStore(reducer, initialState, window.__REDUX_DEVTOOLS_EXTENSION__ && window.__REDUX_DEVTOOLS_EXTENSION__());
     
    export default store;

    As can be seen from the above code, each API’s data lives in its own object inside the state object. The isLoading key tells us if the API is in the REQUEST state.

    Now that we have our store defined, let us see how we manipulate the state with the different phases that an API call can be in. Below is our reducers.js file.

    import {
        GET_POSTS_BY_ID_REQUEST,
        GET_POSTS_BY_ID_SUCCESS,
        GET_POSTS_BY_ID_FAIL,
     
        GET_POSTS_BULK_REQUEST,
        GET_POSTS_BULK_SUCCESS,
        GET_POSTS_BULK_FAIL
     
    } from "./constants";
     
    const reducer = (state, action) => {
        switch (action.type) {
            case GET_POSTS_BY_ID_REQUEST:
                return {
                    ...state,
                    byId: {
                        isLoading: true,
                        error: null,
                        data: null
                    }
                }
            case GET_POSTS_BY_ID_SUCCESS:
                return {
                    ...state,
                    byId: {
                        isLoading: false,
                        error: false,
                        data: action.payload
                    }
                }
            case GET_POSTS_BY_ID_FAIL:
                return {
                    ...state,
                    byId: {
                        isLoading: false,
                        error: action.payload,
                        data: false
                    }
                }
     
            case GET_POSTS_BULK_REQUEST:
                return {
                    ...state,
                    byBulk: {
                        isLoading: true,
                        error: null,
                        data: null
                    }
                }
            case GET_POSTS_BULK_SUCCESS:
                return {
                    ...state,
                    byBulk: {
                        isLoading: false,
                        error: false,
                        data: action.payload
                    }
                }
            case GET_POSTS_BULK_FAIL:
                return {
                    ...state,
                    byBulk: {
                        isLoading: false,
                        error: action.payload,
                        data: false
                    }
                }
            default: return state;
        }
    }
     
    export default reducer;

    By giving each individual API call its own variable to denote the loading phase, we can now easily implement something like multiple loaders on the same screen, according to which API call is in which phase.
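
    As a rough sketch, the UI could then render an independent loader or error message for each call. Here PostsStatus is a hypothetical presentational component that receives the byId and byBulk slices of the state shown earlier as props:

    import React from 'react';
     
    const PostsStatus = ({ byId, byBulk }) => (
      <div>
        {byId.isLoading && <p>Loading post by id...</p>}
        {byId.error && <p>Could not load the post.</p>}
        {byBulk.isLoading && <p>Loading all posts...</p>}
        {byBulk.error && <p>Could not load the posts.</p>}
      </div>
    );
     
    export default PostsStatus;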

    Now, to actually implement the async behaviour in the actions, we just need a normal JavaScript function that takes dispatch as its first argument. We pass dispatch to the function because it is what sends actions to the store: normally a component has access to dispatch, but since we want an external function to take control of dispatching, we need to hand dispatch over to it.

    const getPostById = async (dispatch, id) => {
        dispatch({ type: GET_POSTS_BY_ID_REQUEST });
     
        try {
            const response = await getPostsById(id);
            const res = await response.json();
            dispatch({ type: GET_POSTS_BY_ID_SUCCESS, payload: res });
        } catch (e) {
            dispatch({ type: GET_POSTS_BY_ID_FAIL, payload: e });
        }
    };

    And a function to bring dispatch into the above function’s scope:

    export const getPostByIdFunc = dispatch => {
        return id => getPostById(dispatch, id);
    }

    So now our complete actions.js file looks like this:

    import {
        GET_POSTS_BY_ID_REQUEST,
        GET_POSTS_BY_ID_SUCCESS,
        GET_POSTS_BY_ID_FAIL,
     
        GET_POSTS_BULK_REQUEST,
        GET_POSTS_BULK_SUCCESS,
        GET_POSTS_BULK_FAIL
     
    } from "./constants";
     
    import {
        getPostsById,
        getPostsBulk
    } from "./api";
     
    const getPostById = async (dispatch, id) => {
        dispatch({ type: GET_POSTS_BY_ID_REQUEST });
     
        try {
            const response = await getPostsById(id);
            const res = await response.json();
            dispatch({ type: GET_POSTS_BY_ID_SUCCESS, payload: res });
        } catch (e) {
            dispatch({ type: GET_POSTS_BY_ID_FAIL, payload: e });
        }
    };
     
    const getPostBulk = async dispatch => {
        dispatch({ type: GET_POSTS_BULK_REQUEST });
     
        try {
            const response = await getPostsBulk();
            const res = await response.json();
            dispatch({ type: GET_POSTS_BULK_SUCCESS, payload: res });
        } catch (e) {
            dispatch({ type: GET_POSTS_BULK_FAIL, payload: e });
        }
    };
     
    export const getPostByIdFunc = dispatch => {
        return id => getPostById(dispatch, id);
    }
     
    export const getPostsBulkFunc = dispatch => {
        return () => getPostBulk(dispatch);
    }

    Once this is done, all that is left to do is to pass these functions in mapDispatchToProps of our connected component.

    const mapDispatchToProps = dispatch => {
      return {
        getPostById: getPostByIdFunc(dispatch),
        getPostBulk: getPostsBulkFunc(dispatch)
      }
    };

    Our App.js file looks like the one below:

    import React, { Component } from 'react';
    import './App.css';
     
    import { connect } from 'react-redux';
    import { getPostByIdFunc, getPostsBulkFunc } from './actions';
     
    class App extends Component {
      render() {
        console.log(this.props);
        return (
          <div className="App">
            <button onClick={() => {
              this.props.getPostById(1);
            }}>By Id</button>
            <button onClick={() => {
              this.props.getPostBulk();
            }}>In bulk</button>
          </div>
        );
      }
    }
     
    const mapStateToProps = state => {
      return {
        state
      };
    }
     
    const mapDispatchToProps = dispatch => {
      return {
        getPostById: getPostByIdFunc(dispatch),
        getPostBulk: getPostsBulkFunc(dispatch)
      }
    };
     
    export default connect(mapStateToProps, mapDispatchToProps)(App);

    This is how we do async calls without middleware in Redux. It is a much simpler approach than using a middleware, with no extra learning curve attached. If this approach covers all your use cases, then by all means use it.

    Conclusion

    This type of approach really shines when you are building a simple application, like a demo of sorts, where API calls are the only side effects you need. In larger and more complicated applications there are a few inconveniences with this approach: first, we have to pass dispatch around, which feels a bit clumsy, and we also have to remember which calls require dispatch and which do not.

    The full code can be found here.

  • The Ultimate Beginner’s Guide to Jupyter Notebooks

    Jupyter Notebooks offer a great way to write and iterate on your Python code. They are a powerful tool for developing data science projects in an interactive way. A Jupyter Notebook lets you showcase source code and its corresponding output in a single place, combining narrative text, visualizations and other rich media. The intuitive workflow promotes iterative and rapid development, making notebooks the first choice for data scientists. Creating Jupyter Notebooks is free, as they fall under Project Jupyter, which is completely open source.

    Project Jupyter is the successor to an earlier project, IPython Notebook, which was first published as a prototype in 2010. Jupyter Notebook is built on top of IPython, an interactive tool for executing Python code in the terminal using the REPL (Read-Eval-Print Loop) model. The IPython kernel executes the Python code and communicates with the Jupyter Notebook front-end interface. By extending IPython, Jupyter Notebooks add features like storing your code and its output and keeping Markdown notes alongside them.

    Although Jupyter Notebooks support using various programming languages, we will focus on Python and its application in this article.

    Getting Started with Jupyter Notebooks!

    Installation

    Prerequisites

    As you may have surmised from the above, you need to have Python installed on your machine. Either Python 2.7 or Python 3.x will do.

    Install Using Anaconda

    The simplest way to get started with Jupyter Notebooks is by installing it using Anaconda. Anaconda installs both Python3 and Jupyter and also includes quite a lot of packages commonly used in the data science and machine learning community. You can follow the latest guidelines from here.

    Install Using Pip

    If, for some reason, you decide not to use Anaconda, you can install Jupyter manually using Python’s pip package manager; just run the command below:

    pip install jupyter

    Launching First Notebook

    Open your terminal, navigate to the directory where you would like to store your notebooks, and launch Jupyter Notebook by typing the command below; the program will start a local server at http://localhost:8888/tree.

    jupyter notebook

    A new window with the Jupyter Notebook interface will open in your browser. As you might have noticed, Jupyter starts a local Python server to serve these web apps in your browser, where you can access the dashboard and work with your notebooks. Jupyter Notebooks are platform independent, which makes it easier to collaborate and share with others.

    The list of all files is displayed under the Files tab, whereas all the running processes can be viewed by clicking on the Running tab. The third tab, Clusters, comes from IPython parallel, IPython’s parallel computing framework, which lets you control multiple engines built on the IPython kernel.

    Let’s start by making a new notebook. We can easily do this by clicking on the New drop-down list in the top-right corner of the dashboard. You will see that you have the option to create a Python 3 notebook as well as a regular text file, a folder, and a terminal. Please select the Python 3 notebook option.

    Your Jupyter Notebook will open in a new tab, as shown in the image below.

    Each notebook is opened in a new tab so that you can work with multiple notebooks simultaneously. If you go back to the dashboard tab, you will see the new file Untitled.ipynb with a green icon to its left, which indicates that your new notebook is running.

     

    Why a .ipynb file?

    .ipynb is the standard file format for storing Jupyter Notebooks, hence the file name Untitled.ipynb. Let’s begin by first understanding what an .ipynb file is and what it might contain. Each .ipynb file is a text file that describes the content of your notebook in a JSON format. The content of each cell, whether it is text, code or image attachments that have been converted into strings, along with some additional metadata is stored in the .ipynb file. You can also edit the metadata by selecting “Edit > Edit Notebook Metadata” from the menu options in the notebook.

    You can also view the content of your notebook files by selecting “Edit” from the controls on the dashboard, but there’s no reason to do so unless you really want to edit the file manually.
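
    For reference, a stripped-down sketch of what the JSON inside an .ipynb file roughly looks like is shown below; the exact fields vary with the notebook format version:

    {
      "cells": [
        {
          "cell_type": "code",
          "execution_count": 1,
          "metadata": {},
          "outputs": [],
          "source": ["print(\"Hello World!\")"]
        }
      ],
      "metadata": {},
      "nbformat": 4,
      "nbformat_minor": 2
    }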

    Understanding the Notebook Interface

    Now that you have created a notebook, let’s have a look at the various menu options and functions that are readily available. Take some time to scroll through the list of commands that opens up when you click on the keyboard icon (or press Ctrl + Shift + P).

    There are two prominent terms that you should learn about: cells and kernels. They are key both to understanding Jupyter and to what makes it more than just a content-writing tool. Fortunately, these concepts are not difficult to understand.

    • A kernel is a program that interprets and executes the user’s code. The Jupyter Notebook App has an inbuilt kernel for Python code, but there are also kernels available for other programming languages.
    • A cell is a container that holds executable code or formatted text.

    Cells

    Cells form the body of a notebook. If you look at the screenshot above for a new notebook (Untitled.ipynb), the text box with the green border is an empty cell. There are 4 types of cells:

    • Code – This is where you type your code and when executed the kernel will display its output below the cell.
    • Markdown – This is where you type your text formatted using Markdown and the output is displayed in place when it is run.
    • Raw NBConvert – This holds raw content that is passed through unmodified when the notebook is converted into another format (like HTML or PDF) with the nbconvert tool.
    • Heading – This is where you add Headings to separate sections and make your notebook look tidy and neat. This has now been merged into the Markdown option itself. Adding a ‘#’ at the beginning ensures that whatever you type after that will be taken as a heading.

    Let’s test out how the cells work with a basic “hello world” example. Type print(‘Hello World!’) in the cell and press Ctrl + Enter or click on the Run button in the toolbar at the top.

    print("Hello World!")

    Hello World!

    When you run the cell, the output is displayed below it, and the label to its left changes from In [ ] to In [1]. While a cell is still executing, Jupyter shows the label In [*].

    Additionally, it is important to note that the output of a code cell comes from any print statements in the cell as well as the value of the last expression in the cell, whether that is a variable, a function call or some other code snippet.
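
    As a small illustrative sketch, running a cell like the one below would display both the printed line and the value of the final expression:

    greeting = "Hello World!"
    print("printed: " + greeting)  # appears in the output area because of print
    len(greeting)                  # the value of this last expression (12) is also displayed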

    Markdown

    Markdown is a lightweight markup language for formatting plain text. Its syntax has a close correspondence with HTML tags. As this article was written in a Jupyter Notebook, all of the narrative text and images you see are written in Markdown. Let’s go through the basics with the following example.

    # This is a level 1 heading 
    ### This is a level 3 heading
    This is how you write some plain text that would form a paragraph.
    You can emphasize text by enclosing it in "**" or "__" to make it bold, or in "*" or "_" to make it italic.
    Paragraphs are separated by an empty line.
    * We can include lists.
      * And also indent them.
    
    1. Getting Numbered lists is
    2. Also easy.
    
    [To include hyperlinks enclose the text with square braces and then add the link url in round braces](http://www.example.com)
    
    Inline code uses single backticks: `foo()`, and code blocks use triple backticks:
    
    ``` 
    foo()
    ```
    
    Or can be indented by 4 spaces: 
    
        foo()
        
    And finally, adding images is easy: ![Online Image](https://www.example.com/image.jpg) or ![Local Image](img/image.jpg) or ![Image Attachment](attachment:image.jpg)

    We have 3 different ways to attach images

    • Link the URL of an image from the web.
    • Use relative path of an image present locally
    • Add an attachment to the notebook by using the “Edit > Insert Image” option; this method converts the image into a string and stores it inside your notebook.

    Note that adding an image as an attachment will make the .ipynb file much larger because it is stored inside the notebook in a string format.

    There are a lot more features available in Markdown. To learn more about markdown, you can refer to the official guide from the creator, John Gruber, on his website.

    Kernels

    Every notebook runs on top of a kernel. Whenever you execute a code cell, its content is executed within the kernel, and any output is returned to the cell for display. The kernel’s state applies to the document as a whole, not to individual cells, and persists over time.

    For example, if you declare a variable or import some libraries in a cell, they will be accessible in other cells. Now let’s understand this with the help of an example. First we’ll import a Python package and then define a function.

    import os, binascii
    def sum(x,y):
      return x+y

    Once the cell above is executed, we can reference os, binascii and sum in any other cell.

    rand_hex_string = binascii.b2a_hex(os.urandom(15)) 
    print(rand_hex_string)
    x = 1
    y = 2
    z = sum(x,y)
    print('Sum of %d and %d is %d' % (x, y, z))

    The output should look something like this:

    c84766ca4a3ce52c3602bbf02ad1f7
    Sum of 1 and 2 is 3

    The execution flow of a notebook is generally top-to-bottom, but it’s common to go back and make changes. The order of execution, shown to the left of each cell (such as In [2]), lets you know whether any of your cells have stale output. Additionally, there are several options in the Kernel menu that often come in handy:

    • Restart: restarts the kernel, thus clearing all the variables etc that were defined.
    • Restart & Clear Output: same as above but will also wipe the output displayed below your code cells.
    • Restart & Run All: same as above but will also run all your cells in order from top-to-bottom.
    • Interrupt: If your kernel is ever stuck on a computation and you wish to stop it, you can choose the Interrupt option.

    Naming Your Notebooks

    It is always a best practice to give a meaningful name to your notebooks. You can rename your notebooks from the notebook app itself by double-clicking on the existing name at the top left corner. You can also use the dashboard or the file browser to rename the notebook file. We’ll head back to the dashboard to rename the file we created earlier, which will have the default notebook file name Untitled.ipynb.

    Now that you are back on the dashboard, you can simply select your notebook and click “Rename” in the dashboard controls

    Jupyter notebook - Rename

    Shutting Down your Notebooks

    We can shutdown a running notebook by selecting “File > Close and Halt” from the notebook menu. However, we can also shutdown the kernel either by selecting the notebook in the dashboard and clicking “Shutdown” or by going to “Kernel > Shutdown” from within the notebook app (see images below).

    Shutdown the kernel from Notebook App:

     

    Shutdown the kernel from Dashboard:

     

     

    Sharing Your Notebooks

    When we talk about sharing a notebook, there are two things that might come to mind. In most cases, we want to share the end result of the work, i.e. a non-interactive, pre-rendered version of the notebook, much like this article. In other cases, we might want to share the code and collaborate with others on notebooks with the aid of a version control system such as Git, which is also possible.

    Before You Start Sharing

    The state of the shared notebook, including the output of any code cells, is preserved when it is exported to a file. Hence, to ensure that the notebook is share-ready, we should follow the steps below before sharing.

    1. Click “Cell > All Output > Clear”
    2. Click “Kernel > Restart & Run All”
    3. After the code cells have finished executing, validate the output. 

    This ensures that your notebooks don’t have a stale state or contain intermediary output.

    Exporting Your Notebooks

    Jupyter has built-in support for exporting to HTML, Markdown and PDF as well as several other formats, which you can find in the menu under “File > Download as” (a command-line sketch using nbconvert follows the list below). It is a very convenient way to share results with others. But if sharing exported files isn’t suitable for you, there are some other popular methods of sharing notebooks directly on the web:

    • GitHub
    • Home to over 2 million notebooks, GitHub is the most popular place for sharing Jupyter projects with the world. GitHub has integrated support for rendering .ipynb files directly, both in repositories and in gists, on its website.
    • You can just follow the GitHub guides for you to get started on your own.
    • Nbviewer
    • NBViewer is one of the most prominent notebook renderers on the web.
    • It also renders notebooks from GitHub and other code storage platforms and provides a shareable URL along with it. nbviewer.jupyter.org provides a free rendering service as part of Project Jupyter.
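
    As referenced above, the bundled nbconvert tool offers the same exports from the command line. A minimal sketch is shown below; note that the PDF target additionally requires a LaTeX installation:

    jupyter nbconvert --to html Untitled.ipynb
    jupyter nbconvert --to pdf Untitled.ipynb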

    Data Analysis in a Jupyter Notebook

    Now that we’ve looked at what a Jupyter Notebook is, it’s time to look at how they’re used in practice, which should give you a clearer understanding of why they are so popular. As we walk through the sample analysis, you will see how the flow of a notebook makes the task intuitive for us to work through, as well as for others to understand when we share it with them. We also hope to learn some of the more advanced features of Jupyter Notebooks along the way. So let’s get started, shall we?

    Analyzing the Revenue and Profit Trends of Fortune 500 US companies from 1955-2013

    So, let’s say you’ve been tasked with finding out how the revenues and profits of the largest companies in the US changed historically over the past 60 years. We shall begin by gathering the data to analyze.

    Gathering the DataSet

    The data set that we will be using to analyze the revenue and profit trends of Fortune 500 companies has been sourced from Fortune 500 Archives and Top Foreign Stocks. For your convenience, we have compiled the data from both sources and created a CSV for you.

    Importing the Required Dependencies

    Let’s start off with a code cell specifically for imports and initial setup, so that if we need to add or change anything at a later point in time, we can simply edit and re-run the cell without having to change the other cells. We can start by importing Pandas to work with our data, Matplotlib to plot the charts and Seaborn to make our charts prettier.

    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    import sys

    Set the design styles for the charts

    sns.set(style="darkgrid")

    Load the Input Data to be Analyzed

    As we plan on using pandas to aid in our analysis, let’s begin by importing our input data set into the most widely used pandas data-structure, DataFrame.

    df = pd.read_csv('../data/fortune500_1955_2013.csv')

    Now that we are done loading our input dataset, let us see what it looks like!

    df.head()

    Looking good. Each row corresponds to a single company per year and all the columns we need are present.

    Exploring the Dataset

    Next, let’s begin by exploring our data set. We will primarily look at the number of records imported and the data types of each of the columns.

    df.columns = ['year', 'rank', 'company', 'revenue', 'profit']
    len(df)

    As we have 500 data points per year, and the data set has records between 1955 and 2012, the total number of records in the dataset looks good!

    Now, let’s move on to the individual data types of each column.

    df.dtypes

    As we can see from the output of the above command, the data types of the revenue and profit columns are shown as object, whereas the expected data type is float. This indicates that there may be some non-numeric values in the revenue and profit columns.

    So let’s first look at the details of imported values for revenue.

    non_numeric_revenues = df.revenue.str.contains('[^0-9.-]')
    df.loc[non_numeric_revenues].head()

    print("Number of Non-numeric revenue values: ", len(df.loc[non_numeric_revenues]))

    Number of Non-numeric revenue values:	1

    print("List of distinct Non-numeric revenue values: ", set(df.revenue[non_numeric_revenues]))

    List of distinct Non-numeric revenue values:	{'N.A.'}

    As the number of non-numeric revenue values is very small compared to the total size of our data set, it is easiest to just remove those rows.

    df = df.loc[~non_numeric_revenues]
    df.revenue = df.revenue.apply(pd.to_numeric)
    eval(In[6])  # re-evaluate the earlier input cell to confirm the dtypes are now numeric

    Now that the data type issue for column revenue is resolved, let’s move on to values in column profit.

    non_numeric_profits = df.profit.str.contains('[^0-9.-]')
    df.loc[non_numeric_profits].head()

    print("Number of Non-numeric profit values: ", len(df.loc[non_numeric_profits]))

    Number of Non-numeric profit values:	374

    print("List of distinct Non-numeric profit values: ", set(df.profit[non_numeric_profits]))

    List of distinct Non-numeric profit values:	{'N.A.'}

    The number of non-numeric profit values is around 1.5% of our data set, which is a small percentage but not completely inconsequential. Let’s take a quick look at the distribution of these values; if the rows with N.A. values are roughly uniformly distributed over the years, it would be reasonable to just remove them.

    bin_sizes, _, _ = plt.hist(df.year[non_numeric_profits], bins=range(1955, 2013))

    As observed from the histogram above, the number of invalid values in any single year is fewer than 25, and with 500 data points per year, removing these values would account for less than 4% of the data for that year. Also, other than a surge around 1990, most years have fewer than 10 missing values. Let’s assume that this is acceptable for us and move ahead with removing these rows.

    df = df.loc[~non_numeric_profits]
    df.profit = df.profit.apply(pd.to_numeric)

    We should validate if that worked!

    eval(In[6])

    Hurray! Our dataset has been cleaned up.

    Time to Plot the graphs

    Let’s begin by defining a function to plot the graph, set the title and add labels for the x-axis and y-axis.

    # function to plot the graphs for average revenues or profits of the fortune 500 companies against year
    def plot(x, y, ax, title, y_label):
        ax.set_title(title)
        ax.set_ylabel(y_label)
        ax.plot(x, y)
        ax.margins(x=0, y=0)
        
    # function to plot the graphs with superimposed standard deviation    
    def plot_with_std(x, y, stds, ax, title, y_label):
        ax.fill_between(x, y - stds, y + stds, alpha=0.2)
        plot(x, y, ax, title, y_label)

    Let’s plot the average profit by year and average revenue by year using Matplotlib.

    group_by_year = df.loc[:, ['year', 'revenue', 'profit']].groupby('year')
    avgs = group_by_year.mean()
    x = avgs.index
    y = avgs.profit
    
    fig, ax = plt.subplots()
    plot(x, y, ax, 'Increase in mean Fortune 500 company profits from 1955 to 2013', 'Profit (millions)')

    y2 = avgs.revenue
    fig, ax = plt.subplots()
    plot(x, y2, ax, 'Increase in mean Fortune 500 company revenues from 1955 to 2013', 'Revenue (millions)')

    Woah! The chart for profits has some huge ups and downs. They seem to correspond to the early 1990s recession, the dot-com bubble of the early 2000s and the Great Recession in 2008.

    On the other hand, revenues grow steadily and are comparatively stable. This also helps us understand how the average profits recovered so quickly after the staggering drops caused by the recessions.

    Let’s also take a look at how the average profits and revenues compare to their standard deviations.

    fig, (ax1, ax2) = plt.subplots(ncols=2)
    title = 'Increase in mean and std Fortune 500 company %s from 1955 to 2013'
    stds1 = group_by_year.std().profit.values
    stds2 = group_by_year.std().revenue.values
    plot_with_std(x, y.values, stds1, ax1, title % 'profits', 'Profit (millions)')
    plot_with_std(x, y2.values, stds2, ax2, title % 'revenues', 'Revenue (millions)')
    fig.set_size_inches(14, 4)
    fig.tight_layout()

     

    That’s astonishing: the standard deviations are huge. Some companies are making billions while others are losing as much, and the risk has certainly increased along with rising profits and revenues over the years. Although we could keep playing with our data set and plot plenty more charts, it is time to bring this article to a close.

    Conclusion

    As part of this article, we have seen various features of Jupyter Notebooks, from basics like installation, creating notebooks and running code cells to more advanced features like plotting graphs. The power of Jupyter Notebooks to promote a productive working experience and provide ease of use is evident from the above example, and I hope you feel confident to begin using Jupyter Notebooks in your own work and to explore more advanced features. You can read more about data analytics using Pandas here.

    If you’d like to further explore and want to look at more examples, Jupyter has put together A Gallery of Interesting Jupyter Notebooks that you may find helpful and the Nbviewer homepage provides a lot of examples for further references. Find the entire code here on Github.

  • Continuous Deployment with Azure Kubernetes Service, Azure Container Registry & Jenkins

    Introduction

    Containerization has taken the application development world by storm. Kubernetes has become the standard way of deploying new containerized, distributed applications and is used by the largest enterprises in a wide range of industries for mission-critical tasks; it has become one of the biggest open-source success stories.

    Although Google Cloud has been providing Kubernetes as a service since November 2014 (note that it started as a beta), Microsoft with AKS (Azure Kubernetes Service) and Amazon with EKS (Elastic Kubernetes Service) only jumped onto the scene in the second half of 2017.

    Wrapper tools did exist prior to these managed services, for example:

    • AWS had kops
    • Azure had Azure Container Service

    These tools would help a user create a Kubernetes cluster, but the management and maintenance (like monitoring and upgrades) still required manual effort.

    Azure Container Registry:

    With container adoption growing, there is a clear need in the market for storing and protecting container images. Microsoft provides a geo-replicated private repository as a service named Azure Container Registry.

    Azure Container Registry (ACR) is a registry offering from Microsoft for hosting container images privately. It integrates well with orchestrators available through Azure Container Service, including Docker Swarm, DC/OS, and the new Azure Kubernetes Service. Moreover, ACR provides capabilities such as Azure Active Directory-based authentication, webhook support, and delete operations.

    The coolest feature provided is geo-replication. It creates multiple copies of your images and distributes them across the globe, so that a container, when spawned, pulls the image from the nearest replica.
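
    As a quick sketch, an additional replica region can be added with a single CLI call; note that geo-replication requires the Premium SKU rather than the Basic SKU used later in this walkthrough:

    az acr replication create --registry <CONTAINER-REGISTRY-NAME> --location westeurope
    Example : az acr replication create --registry testacr --location westeurope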

    Although Microsoft has good documentation on how to set up ACR in your Azure subscription, we did encounter some issues and hence decided to write a blog on the precautions and steps required to configure the registry correctly.

    Note: We tried this using a free trial account. You can set it up by referring to the following link.

    Prerequisites:

    • Make sure you have resource groups created in the supported region.
      Supported Regions: eastus, westeurope, centralus, canada central, canadaeast
    • If you are using the Azure CLI for these operations, please make sure you use version 2.0.23 or 2.0.25 (the latest version at the time of writing this blog).

    Steps to install Azure CLI 2.0.23 or 2.0.25 (ubuntu 16.04 workstation):

    echo "deb [arch=amd64] https://packages.microsoft.com/repos/azure-cli/ wheezy main" |            
    sudo tee /etc/apt/sources.list.d/azure-cli.list
    sudo apt-key adv --keyserver packages.microsoft.com --recv-keys 52E16F86FEE04B979B07E28DB02C46DF417A0893
    sudo apt-get install apt-transport-https
    sudo apt-get update && sudo apt-get install azure-cli
    
    Install a specific version:
    
    sudo apt install azure-cli=2.0.23-1
    sudo apt install azure-cli=2.0.25.1

    Steps for Container Registry Setup:

    • Login to your Azure Account:
    az login --username <USERNAME> --password <PASSWORD>

    • Create a resource group:
    az group create --name <RESOURCE-GROUP-NAME>  --location eastus
    Example : az group create --name acr-rg  --location eastus

    • Create a Container Registry:
    az acr create --resource-group <RESOURCE-GROUP-NAME> --name <CONTAINER-REGISTRY-NAME> --sku Basic --admin-enabled true
    Example : az acr create --resource-group acr-rg --name testacr --sku Basic --admin-enabled true

    Note: The SKU defines the resources available for the registry. For the Basic SKU, the storage available is 10 GB with 1 webhook, and the billing amount is 11 Rs/day.

    For detailed information on the different SKU available visit the following link

    • Login to the registry :
    az acr login --name <CONTAINER-REGISTRY-NAME>
    Example :az acr login --name testacr

    • Sample Dockerfile for a Node application:
    FROM node:carbon
    # Create app directory
    WORKDIR /usr/src/app
    # Install app dependencies
    COPY package*.json ./
    RUN npm install
    # Bundle the app source
    COPY . .
    EXPOSE 8080
    CMD [ "npm", "start" ]

    • Build the Docker image:
    docker build -t <image-name>:<tag> .
    Example : docker build -t base:node8 .

    • Get the login server value for your ACR :
    az acr list --resource-group acr-rg --query "[].{acrLoginServer:loginServer}" --output table
    Output  :testacr.azurecr.io

    • Tag the image with the Login Server Value:
      Note: Get the image ID from docker images command

    Example:

    docker tag image-id testacr.azurecr.io/base:node8

    • Push the image to the Azure Container Registry:
     
    Example:

    docker push testacr.azurecr.io/base:node8

    Microsoft does provide a GUI option to create the ACR.

    • List Images in the Registry:

    Example:

    az acr repository list --name testacr --output table

    • List tags for the Images:

    Example:

    az acr repository show-tags --name testacr --repository <name> --output table

    • How to use the ACR image in Kubernetes deployment: Use the login Server Name + the image name

    Example :

    containers:
      - name: demo
        image: testacr.azurecr.io/base:node8
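
    For context, below is a minimal sketch of a full Deployment manifest that uses this image. It assumes the AKS cluster has been granted pull access to the registry (for example, by giving the cluster’s service principal reader access on ACR or by configuring an imagePullSecret):

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: demo
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: demo
      template:
        metadata:
          labels:
            app: demo
        spec:
          containers:
            - name: demo
              image: testacr.azurecr.io/base:node8
              ports:
                - containerPort: 8080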

    Azure Kubernetes Service

    Microsoft released the public preview of Managed Kubernetes for Azure Container Service (AKS) on October 24, 2017. This service simplifies the deployment, management, and operations of Kubernetes. It features an Azure-hosted control plane, automated upgrades, self-healing, and easy scaling.

    Similar to Google’s GKE and Amazon’s EKS, this new service gives you access to the worker nodes only, while the master is managed by the cloud provider. For more information, visit the following link.

    Let’s now get our hands dirty and deploy an AKS infrastructure to play with:

    • Enable AKS preview for your Azure subscription: at the time of writing this blog, AKS is in preview mode, so it requires a feature flag on your subscription.
    az provider register -n Microsoft.ContainerService

    • Kubernetes cluster creation command: Note: A new, separate resource group should be created for the Kubernetes service. Since the service is in preview, it is available only in certain regions.

    Make sure you create a resource group under the following regions.

    eastus, westeurope, centralus, canadacentral, canadaeast
    az  group create  --name  <RESOURCE-GROUP>   --location eastus
    Example : az group create --name aks-rg --location eastus
    az aks create --resource-group <RESOURCE-GROUP-NAME> --name <CLUSTER-NAME>   --node-count 2 --generate-ssh-keys
    Example : az aks create --resource-group aks-rg --name akscluster  --node-count 2 --generate-ssh-keys

    Example with different arguments :

    Create a Kubernetes cluster with a specific version.

    az aks create -g MyResourceGroup -n MyManagedCluster --kubernetes-version 1.8.1

    Create a Kubernetes cluster with a larger node pool.

    az aks create -g MyResourceGroup -n MyManagedCluster --node-count 7

    Install the Kubectl CLI :

    To connect to the Kubernetes cluster from a client computer, the kubectl command-line client is required.

    sudo az aks install-cli

    Note: If you’re using Azure Cloud Shell, kubectl is already installed. If you want to install it locally, run the above command.

    • To configure kubectl to connect to your Kubernetes cluster :
    az aks get-credentials --resource-group=<RESOURCE-GROUP-NAME> --name=<CLUSTER-NAME>

    Example :

    See the gist: https://gist.github.com/velotiotech/ac40b6014a435271f49ca0e3779e800f

    • Verify the connection to the cluster :
    kubectl get nodes -o wide 

    • For all the command line features available for Azure check the link: https://docs.microsoft.com/en-us/cli/azure/aks?view=azure-cli-latest

    We had encountered a few issues while setting up the AKS cluster at the time of writing this blog. Listing them along with the workaround/fix:

    az aks create --resource-group aks-rg --name akscluster  --node-count 2 --generate-ssh-keys

    Error: Operation failed with status: ‘Bad Request’.

    Details: Resource provider registrations for Microsoft.Compute, Microsoft.Storage and Microsoft.Network are needed; we need to enable them.

    Fix: If you are using the trial account, click on Subscriptions and check whether the following providers are registered:

    • Microsoft.Compute
    • Microsoft.Storage
    • Microsoft.Network
    • Microsoft.ContainerRegistry
    • Microsoft.ContainerService

    Error: We also encountered the following open issues at the time of writing this blog:

    1. Issue-1
    2. Issue-2
    3. Issue-3

    Jenkins setup for CI/CD with ACR, AKS

    Microsoft provides a solution template which will install the latest stable Jenkins version on a Linux (Ubuntu 14.04 LTS) VM along with tools and plugins configured to work with Azure. This includes:

    • git for source control
    • Azure Credentials plugin for connecting securely
    • Azure VM Agents plugin for elastic build, test and continuous integration
    • Azure Storage plugin for storing artifacts
    • Azure CLI to deploy apps using scripts

    Refer to the link below to bring up the instance.

    Pipeline plan for Spinning up a Nodejs Application using ACR – AKS – Jenkins

    What the pipeline accomplishes :

    Stage 1:

    The code gets pushed to GitHub, and the Jenkins job is triggered automatically. The Dockerfile is checked out from GitHub.

    Stage 2:

    Docker builds an image from the Dockerfile, and the image is tagged with the build number. Additionally, the latest tag is also attached to the image for the containers to use.

    Stage 3:

    We have default deployment and service YAML files stored on the Jenkins server. Jenkins makes a copy of the default YAML files, makes the necessary changes according to the build, and puts them in a separate folder.

    Stage 4:

    kubectl was configured on the Jenkins server at the time of setting up AKS. The YAML files are fed to the kubectl utility, which in turn creates the pods and services.

    Sample Jenkins pipeline code :

    node {      
      // Mark the code checkout 'stage'....        
        stage('Checkout the Dockerfile from GitHub') {
          git branch: 'docker-file', credentialsId: 'git_credentials', url: 'https://gitlab.com/demo.git'        
        }        
        // Build and Deploy to ACR 'stage'...        
        stage('Build the Image and Push to Azure Container Registry') {                
          app = docker.build('testacr.azurecr.io/demo')                
          withDockerRegistry([credentialsId: 'acr_credentials', url: 'https://testacr.azurecr.io']) {                
          app.push("${env.BUILD_NUMBER}")                
          app.push('latest')                
          }        
         }        
         stage('Build the Kubernetes YAML Files for New App') {
    <The code here will differ depending on the YAMLs used for the application>        
      }        
      stage('Deploying the App on Azure Kubernetes Service') {
        app = docker.image('testacr.azurecr.io/demo:latest')            
        withDockerRegistry([credentialsId: 'acr_credentials', url: 'https://testacr.azurecr.io']) {            
        app.pull()            
        sh "kubectl create -f ."            
        }       
       }    
    }

    What we achieved:

    • We managed to create a private Docker registry on Azure using the ACR feature with az-cli 2.0.25.
    • Secondly, we were able to spin up a private Kubernetes cluster on Azure with 2 nodes.
    • We set up Jenkins using a pre-cooked template which had all the plugins necessary for communication with ACR and AKS.
    • Finally, we orchestrated a Continuous Deployment pipeline in Jenkins that uses Docker features.