Category: Services

  • An Introduction To Cloudflare Workers And Cloudflare KV store

    Cloudflare Workers

    This post gives a brief introduction to Cloudflare Workers and the Workers KV store. They address a fairly common set of problems around scaling an application globally. There are standard ways of doing this, but they usually require a considerable amount of upfront engineering work, and developers have to be aware of the scalability issues to some degree. Serverless tools target easy scalability and quick response times around the globe while keeping developers focused on application logic rather than the nitty-gritty of infrastructure.

    Global responsiveness

    When an application is expected to be accessed around the globe, requests from users in different regions should take a similar amount of time to complete. There can be multiple ways of achieving this, depending upon how data intensive the requests are and what those requests actually do.

    Data-intensive requests are harder and more expensive to globalize, but not all requests are alike. Static requests, on the other hand, like fetching a documentation page or a blog post, can be globalized by generating the markup at build time and deploying it on a CDN.

    And there are semi-dynamic requests: they render mostly static content with some small amount of data, or their content changes based on the region the request came from.

    The above is a loose classification of requests, and there are exceptions; for example, not all static requests are presentational.

    Serverless frameworks are particularly useful in scaling static and semi-dynamic requests.

    Cloudflare Workers Overview

    A Cloudflare Worker is essentially a deployed cloud function. Cloudflare provides a serverless execution environment that can be used to develop and deploy small (although not necessarily) and modular cloud functions with minimal effort.

    It is trivial to get started with Workers. First, let's install wrangler, a tool for managing Cloudflare Worker projects.

    npm i @cloudflare/wrangler -g

    Wrangler handles all the standard stuff for you, like project generation from templates, builds, configuration, and publishing, among other things.

    A worker primarily contains two parts: an event listener that invokes the worker and an event handler that returns a response object. Creating a worker is as easy as adding an event listener to a button.

    addEventListener('fetch', event => {
        event.respondWith(handleRequest(event.request))
    })
    
    async function handleRequest(request) {
        return new Response("hello world")
    }

    Above is a simple hello world example. Wrangler can be used to build and get a live preview of your worker.

    wrangler build

    will build your worker. And 

    wrangler preview 

    can be used to get a live preview in the browser. The preview is only meant to be used for testing (either by you or others). If you want the worker to be triggered from your own domain or a workers.dev subdomain, you need to publish it.

    Publishing is fairly straightforward and requires very little configuration in both wrangler and your project.

    Wrangler Configuration

    Just create an account on Cloudflare and get an API key. To configure wrangler, run:

    wrangler config

    It will ask for the registered email and API key, and you are good to go.

    To publish your worker on a workers.dev subdomain, just fill in your account ID in the wrangler.toml and run wrangler publish. The worker will be deployed and live at a generated workers.dev subdomain.
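    For reference, a minimal wrangler.toml for a Workers project looks something like the following; the account_id value is a placeholder you replace with your own:

```toml
name = "hello-world"           # project/script name
type = "javascript"            # project type used by wrangler
account_id = "YOUR_ACCOUNT_ID" # from the Cloudflare dashboard
workers_dev = true             # deploy to a workers.dev subdomain
```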

    Regarding Routes

    When you publish on a {script-name}.{subdomain}.workers.dev domain, the script or project associated with script-name will be invoked. There is no way to call a script just from {subdomain}.workers.dev.

    Workers KV

    Workers alone can't be used to build anything complex without persistent storage; that's where Workers KV comes into the picture. Workers KV, as the name suggests, is a low-latency, high-volume key-value store designed for efficient reads.

    It optimizes read latency by dynamically spreading the most frequently read entries to the edges (replicated in several regions) and storing less frequent entries centrally.

    Newly added keys (a CREATE) are immediately reflected in every region, while a value change to an existing key (an UPDATE) may take as long as 60 seconds to propagate, depending upon the region.

    Workers KV is only available to paid users of Cloudflare.

    Writing Data in Workers KV

    curl "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/storage/kv/namespaces" \
    -X POST \
    -H "X-Auth-Email: $CLOUDFLARE_EMAIL" \
    -H "X-Auth-Key: $CLOUDFLARE_AUTH_KEY" \
    -H "Content-Type: application/json" \
    --data '{"title": "Requests"}'

    The above HTTP request will create a namespace named Requests. The response should look something like this:

    {
        "result": {
            "id": "30b52f55aafb41d88546d01d5f69440a",
            "title": "Requests",
            "supports_url_encoding": true
        },
        "success": true,
        "errors": [],
        "messages": []
    }

    Now we can write KV pairs in this namespace. The following HTTP request will do that:

    curl "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/storage/kv/namespaces/$NAMESPACE_ID/values/first-key" \
    -X PUT \
    -H "X-Auth-Email: $CLOUDFLARE_EMAIL" \
    -H "X-Auth-Key: $CLOUDFLARE_AUTH_KEY" \
    --data 'My first value!'

    Here, NAMESPACE_ID is the same ID that we received in the last request, first-key is the key name, and 'My first value!' is the value.
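    If you prefer scripting these calls, the same write can be expressed with Python's standard library. The IDs and credentials below are placeholders, and the request is only built here, not sent:

```python
import urllib.request

account_id = "ACCOUNT_ID"      # placeholder
namespace_id = "NAMESPACE_ID"  # placeholder: the ID returned earlier
url = (f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
       f"/storage/kv/namespaces/{namespace_id}/values/first-key")

# same PUT as the curl command above
req = urllib.request.Request(
    url,
    data=b"My first value!",
    method="PUT",
    headers={
        "X-Auth-Email": "you@example.com",   # placeholder credentials
        "X-Auth-Key": "CLOUDFLARE_AUTH_KEY",
    },
)
# urllib.request.urlopen(req) would actually perform the call
```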

    Let’s complicate things a little

    The overview above introduced managed cloud workers with a 'hello world' app and the basics of Workers KV; now let's make something more complicated. We will build an app that tells how many requests have been made from your country so far. For example, if you pinged the worker from the US, it would return the number of requests made so far from the US.

    We will need: 

    • A place to store the count of requests for each country.
    • A way to find which country the Worker was invoked from.

    For the first part, we will use Workers KV to store the count for each country.

    Let’s start

    First, we will create a new project using wrangler: wrangler generate request-count.

    We will be making HTTP calls to write values in the Workers KV, so let’s add ‘node-fetch’ to the project:

    npm install node-fetch

    Now, how do we find which country each request is coming from? The answer is the cf object that is provided with each request to a worker.

    The cf object is a special object that is passed with each request and can be accessed as request.cf. It mainly contains region-specific information along with TLS and auth information. The details of what is provided in cf can be found here.

    As we can see from the documentation, we can get country from

    request.cf.country.

    The cf object is not correctly populated in the wrangler preview; you will need to publish your worker in order to test cf's usage. An open issue mentioning the same can be found here.

    Now, the logic is pretty straightforward. When we get a request from a country for which we don't have an entry in Workers KV, we make an entry with value 1; otherwise, we increment the value of the country's key.
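    The create-or-increment step is easy to sketch outside the Worker. Here is the same logic in Python, with a plain dict standing in for the KV namespace (illustrative only; the actual worker code later is JavaScript):

```python
# a dict stands in for the KV namespace; values are strings, as in KV
def record_request(kv, country):
    count = kv.get(country)
    count = 1 if count is None else int(count) + 1
    kv[country] = str(count)
    return count

counts = {}
first = record_request(counts, "US")    # 1: first request from the US
second = record_request(counts, "US")   # 2: incremented on the next one
```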

    To use Workers KV, we need to create a namespace. A namespace is just a collection of key-value pairs where all the keys have to be unique.

    A namespace can be created under the KV tab in the Cloudflare web UI by giving it a name, or by using the API call above. You can also view and browse all of your namespaces from the web UI. The following API call can be used to read the value of a key from a namespace:

    curl "https://api.cloudflare.com/client/v4/accounts/$ACCOUNT_ID/storage/kv/namespaces/$NAMESPACE_ID/values/first-key" \
    -H "X-Auth-Email: $CLOUDFLARE_EMAIL" \
    -H "X-Auth-Key: $CLOUDFLARE_AUTH_KEY"

    But this is neither the fastest nor the easiest way. Cloudflare provides a better and faster way to read data from your namespaces: bindings. A KV namespace can be bound to a worker script, making it available inside the script through a variable name. Any namespace can be bound to any worker by going to the worker's editing menu in the Cloudflare UI.

    The following steps show how to bind a namespace to a worker:

    Go to the edit page of the worker in Cloudflare web UI and click on the KV tab:

    Then add a binding by clicking the ‘Add binding’ button.

    You can select the namespace name and the variable name by which it will be bound. More details can be found here. A binding that I’ve made can be seen in the above image.

    That’s all we need to get this to work. Following is the relevant part of the script:

    const fetch = require('node-fetch')
    
    addEventListener('fetch', event => {
        event.respondWith(handleRequest(event.request))
    })
    
    /**
     * Count the requests made so far from the caller's country
     * @param {Request} request
     */
    async function handleRequest(request) {
        const country = request.cf.country
    
        const url = `https://api.cloudflare.com/client/v4/accounts/account-id/storage/kv/namespaces/namespace-id/values/${country}`
    
        // `requests` is the variable the KV namespace is bound to
        let count = await requests.get(country)
    
        if (!count) {
            count = 1
        } else {
            count = parseInt(count) + 1
        }
    
        try {
            // write the updated count back through the REST API
            await fetch(url, {
                method: 'PUT',
                headers: { "X-Auth-Email": "email", "X-Auth-Key": "auth-key" },
                body: `${count}`
            })
        } catch (error) {
            return new Response(error.message, { status: 500 })
        }
    
        return new Response(`${country}: ${count}`, { status: 200 })
    }

    In the above code, the Requests namespace that we created is bound to the requests variable, which is resolved dynamically when we publish.

    The full source of this can be found here.

    This small application also demonstrates some practical aspects of workers. For example, you will notice that updates take some time to be reflected, and that the response time of the workers is quick, especially when they are deployed on a .workers.dev subdomain.

    Side note: You will have to recreate the namespace-worker binding every time you deploy the worker with wrangler publish.
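    As an aside, the binding can also be declared in wrangler.toml so that it survives republishing. A sketch, reusing the namespace ID from earlier (the exact key name depends on the wrangler version):

```toml
kv-namespaces = [
  { binding = "requests", id = "30b52f55aafb41d88546d01d5f69440a" }
]
```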

    Workers vs. AWS Lambda

    AWS Lambda has been a major player in the serverless market for a while now. So, how does Cloudflare Workers compare to it? Let's see.

    Architecture:

    Cloudflare Workers run on `Isolates` instead of a container-based underlying architecture. `Isolates` is the technology that allows V8 (Google Chrome's JavaScript engine) to run thousands of processes on a single server in an efficient and secure manner. This effectively translates into faster code execution and lower memory usage. More details can be found here.

    Price:

    The above mentioned architectural difference allows Workers to be significantly cheaper than Lambda. While a Worker offering 50 milliseconds of CPU costs $0.50 per million requests, the equivalent Lambda costs $1.84 per million. A more detailed price comparison can be found here.
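    To make the gap concrete, here is the arithmetic at, say, ten million monthly requests (a hypothetical volume, using the per-million rates quoted above):

```python
# cost in dollars for `requests` calls at a per-million-request rate
def monthly_cost(requests, rate_per_million):
    return requests / 1_000_000 * rate_per_million

requests = 10_000_000
workers_cost = monthly_cost(requests, 0.50)  # 5 dollars
lambda_cost = monthly_cost(requests, 1.84)   # about 18.4 dollars
```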

    Speed:

    Workers also show significantly better performance numbers than Lambda and Lambda@Edge. Tests run by Cloudflare claim that they are 441% faster than Lambda and 192% faster than Lambda@Edge. A detailed performance comparison can be found here.

    This better performance is also confirmed by serverless-benchmark.

    Wrapping Up:

    As we have seen, Cloudflare Workers along with the KV store make it very easy to start with a serverless application. They provide fantastic performance at lower cost, along with intuitive deployment. These properties make them ideal for building globally accessible serverless applications.

  • Explanatory vs. Predictive Models in Machine Learning

    My view of data analysis is that there is a continuum between explanatory models on one side and predictive models on the other. The decisions you make during the modeling process depend on your goal. Let's take customer churn as an example: you can ask yourself why customers are leaving, or you can ask yourself which customers are leaving. The first question has explaining churn as its primary goal, while the second aims to predict churn. These are two fundamentally different questions, and this has implications for the decisions you take along the way. The predictive side of data analysis is closely related to terms like data mining and machine learning.

    SPSS & SAS

    Looking at SPSS and SAS, both of these languages originate from the explanatory side of data analysis. They were developed in an academic environment, where hypothesis testing plays a major role. As a result, they have significantly fewer methods and techniques in comparison to R and Python. Nowadays, SAS and SPSS both have data mining tools (SAS Enterprise Miner and SPSS Modeler); however, these are separate tools and you'll need extra licenses.

    I have spent some time building extensive macros in SAS EG to seamlessly create predictive models, which also do a decent job at explaining feature importance. While a neural network may do a fair job at making predictions, it is extremely difficult to explain such models, let alone their feature importance. The macros that I have built in SAS EG do precisely the job of explaining the features, apart from producing excellent predictions.

    OPEN SOURCE TOOLS: R & PYTHON

    One of the major advantages of open source tools is that the community continuously improves and increases functionality. R was created by academics, who wanted their algorithms to spread as easily as possible. R has the widest range of algorithms, which makes R strong on the explanatory side and on the predictive side of Data Analysis.

    Python is developed with a strong focus on (business) applications, not from an academic or statistical standpoint. This makes Python very powerful when algorithms are directly used in applications. Hence, we see that the statistical capabilities are primarily focused on the predictive side. Python is mostly used in Data Mining or Machine Learning applications where a data analyst doesn’t need to intervene. Python is therefore also strong in analyzing images and videos. Python is also the easiest language to use when using Big Data Frameworks like Spark. With the plethora of packages and ever improving functionality, Python is a very accessible tool for data scientists.

    MACHINE LEARNING MODELS

    While procedures like Logistic Regression are very good at explaining the features used in a prediction, others like Neural Networks are not. The latter may be preferred over the former when only prediction accuracy matters, not explaining the model. Interpreting or explaining the model becomes an issue for Neural Networks: you can't just peek inside a deep neural network to figure out how it works. A network's reasoning is embedded in the behavior of numerous simulated neurons, arranged into dozens or even hundreds of interconnected layers. In most cases, the Product Marketing Officer is interested in knowing which factors are most important for a specific advertising project, and what they can concentrate on to get response rates higher, rather than what their response rate or revenues will be in the upcoming year. These questions are better answered by procedures that can be interpreted more easily. This is a great article about the technical and ethical consequences of the lack of explanations provided by complex AI models.

    Procedures like Decision Trees are very good at explaining and visualizing what exactly the decision points are (features and their cut-offs). However, they do not produce the best models. Random Forests and Boosting are procedures that use Decision Trees as the basic starting point to build predictive models, and they are by far among the best methods for building sophisticated prediction models.

    Random Forests use fully grown (highly complex) trees, each built on a random sample drawn with replacement from the training set (a process called bootstrapping); then each split uses only a proper subset of the features from the entire feature set, rather than all of them. Bootstrapping helps when training data is limited (in many cases there is no way to get more data). Subsetting the features has a tremendous effect on de-correlating the trees grown in the forest (hence randomizing it), leading to a drop in test-set error. A fresh subset of features is chosen at each split, making the method robust. The strategy also stops the strongest feature from appearing every time a split is considered, which would otherwise make all the trees in the forest similar. The final result is obtained by averaging the results over all trees (in regression problems) or by taking a majority class vote (in classification problems).
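    The two sources of randomness, and the final aggregation, can be sketched with the standard library alone (a toy illustration, not a full Random Forest implementation):

```python
import random
from collections import Counter

def bootstrap_sample(X, y, rng):
    """Bagging: draw len(X) rows with replacement for one tree."""
    idx = [rng.randrange(len(X)) for _ in X]
    return [X[i] for i in idx], [y[i] for i in idx]

def feature_subset(n_features, rng):
    """At each split only ~sqrt(p) randomly chosen features are
    candidates, which de-correlates the trees in the forest."""
    k = max(1, round(n_features ** 0.5))
    return sorted(rng.sample(range(n_features), k))

def majority_vote(per_tree_predictions):
    """Classification: the forest's answer is the majority class."""
    return Counter(per_tree_predictions).most_common(1)[0][0]

rng = random.Random(42)
X = [[5.1, 3.5], [4.9, 3.0], [6.2, 2.9], [5.9, 3.2]]
y = [0, 0, 1, 1]
X_boot, y_boot = bootstrap_sample(X, y, rng)  # training set for one tree
split_features = feature_subset(2, rng)       # features this split may use
forest_answer = majority_vote([0, 1, 0])      # aggregate three trees' votes
```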

    Boosting, on the other hand, is a method where a forest is grown using trees that are NOT fully grown, in other words, weak learners. One has to specify the number of trees to be grown. At each iteration, the method fits a weak learner and finds the residuals; the weights of the training examples that were misclassified are then increased, so that the next learner concentrates on the failed examples. This way, the method proceeds by improving the accuracy of the boosted ensemble, stopping when the improvement is below a threshold. One particular implementation of Boosting, AdaBoost, has very good accuracy compared to other implementations. AdaBoost uses trees of depth 1, known as decision stumps, as the members of the forest. These start out only slightly better than random guessing, but over iterations they learn the pattern and perform extremely well on the test set. The method is akin to a feedback control mechanism, where the system learns from its errors. To address overfitting, one can tune the learning-rate hyperparameter (lambda), choosing values in the range (0, 1]. Very small values of lambda take more iterations (and time) to converge, while larger values may overfit. A suitable value can be found by an iterative process: plot the test error rate against values of lambda and choose the lambda with the lowest test error.
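    The reweighting loop is compact enough to sketch from scratch. Below is a toy AdaBoost over decision stumps on a 1-D dataset, using only the standard library (illustrative, not a production implementation):

```python
import math

def stump_predict(x, threshold, polarity):
    # polarity +1: predict +1 when x > threshold, else -1 (flipped for -1)
    return polarity if x > threshold else -polarity

def best_stump(X, y, w):
    # exhaustively pick the threshold/polarity with the lowest weighted error
    best = None
    for threshold in sorted(set(X)):
        for polarity in (1, -1):
            err = sum(wi for xi, yi, wi in zip(X, y, w)
                      if stump_predict(xi, threshold, polarity) != yi)
            if best is None or err < best[0]:
                best = (err, threshold, polarity)
    return best

def adaboost(X, y, rounds):
    n = len(X)
    w = [1.0 / n] * n                    # uniform example weights to start
    ensemble = []
    for _ in range(rounds):
        err, threshold, polarity = best_stump(X, y, w)
        err = max(err, 1e-10)
        alpha = 0.5 * math.log((1 - err) / err)  # this stump's vote weight
        ensemble.append((alpha, threshold, polarity))
        # increase the weight of misclassified examples, decrease the rest
        w = [wi * math.exp(-alpha * yi * stump_predict(xi, threshold, polarity))
             for xi, yi, wi in zip(X, y, w)]
        total = sum(w)
        w = [wi / total for wi in w]     # renormalize
    return ensemble

def predict(ensemble, x):
    score = sum(alpha * stump_predict(x, t, p) for alpha, t, p in ensemble)
    return 1 if score >= 0 else -1

# +1 at both ends, -1 in the middle: no single stump can fit this,
# but a few reweighted stumps together can
X = [1, 2, 3, 4, 5, 6, 7, 8]
y = [1, 1, -1, -1, -1, -1, 1, 1]
model = adaboost(X, y, rounds=5)
```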

    In all these methods, as we move from Logistic Regression to Decision Trees to Random Forests and Boosting, the complexity of the models increases, making it almost impossible to EXPLAIN a Boosting model to marketers or product managers. Decision Trees are easy to visualize, and Logistic Regression results can be used to demonstrate the most important factors in a customer acquisition model, so both are well received by business leaders. On the other hand, Random Forest and Boosting methods are extremely good predictors, without much scope for explanation. But there is hope: these models have functions for revealing the most important variables, although it is not possible to visualize why.

    USING A BALANCED APPROACH

    So I use a mixed strategy: use the simpler, explainable methods as a step in exploratory data analysis, present the importance of features and the characteristics of the data to the business leaders in phase one, and then use the more complicated models to build the prediction models for deployment, after building competing models. That way, one not only gets to understand what is happening and why, but also gets the best predictive power. In most cases that I have worked on, I have rarely seen a mismatch between the explanations and the predictions from different methods. After all, this is all math, and the mode of delivery should not change the end results. Now that's a happy ending for all sides of the business!

  • Installing Redis Service in DC/OS With Persistent Storage Using AWS Volumes

    Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache, and message broker.

    It supports various data structures such as Strings, Hashes, Lists, Sets, etc. DC/OS offers Redis as a service.

    Why Do We Use External Persistent Storage for Redis Mesos Containers?

    Since Redis is an in-memory database, an instance/service restart will result in loss of data. To counter this, it is always advisable to snapshot the Redis in-memory database from time to time.

    This helps the Redis instance recover from a failure to a recent point in time.
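    Snapshotting is controlled by save rules in redis.conf; for example (standard Redis directives, values are illustrative):

```
# RDB snapshots: "save <seconds> <changes>"
save 900 1       # snapshot if at least 1 key changed in 900 seconds
save 60 10000    # snapshot if at least 10000 keys changed in 60 seconds

# Optionally, enable the append-only file for finer-grained durability
appendonly yes
```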

    In DC/OS, Redis is deployed as a stateless service. To make it stateful with persistent data, we can configure local volumes or external volumes.

    The disadvantage of having a local volume mapped to Mesos containers is that when a slave node goes down, the local volume becomes unavailable and data loss occurs.

    With external persistent volumes, however, which can be attached from any node of the DC/OS cluster, a slave node failure does not impact data availability.

    Rex-Ray

    REX-Ray is an open source, storage management solution designed to support container runtimes such as Docker and Mesos.

    REX-Ray enables stateful applications such as databases to persist and maintain its data after the life cycle of the container has ended. Built-in high availability enables orchestrators such as Docker Swarm, Kubernetes, and Mesos Frameworks like Marathon to automatically orchestrate storage tasks between hosts in a cluster.

    Built on top of the libStorage framework, REX-Ray’s simplified architecture consists of a single binary and runs as a stateless service on every host using a configuration file to orchestrate multiple storage platforms.

    Objective: To create a Redis service in DC/OS environment with persistent storage.

    Warning: The persistent volume feature is still in beta phase for DC/OS version 1.11.

    Prerequisites:

    • Make sure the rexray service is running and is in a healthy state for the cluster.

    Steps:

    • Click on the Add button in Services component of DC/OS GUI.
    • Click on JSON Configuration.  

    Note: For persistent storage, the code below should be added to the normal Redis service configuration JSON file to mount external persistent volumes.

    "volumes": [
          {
            "containerPath": "/data",
            "mode": "RW",
            "external": {
              "name": "redis4volume",
              "provider": "dvdi",
              "options": {
                "dvdi/driver": "rexray"
              }
            }
          }
        ],

    • Make sure the service is up and in a running state:

    If you look closely, the service was suspended and respawned on a different slave node. We populated the database with dummy data and saved the snapshot in the data directory.

    When the service came up on a different node, 10.0.3.204, the data persisted and the volume was visible on the new node.

    core@ip-10-0-3-204 ~ $ /opt/mesosphere/bin/rexray volume list
    
    - name: datavolume
      volumeid: vol-00aacade602cf960c
      availabilityzone: us-east-1a
      status: in-use
      volumetype: standard
      iops: 0
      size: "16"
      networkname: ""
      attachments:
      - volumeid: vol-00aacade602cf960c
        instanceid: i-0d7cad91b62ec9a64
        devicename: /dev/xvdb
    

    • Check the volume tab:

    Note: For external volumes, the status will be unavailable. This is an issue with DC/OS.

    The Entire Service JSON file:

    {
      "id": "/redis4.0-new-failover-test",
      "instances": 1,
      "cpus": 1.001,
      "mem": 2,
      "disk": 0,
      "gpus": 0,
      "backoffSeconds": 1,
      "backoffFactor": 1.15,
      "maxLaunchDelaySeconds": 3600,
      "container": {
        "type": "DOCKER",
        "volumes": [
          {
            "containerPath": "/data",
            "mode": "RW",
            "external": {
              "name": "redis4volume",
              "provider": "dvdi",
              "options": {
                "dvdi/driver": "rexray"
              }
            }
          }
        ],
        "docker": {
          "image": "redis:4",
          "network": "BRIDGE",
          "portMappings": [
            {
              "containerPort": 6379,
              "hostPort": 0,
              "servicePort": 10101,
              "protocol": "tcp",
              "name": "default",
              "labels": {
                "VIP_0": "/redis4.0:6379"
              }
            }
          ],
          "privileged": false,
          "forcePullImage": false
        }
      },
      "healthChecks": [
        {
          "gracePeriodSeconds": 60,
          "intervalSeconds": 5,
          "timeoutSeconds": 5,
          "maxConsecutiveFailures": 3,
          "portIndex": 0,
          "protocol": "TCP"
        }
      ],
      "upgradeStrategy": {
        "minimumHealthCapacity": 0.5,
        "maximumOverCapacity": 0
      },
      "unreachableStrategy": {
        "inactiveAfterSeconds": 300,
        "expungeAfterSeconds": 600
      },
      "killSelection": "YOUNGEST_FIRST",
      "requirePorts": true
    }

    Redis entrypoint

    To connect to the Redis service, use the below host:port in your applications:

    redis.marathon.l4lb.thisdcos.directory:6379

    Conclusion

    We learned about standalone Redis service deployment from the DC/OS catalog. We also saw how to add persistent storage to it using REX-Ray, how REX-Ray automatically manages volumes on AWS EBS, and how to integrate them into DC/OS apps and services. Finally, we saw how other applications can communicate with this Redis service.

  • Build ML Pipelines at Scale with Kubeflow

    Setting up an ML stack requires a lot of tools for analyzing data and training models, and it is even harder to set up the same stack in multi-cloud environments. This is where Kubeflow comes into the picture: it makes it easy to develop, deploy, and manage ML pipelines.

    In this article, we are going to learn how to install Kubeflow on Kubernetes (GKE), train an ML model on Kubernetes, and publish the results. This introductory guide will be helpful for anyone who wants to understand how to use Kubernetes to run an ML pipeline in a simple, portable, and scalable way.

    Kubeflow Installation on GKE

    You can install Kubeflow onto any Kubernetes cluster no matter which cloud it is, but the cluster needs to fulfill the following minimum requirements:

    • 4 CPU
    • 50 GB storage
    • 12 GB memory

    The recommended Kubernetes version is 1.14 and above.

    You need to download kfctl from the Kubeflow website and untar the file:

    tar -xvf kfctl_v1.0.2_<platform>.tar.gz -C /home/velotio/kubeflow

    Also, install kustomize using these instructions.

    Start by exporting the following environment variables:

    export PATH=$PATH:/home/velotio/kubeflow/
    export KF_NAME=kubeml
    export BASE_DIR=/home/velotio/kubeflow/
    export KF_DIR=${BASE_DIR}/${KF_NAME}
    export CONFIG_URI="https://raw.githubusercontent.com/kubeflow/manifests/v1.0-branch/kfdef/kfctl_k8s_istio.v1.0.2.yaml"

    After we've exported these variables, we can build the deployment configuration and customize everything according to our needs. Run the following commands:

    cd ${KF_DIR}
    kfctl build -V -f ${CONFIG_URI}

    This will download the file kfctl_k8s_istio.v1.0.2.yaml and a kustomize folder. If you want to expose the UI with LoadBalancer, change the file $KF_DIR/kustomize/istio-install/base/istio-noauth.yaml and edit the service istio-ingressgateway from NodePort to LoadBalancer.

    Now, you can install Kubeflow using the following commands:

    export CONFIG_FILE=${KF_DIR}/kfctl_k8s_istio.v1.0.2.yaml
    kfctl apply -V -f ${CONFIG_FILE}

    This will install a bunch of services that are required to run the ML workflows.

    Once successfully deployed, you can access the Kubeflow UI dashboard on the istio-ingressgateway service. You can find the IP using the following command:

    kubectl get svc istio-ingressgateway -n istio-system -o jsonpath={.status.loadBalancer.ingress[0].ip}

    ML Workflow

    Developing your ML application consists of several stages:

    1. Gathering data and data analysis
    2. Researching the model for the type of data collected
    3. Training and testing the model
    4. Tuning the model
    5. Deploying the model

    These are the stages of any ML problem you're trying to solve, but where does Kubeflow fit into this picture?

    Kubeflow provides its own pipelines to solve this problem. A Kubeflow pipeline consists of the ML workflow description, the different stages of the workflow, and how they combine in the form of a graph.
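    Conceptually, such a pipeline is just a directed acyclic graph of steps executed in dependency order. A toy standard-library sketch of that idea (this is not the Kubeflow API):

```python
# a pipeline as a DAG: each step maps to the steps it depends on
from graphlib import TopologicalSorter  # Python 3.9+

steps = {
    "preprocess": [],             # no dependencies
    "train":      ["preprocess"],
    "evaluate":   ["train"],
    "deploy":     ["evaluate"],
}
# static_order() yields steps only after all their dependencies
run_order = list(TopologicalSorter(steps).static_order())
```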

    Kubeflow provides the ability to run your ML pipeline on any hardware, be it your laptop, the cloud, or a multi-cloud environment. Wherever you can run Kubernetes, you can run your ML pipeline.

    Training your ML Model on Kubeflow

    Once you’ve deployed Kubeflow in the first step, you should be able to access the Kubeflow UI, which would look like:

    The first step is to upload your pipeline. However, to do that, you need to prepare your pipeline in the first place. We are going to use a financial time-series dataset and train our model. You can find the example code here:

    git clone https://github.com/kubeflow/examples.git
    cd examples/financial_time_series/
    export TRAIN_PATH=gcr.io/<project>/<image-name>/cpu:v1
    gcloud builds submit --tag $TRAIN_PATH .

    The command above will build the Docker image. Next, we will create the bucket to store our data and model artifacts.

    # create storage bucket that will be used to store models
    BUCKET_NAME=<your-bucket-name>
    gsutil mb gs://$BUCKET_NAME/

    Once we have our image ready on the GCR repo, we can start our training job on Kubernetes. Please have a look at the tfjob resource in CPU/tfjob1.yaml and update the image and bucket reference.

    kubectl apply -f CPU/tfjob1.yaml
    POD_NAME=$(kubectl get pods -n kubeflow --selector=tf-job-name=tfjob-flat \
          --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}')
    kubectl logs -f $POD_NAME -n kubeflow

    Kubeflow Pipelines needs our pipeline compiled into a domain-specific language (DSL). We can compile our Python 3 file with a tool called dsl-compile that comes with the Python 3 SDK. So, first, install that SDK:

    pip3 install python-dateutil kfp==0.1.36

    Next, inspect ml_pipeline.py and update it with the CPU image path that you built in the previous steps. Then, compile the DSL using:

    python3 ml_pipeline.py

    Now a file, ml_pipeline.py.tar.gz, is generated, which we can upload to the Kubeflow Pipelines UI.

    Once the pipeline is uploaded, you can see the stages in a graph-like format.

    Next, we can click on the pipeline and create a run. For each run, you need to specify the params that you want to use. When the pipeline is running, you can inspect the logs.

    Run Jupyter Notebook in your ML Pipeline

    You can also interactively define your pipeline from the Jupyter notebook:

    1. Navigate to the Notebook Servers through the Kubeflow UI

    2. Select the namespace and click on “new server.”

    3. Give the server a name and provide the Docker image for the TensorFlow version on which you want to train your model. I took the TensorFlow 1.15 image.

    4. Once a notebook server is available, click on “connect” to connect to the server.

    5. This will open up a new window and a Jupyter terminal.

    6. Input the following command: pip install -U kfp.

    7. Download the notebook using the following command:

    curl -O https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/example_pipelines/pipelines-notebook.ipynb

    8. Now that you have the notebook, you can replace the environment variables like WORKING_DIR, PROJECT_NAME, and GITHUB_TOKEN. Once you do that, you can run the notebook step-by-step (one cell at a time) by pressing shift+enter, or run the whole notebook via the menu’s “Run All” option.

    Conclusion

    The ML world has its own challenges: environments are tightly coupled, and the tools needed to build an ML stack were extremely hard to set up and configure. This becomes even harder in production environments, where you have to be extremely cautious not to break components that are already present.

    Kubeflow makes getting started with ML highly accessible. You can run your ML workflows anywhere you can run Kubernetes. Kubeflow makes it possible to run your ML stack on multi-cloud environments, which enables ML engineers to easily train their models at scale by leveraging the scalability of Kubernetes.

    Related Articles

    1. The Ultimate Beginner’s Guide to Jupyter Notebooks
    2. Demystifying High Availability in Kubernetes Using Kubeadm
  • Real Time Text Classification Using Kafka and Scikit-learn

    Introduction:

    Text classification is one of the essential tasks in supervised machine learning (ML). Assigning categories to text (tweets, Facebook posts, web pages, library books, media articles, etc.) has many applications like spam filtering, sentiment analysis, etc. In this blog, we build a text classification engine to classify topics in an incoming Twitter stream using Apache Kafka and scikit-learn, a Python-based machine learning library.

    Let’s dive into the details. Here is a diagram to visually explain the components and data flow. The Kafka producer ingests data from Twitter and sends it to the Kafka broker. The Kafka consumer asks the Kafka broker for the tweets. We convert the binary tweet stream from Kafka into human-readable strings and perform predictions using saved models. We train the models on Twenty Newsgroups, a prebuilt training dataset that ships with scikit-learn; it is a standard dataset used for training classification algorithms.

    In this blog, we will use a Multinomial Naive Bayes model (described below), built with the following libraries/tools:

    • tweepy – Twitter library for python
    • Apache Kafka
    • scikit-learn
    • pickle – Python Object serialization library

    Let’s first understand the following key concepts:

    • Word to Vector Methodology (Word2Vec)
    • Bag-of-Words
    • tf-idf
    • Multinomial Naive Bayes classifier

    Word2Vec methodology

    One of the key ideas in Natural Language Processing (NLP) is how we can efficiently convert words into numeric vectors, which can then be given as input to machine learning models to perform predictions.

    Neural networks, like most other machine learning models, are nothing but mathematical functions that need numbers or vectors to churn out an output (tree-based methods are an exception: they can work directly on words).

    For this, we have an approach known as Word2Vec. But first, a very trivial solution would be the “one-hot” method: convert each word into a sparse vector with only one element set to 1 and the rest zero.

    For example, “the apple a day the good” would have following representation

    Here we have transformed the above sentence into a 6×5 matrix, 5 being the size of the vocabulary (since “the” is repeated). But what are we supposed to do when we have a gigantic dictionary to learn from, say more than 100,000 words? Here, one-hot encoding fails: it does not scale, and the relationships between words are lost. For example, “Lanka” should come after “Sri”.
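To make this concrete, here is a minimal plain-Python sketch of one-hot encoding the example sentence (the column ordering of the vocabulary is arbitrary; sorted order is used here for reproducibility):

```python
# A minimal one-hot encoding of the example sentence.
sentence = "the apple a day the good".split()
vocab = sorted(set(sentence))  # 5 unique words; "the" appears twice

# One row per token, one column per vocabulary word: a 6x5 matrix.
one_hot = [[1 if word == v else 0 for v in vocab] for word in sentence]

for word, row in zip(sentence, one_hot):
    print(f"{word:>5}: {row}")
```

Note how each row carries no information about neighboring words, which is exactly the limitation discussed above.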

    Here is where Word2Vec comes in. Our goal is to vectorize the words while maintaining the context. Word2vec can utilize either of two model architectures to produce a distributed representation of words: continuous bag-of-words (CBOW) or continuous skip-gram. In the continuous bag-of-words architecture, the model predicts the current word from a window of surrounding context words. The order of context words does not influence prediction (bag-of-words assumption). In the continuous skip-gram architecture, the model uses the current word to predict the surrounding window of context words. 
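As an illustration of the skip-gram idea (not the full Word2Vec training, which also learns an embedding matrix), the (target, context) training pairs for a window size of 1 can be generated with a few lines of Python; the sentence used here is a made-up example:

```python
# Sketch: produce (target, context) pairs for skip-gram training, window = 1.
tokens = "we vectorize words while keeping context".split()
window = 1

pairs = []
for i, target in enumerate(tokens):
    # Context words are the neighbors within `window` positions of the target.
    for j in range(max(0, i - window), min(len(tokens), i + window + 1)):
        if j != i:
            pairs.append((target, tokens[j]))

print(pairs[:4])
```

A CBOW model would flip each pair around, predicting the target from its context window instead.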

    Tf-idf (term frequency–inverse document frequency)

    TF-IDF is a statistic that determines how important a word is to a document in a given corpus. Variations of tf-idf are used by search engines, for text summarization, etc. You can read more about tf-idf here.
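The idea can be sketched in a few lines using the classic log formulation (scikit-learn's TfidfTransformer, used later in this post, applies a smoothed variant):

```python
import math

# Toy corpus of three tiny "documents".
docs = [
    "the cat sat on the mat",
    "the dog sat on the log",
    "the cats and dogs are pets",
]

def tf_idf(term, doc, corpus):
    words = doc.split()
    tf = words.count(term) / len(words)          # term frequency in this document
    df = sum(term in d.split() for d in corpus)  # documents containing the term
    idf = math.log(len(corpus) / df)             # inverse document frequency
    return tf * idf

# "the" occurs in every document, so its idf (and hence tf-idf) is 0.
print(tf_idf("the", docs[0], docs))   # 0.0
# "cat" occurs in only one document, so it scores higher.
print(round(tf_idf("cat", docs[0], docs), 3))
```

Common words shared by every document are thus down-weighted, while distinctive words are boosted.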

    Multinomial Naive Bayes classifier

    The Naive Bayes classifier comes from a family of probabilistic classifiers based on Bayes' theorem. We use it to classify spam vs. not spam, sports vs. politics, etc. We are going to use it to classify the incoming stream of tweets. You can explore it here.

    Let's see how they fit together.

    The data from the “20 newsgroups” dataset is entirely in text format. We cannot feed it directly to any model to do mathematical calculations. We have to extract features from the dataset and convert them into numbers that a model can ingest to produce an output.
    So, we use continuous bag-of-words and tf-idf to extract features from the dataset, and then feed them to a multinomial Naive Bayes classifier to get predictions.
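Before wiring up scikit-learn, it may help to see the classifier's logic in a from-scratch sketch. The documents and labels below are made up for illustration; the real pipeline in the next section uses scikit-learn's MultinomialNB instead:

```python
import math
from collections import Counter, defaultdict

# Hypothetical toy training data: (document, label) pairs.
train = [
    ("the match was a great game", "sports"),
    ("the team won the game", "sports"),
    ("the election results are out", "politics"),
    ("the senate passed the bill", "politics"),
]

# Count words per class and documents per class.
word_counts = defaultdict(Counter)
class_docs = Counter()
for doc, label in train:
    word_counts[label].update(doc.split())
    class_docs[label] += 1

vocab = {w for counts in word_counts.values() for w in counts}

def predict(doc):
    scores = {}
    for label in word_counts:
        # log P(class) + sum over words of log P(word | class), Laplace-smoothed.
        score = math.log(class_docs[label] / len(train))
        total = sum(word_counts[label].values())
        for w in doc.split():
            score += math.log((word_counts[label][w] + 1) / (total + len(vocab)))
        scores[label] = score
    return max(scores, key=scores.get)

print(predict("who won the game"))       # sports
print(predict("the bill and election"))  # politics
```

The log-probabilities are summed rather than multiplied to avoid floating-point underflow, and Laplace smoothing keeps unseen words from zeroing out a class.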

    1. Train Your Model

    We are going to use this dataset. We create another file and import the needed libraries. We use scikit-learn for ML and pickle to save the trained model. Now we define the model.

    from __future__ import division, print_function, absolute_import
    from sklearn.datasets import fetch_20newsgroups  # built-in dataset
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.naive_bayes import MultinomialNB
    import pickle
    from kafka import KafkaConsumer
    
    # Defining model and training it
    categories = ["talk.politics.misc", "misc.forsale", "rec.motorcycles",
                  "comp.sys.mac.hardware", "sci.med", "talk.religion.misc"]  # see http://qwone.com/~jason/20Newsgroups/ for reference
    
    def fetch_train_dataset(categories):
        twenty_train = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=42)
        return twenty_train
    
    def bag_of_words(categories):
        count_vect = CountVectorizer()
        X_train_counts = count_vect.fit_transform(fetch_train_dataset(categories).data)
        pickle.dump(count_vect.vocabulary_, open("vocab.pickle", 'wb'))
        return X_train_counts
    
    def tf_idf(categories):
        tf_transformer = TfidfTransformer()
        return (tf_transformer, tf_transformer.fit_transform(bag_of_words(categories)))
    
    def model(categories):
        clf = MultinomialNB().fit(tf_idf(categories)[1], fetch_train_dataset(categories).target)
        return clf
    
    model = model(categories)
    pickle.dump(model, open("model.pickle", 'wb'))
    print("Training Finished!")
    # Training finished here

    2. The Kafka Tweet Producer

    We have the trained model in place. Now let's get the real-time stream of tweets via Kafka. We define the producer.

    # import required libraries
    from kafka import SimpleProducer, KafkaClient
    from tweepy.streaming import StreamListener
    from tweepy import OAuthHandler
    from tweepy import Stream
    from twitter_config import consumer_key, consumer_secret, access_token, access_token_secret
    import json

    Now we will define the Kafka settings and create the KafkaPusher class. This is necessary because we need to send the data coming from the tweepy stream to the Kafka producer.

    # Kafka settings
    topic = b'twitter-stream'
    
    # setting up the Kafka producer
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)
    
    class KafkaPusher(StreamListener):
    
        def on_data(self, data):
            all_data = json.loads(data)
            tweet = all_data["text"]
            producer.send_messages(topic, tweet.encode('utf-8'))
            return True
    
        def on_error(self, status):
            print(status)
    
    WORDS_TO_TRACK = ["Politics", "Apple", "Google", "Microsoft", "Bikes", "Harley Davidson", "Medicine"]
    
    if __name__ == '__main__':
        l = KafkaPusher()
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        stream = Stream(auth, l)
        while True:
            try:
                stream.filter(languages=["en"], track=WORDS_TO_TRACK)
            except:
                pass

    Note: You need to start the Kafka server before running this script.

    3. Loading your model for predictions

    Now we have the trained model from step 1 and a Twitter stream from step 2. Let's use the model to do actual predictions. The first step is to load the model:

    # Loading model and vocab
    print("Loading pre-trained model")
    vocabulary_to_load = pickle.load(open("vocab.pickle", 'rb'))
    count_vect = CountVectorizer(vocabulary=vocabulary_to_load)
    count_vect._validate_vocabulary()
    load_model = pickle.load(open("model.pickle", 'rb'))
    tfidf_transformer = tf_idf(categories)[0]

    Then we start the Kafka consumer and begin predictions:

    # predicting the streaming Kafka messages
    consumer = KafkaConsumer('twitter-stream', bootstrap_servers=['localhost:9092'])
    print("Starting ML predictions.")
    for message in consumer:
        X_new_counts = count_vect.transform([message.value])
        X_new_tfidf = tfidf_transformer.transform(X_new_counts)
        predicted = load_model.predict(X_new_tfidf)
        print(message.value + " => " + fetch_train_dataset(categories).target_names[predicted[0]])

    Following are some of the classifications done by our model:

    • RT @amazingatheist: Making fun of kids who survived a school shooting just days after the event because you disagree with their politics is… => talk.politics.misc
    • sci.med
    • RT @DavidKlion: Apropos of that D’Souza tweet; I think in order to make sense of our politics, you need to understand that there are some t… => talk.politics.misc
    • RT @BeauWillimon: These students have already cemented a place in history with their activism, and they’re just getting started. No one wil… => talk.politics.misc
    • RT @byedavo: Cause we ain’t got no president => talk.politics.misc
    • RT @appleinsider: .@Apple reportedly in talks to buy cobalt, key Li-ion battery ingredient, directly from miners … => comp.sys.mac.hardware

    Here is the link to the complete git repository

    Conclusion:

    In this blog, we built a data pipeline that uses a Naive Bayes model to classify streaming Twitter data. The same approach can classify other sources of data like news articles, blog posts, etc. Do let us know if you have any questions, queries, or additional thoughts in the comments section below.

    Happy coding!

  • Taking Amazon’s Elastic Kubernetes Service for a Spin

    With the introduction of the Elastic Kubernetes Service at AWS re:Invent last year, AWS finally threw its hat into the booming space of managed Kubernetes services. In this blog post, we will learn the basic concepts of EKS, launch an EKS cluster, and deploy a multi-tier application on it.

    What is Elastic Kubernetes service (EKS)?

    Kubernetes works on a master-worker architecture. The master is also referred to as the control plane. If the master goes down, it brings the entire cluster down, so ensuring high availability of the master is absolutely critical: it can be a single point of failure. Keeping the master highly available and managing all the worker nodes is a cumbersome task in itself, so it is most desirable for organizations to have a managed Kubernetes cluster, letting them focus on the most important task, running their applications, rather than managing the cluster. Other cloud providers, Google Cloud and Azure, already had managed Kubernetes services, named GKE and AKS respectively. Now, with EKS, Amazon has also rolled out a managed Kubernetes service to provide a seamless way to run Kubernetes workloads.

    Key EKS concepts:

    EKS takes full advantage of the fact that it is running on AWS so instead of creating Kubernetes specific features from the scratch they have reused/plugged in the existing AWS services with EKS for achieving Kubernetes specific functionalities. Here is a brief overview:

    IAM integration: Amazon EKS integrates IAM authentication with Kubernetes RBAC (the role-based access control system native to Kubernetes) with the help of Heptio Authenticator, a tool that uses AWS IAM credentials to authenticate to a Kubernetes cluster. We can directly attach an RBAC role to an IAM entity, which saves the pain of managing another set of credentials at the cluster level.

    Container Interface: AWS has developed an open-source CNI plugin that takes advantage of the fact that multiple network interfaces can be attached to a single EC2 instance, and each interface can have multiple secondary private IPs associated with it. These secondary IPs are used to give pods running on EKS real IP addresses from the VPC CIDR pool. This improves latency for inter-pod communication, as traffic flows without any overlay.

    ELB Support:  We can use any of the AWS ELB offerings (classic, network, application) to route traffic to our service running on the working nodes.

    Auto scaling:  The number of worker nodes in the cluster can grow and shrink using the EC2 auto scaling service.

    Route 53: With the help of the ExternalDNS project and AWS Route 53, we can manage the DNS entries for the load balancers that get created when we create an ingress object or a service of type LoadBalancer in our EKS cluster. This way, the DNS names are always in sync with the load balancers and we don't have to give them separate attention.

    Shared responsibility for the cluster: The responsibilities for an EKS cluster are shared between AWS and the customer. AWS takes care of the most critical part, managing the control plane (API server and etcd database), while customers manage the worker nodes. Amazon EKS automatically runs Kubernetes with three masters across three Availability Zones to protect against a single point of failure. Control plane nodes are also monitored and replaced if they fail, and are patched and updated automatically. This ensures high availability of the cluster and makes it extremely simple to migrate existing workloads to EKS.

    Prerequisites for launching an EKS cluster:

    1.  IAM role to be assumed by the cluster: Create an IAM role that allows EKS to manage a cluster on your behalf. Choose EKS as the service which will assume this role and add AWS managed policies ‘AmazonEKSClusterPolicy’ and ‘AmazonEKSServicePolicy’ to it.

    2.  VPC for the cluster:  We need to create the VPC where our cluster is going to reside. We need a VPC with subnets, internet gateways and other components configured. We can use an existing VPC for this if we wish or create one using the CloudFormation script provided by AWS here or use the Terraform script available here. The scripts take ‘cidr’ block of the VPC and three other subnets as arguments.

    Launching an EKS cluster:

    1.  Using the web console: With the prerequisites in place, we can go to the EKS console and launch an EKS cluster. We need to provide a name for the cluster, choose the Kubernetes version to use, provide the IAM role we created in step one, and choose a VPC. Once we choose a VPC, we also need to select the subnets where we want our worker nodes to be launched (by default, all subnets in the VPC are selected) and provide a security group, which is applied to the elastic network interfaces (ENIs) that EKS creates to allow the control plane to communicate with the worker nodes.

    NOTE: A couple of things to note here: the subnets must be in at least two different Availability Zones, and the security group we provided is later updated when we create the worker node cluster, so it is better not to use this security group with any other entity, or else be completely sure of the changes happening to it.

    2. Using awscli :

    aws eks create-cluster --name eks-blog-cluster --role-arn arn:aws:iam::XXXXXXXXXXXX:role/eks-service-role \
        --resources-vpc-config subnetIds=subnet-0b8da2094908e1b23,subnet-01a46af43b2c5e16c,securityGroupIds=sg-03fa0c02886c183d4

    {
        "cluster": {
            "status": "CREATING",
            "name": "eks-blog-cluster",
            "certificateAuthority": {},
            "roleArn": "arn:aws:iam::XXXXXXXXXXXX:role/eks-service-role",
            "resourcesVpcConfig": {
                "subnetIds": [
                    "subnet-0b8da2094908e1b23",
                    "subnet-01a46af43b2c5e16c"
                ],
                "vpcId": "vpc-0364b5ed9f85e7ce1",
                "securityGroupIds": [
                    "sg-03fa0c02886c183d4"
                ]
            },
            "version": "1.10",
            "arn": "arn:aws:eks:us-east-1:XXXXXXXXXXXX:cluster/eks-blog-cluster",
            "createdAt": 1535269577.147
        }
    }

    In the response, we see that the cluster is in creating state. It will take a few minutes before it is available. We can check the status using the below command:

    aws eks describe-cluster --name=eks-blog-cluster

    Configure kubectl for EKS:

    We know that in Kubernetes we interact with the control plane by making requests to the API server. The most common way to interact with the API server is via kubectl command line utility. As our cluster is ready now we need to install kubectl.

    1.  Install the kubectl binary

    curl -LO https://storage.googleapis.com/kubernetes-release/release/`curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt`/bin/linux/amd64/kubectl

    Give executable permission to the binary.

    chmod +x ./kubectl

    Move the kubectl binary to a folder in your system’s $PATH.

    sudo cp ./kubectl /bin/kubectl

    As discussed earlier EKS uses AWS IAM Authenticator for Kubernetes to allow IAM authentication for your Kubernetes cluster. So we need to download and install the same.

    2.  Install aws-iam-authenticator

    curl -o aws-iam-authenticator https://amazon-eks.s3-us-west-2.amazonaws.com/1.10.3/2018-07-26/bin/linux/amd64/aws-iam-authenticator

    Give executable permission to the binary

    chmod +x ./aws-iam-authenticator

    Move the aws-iam-authenticator binary to a folder in your system’s $PATH.

    sudo cp ./aws-iam-authenticator /bin/aws-iam-authenticator

    3.  Create the kubeconfig file

    First create the directory.

    mkdir -p ~/.kube

    Open a config file in the folder created above

    vi ~/.kube/config-eks-blog-cluster

    Paste the below code in the file

    apiVersion: v1
    kind: Config
    preferences: {}
    current-context: aws
    clusters:
    - cluster:
        server: https://DBFE36D09896EECAB426959C35FFCC47.sk1.us-east-1.eks.amazonaws.com
        certificate-authority-data: "...................."
      name: kubernetes
    contexts:
    - context:
        cluster: kubernetes
        user: aws
      name: aws
    users:
    - name: aws
      user:
        exec:
          apiVersion: client.authentication.k8s.io/v1alpha1
          command: aws-iam-authenticator
          args:
            - "token"
            - "-i"
            - "eks-blog-cluster"

    Replace the values of server and certificate-authority-data with the values from your cluster, and also update the cluster name in the args section. You can get these values from the web console as well as by using the command:

    aws eks describe-cluster --name=eks-blog-cluster

    Save and exit.

    Add that file path to your KUBECONFIG environment variable so that kubectl knows where to look for your cluster configuration.

    export KUBECONFIG=$KUBECONFIG:~/.kube/config-eks-blog-cluster

    To verify that the kubectl is now properly configured :

    kubectl get all
    NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
    service/kubernetes ClusterIP 172.20.0.1  443/TCP 50m

    Launch and configure worker nodes :

    Now we need to launch worker nodes before we can start deploying apps. We can create the worker node cluster by using the CloudFormation script provided by AWS, which is available here, or use the Terraform script available here. The scripts take the following parameters:

    • ClusterName: Name of the Amazon EKS cluster we created earlier.
    • ClusterControlPlaneSecurityGroup: Id of the security group we used in EKS cluster.
    • NodeGroupName: Name for the worker node auto scaling group.
    • NodeAutoScalingGroupMinSize: Minimum number of worker nodes that you always want in your cluster.
    • NodeAutoScalingGroupMaxSize: Maximum number of worker nodes that you want in your cluster.
    • NodeInstanceType: Type of worker node you wish to launch.
    • NodeImageId: AWS provides an Amazon EKS-optimized AMI to be used for worker nodes. Currently, EKS is available in only two AWS regions, Oregon and N. Virginia, with AMI IDs ami-02415125ccd555295 and ami-048486555686d18a0 respectively.
    • KeyName: Name of the key you will use to ssh into the worker node.
    • VpcId: Id of the VPC that we created earlier.
    • Subnets: Subnets from the VPC we created earlier.

    To enable worker nodes to join your cluster, we need to download, edit and apply the AWS authenticator config map.

    Download the config map:

    curl -O https://amazon-eks.s3-us-west-2.amazonaws.com/1.10.3/2018-07-26/aws-auth-cm.yaml

    Open it in an editor

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: aws-auth
      namespace: kube-system
    data:
      mapRoles: |
        - rolearn: <ARN of instance role (not instance profile)>
          username: system:node:{{EC2PrivateDNSName}}
          groups:
            - system:bootstrappers
            - system:nodes

    Edit the value of rolearn with the ARN of the role of your worker nodes. This value is available in the output of the scripts that you ran. Save the change and then apply it:

    kubectl apply -f aws-auth-cm.yaml

    Now you can check if the nodes have joined the cluster or not.

    kubectl get nodes
    NAME                         STATUS   ROLES    AGE   VERSION
    ip-10-0-2-171.ec2.internal   Ready    <none>   12s   v1.10.3
    ip-10-0-3-58.ec2.internal    Ready    <none>   14s   v1.10.3

    Deploying an application:

    As our cluster is now completely ready, we can start deploying applications on it. We will deploy a simple books API application which connects to a MongoDB database and allows users to store, list, and delete book information.

    1. MongoDB Deployment YAML

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: mongodb
    spec:
      template:
        metadata:
          labels:
            app: mongodb
        spec:
          containers:
          - name: mongodb
            image: mongo
            ports:
            - name: mongodbport
              containerPort: 27017
              protocol: TCP

    2. Test Application Deployment YAML

    apiVersion: apps/v1beta1
    kind: Deployment
    metadata:
      name: test-app
    spec:
      replicas: 1
      template:
        metadata:
          labels:
            app: test-app
        spec:
          containers:
          - name: test-app
            image: akash125/pyapp
            imagePullPolicy: IfNotPresent
            ports:
            - containerPort: 3000

    3. MongoDB Service YAML

    apiVersion: v1
    kind: Service
    metadata:
      name: mongodb-service
    spec:
      ports:
      - port: 27017
        targetPort: 27017
        protocol: TCP
        name: mongodbport
      selector:
        app: mongodb

    4. Test Application Service YAML

    apiVersion: v1
    kind: Service
    metadata:
      name: test-service
    spec:
      type: LoadBalancer
      ports:
      - name: test-service
        port: 80
        protocol: TCP
        targetPort: 3000
      selector:
        app: test-app

    Services

    $ kubectl create -f mongodb-service.yaml
    $ kubectl create -f testapp-service.yaml

    Deployments

    $ kubectl create -f mongodb-deployment.yaml
    $ kubectl create -f testapp-deployment.yaml
    $ kubectl get services
    NAME              TYPE           CLUSTER-IP      EXTERNAL-IP     PORT(S)        AGE
    kubernetes        ClusterIP      172.20.0.1      <none>          443/TCP        12m
    mongodb-service   ClusterIP      172.20.55.194   <none>          27017/TCP      4m
    test-service      LoadBalancer   172.20.188.77   a7ee4f4c3b0ea   80:31427/TCP   3m

    In the EXTERNAL-IP section of the test-service, we see the DNS name of a load balancer. We can now access the application from outside the cluster using this DNS name.

    To Store Data :

    curl -X POST -d '{"name":"A Game of Thrones (A Song of Ice and Fire)", "author":"George R.R. Martin","price":343}' http://a7ee4f4c3b0ea11e8b0f912f36098e4d-672471149.us-east-1.elb.amazonaws.com/books
    {"id":"5b8fab49fa142b000108d6aa","name":"A Game of Thrones (A Song of Ice and Fire)","author":"George R.R. Martin","price":343}

    To Get Data :

    curl -X GET http://a7ee4f4c3b0ea11e8b0f912f36098e4d-672471149.us-east-1.elb.amazonaws.com/books
    [{"id":"5b8fab49fa142b000108d6aa","name":"A Game of Thrones (A Song of Ice and Fire)","author":"George R.R. Martin","price":343}]

    We can put the URL used in the curl operation above directly into our browser as well; we will get the same response.

    Now our application is deployed on EKS and can be accessed by the users.

    Comparison between GKE, ECS, and EKS:

    Cluster creation: Creating a GKE or ECS cluster is much simpler than creating an EKS cluster, with GKE being the simplest of all three.

    Cost: In the case of both GKE and ECS, we pay only for the infrastructure that is visible to us, i.e., servers, volumes, ELBs, etc.; there is no cost for master nodes or other cluster-management services. With EKS, however, there is a charge of $0.20 per hour for the control plane.

    Add-ons: GKE provides the option of using Calico as the network plugin which helps in defining network policies for controlling inter pod communication (by default all pods in k8s can communicate with each other).

    Serverless: An ECS cluster can be created using Fargate, the Container-as-a-Service (CaaS) offering from AWS. EKS is also expected to support Fargate soon.

    In terms of availability and scalability, all three services are on par with each other.

    Conclusion:

    In this blog post, we learned the basic concepts of EKS, launched our own EKS cluster, and deployed an application on it. EKS is a much-awaited service from AWS, especially for folks who were already running their Kubernetes workloads on AWS, as they can now easily migrate to EKS and have a fully managed Kubernetes control plane. EKS is expected to be adopted by many organizations in the near future.


  • Implementing Async Features in Python – A Step-by-step Guide

    Asynchronous programming is a characteristic of modern programming languages that allows an application to perform various operations without waiting for any of them. Asynchronicity is one of the big reasons for the popularity of Node.js.

    We have discussed Python’s asynchronous features as part of our previous post: an introduction to asynchronous programming in Python. This blog is a natural progression on the same topic. We are going to discuss async features in Python in detail and look at some hands-on examples.

    Consider a traditional web scraping application that needs to open thousands of network connections. We could open one network connection, fetch the result, and then move on to the next one iteratively. This approach increases the latency of the program: it spends most of its time waiting on each connection instead of doing useful work.

    On the other hand, async provides a way to open thousands of connections at once and swap among them as each finishes and returns its result. Basically, it sends a request on one connection and moves on to the next instead of waiting for the previous one's response, continuing like this until all the connections have returned their outputs.

    Source: phpmind

    From the above chart, we can see that completing four tasks with synchronous programming took 45 seconds, while asynchronous programming finished the same four tasks in only 20 seconds.
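The same effect can be demonstrated with a small asyncio sketch that simulates slow network calls with asyncio.sleep (a stand-in for real HTTP requests; the URLs are made up):

```python
import asyncio
import time

async def fetch(url):
    # Simulate a slow network call; a real scraper would await an HTTP request here.
    await asyncio.sleep(0.1)
    return f"response from {url}"

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(10)]
    # All ten "connections" are in flight at once, so the total time is
    # roughly one call's latency (~0.1s) rather than ten calls' (~1s).
    return await asyncio.gather(*(fetch(u) for u in urls))

start = time.perf_counter()
results = asyncio.run(main())
print(f"fetched {len(results)} pages in {time.perf_counter() - start:.2f}s")
```

Running the same ten fetches sequentially would take about ten times as long, which is exactly the gap the chart above illustrates.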

    Where Does Asynchronous Programming Fit in the Real-world?

    Asynchronous programming is best suited for popular scenarios such as:

    1. The program takes too much time to execute.

    2. The reason for the delay is waiting for input or output operations, not computation.

    3. The program has multiple input or output operations to execute at once.

    And application-wise, these are the example use cases:

    • Web Scraping
    • Network Services

    Difference Between Parallelism, Concurrency, Threading, and Async IO

    Because we discussed this comparison in detail in our previous post, we will just quickly go through the concept as it will help us with our hands-on example later.

    Parallelism involves performing multiple operations at the same time. Multiprocessing is an example of it, and it is well suited for CPU-bound tasks.

    Concurrency is slightly broader than parallelism. It involves multiple tasks running in an overlapping manner.

    Threading: a thread is a separate flow of execution. One process can contain multiple threads, and each thread runs independently. It is ideal for I/O-bound tasks.

    Async IO is a single-threaded, single-process design that uses cooperative multitasking. In simple words, async IO gives a feeling of concurrency despite using a single thread in a single process.


    Fig: A comparison of concurrency and parallelism


    Components of Async IO Programming

    Let’s explore the various components of Async IO in depth. We will also look at an example code to help us understand the implementation.

    1. Coroutines

    Coroutines are generalized forms of subroutines. They are generally used for cooperative multitasking and behave like Python generators.

    A function defined with async def is a coroutine, and it can use the await keyword. At an await, a coroutine releases the flow of control back to the event loop.

    To run a coroutine, we need to schedule it on the event loop. Once scheduled, a coroutine is wrapped in a Task, which is a kind of Future object.

    Example:

    In the below snippet, we call async_func from the main function. We have to add the await keyword when calling the async function. As you can see, async_func does nothing unless the await keyword accompanies the call.

    import asyncio
    async def async_func():
        print('Velotio ...')
        await asyncio.sleep(1)
        print('... Blog!')
    
    async def main():
        async_func()#this will do nothing because coroutine object is created but not awaited
        await async_func()
    
    asyncio.run(main())

    Output

    RuntimeWarning: coroutine 'async_func' was never awaited
     async_func()#this will do nothing because coroutine object is created but not awaited
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    Velotio ...
    ... Technologies!

    2. Tasks

    Tasks are used to schedule coroutines concurrently.

    When submitting a coroutine to an event loop for processing, you can get a Task object, which provides a way to control the coroutine’s behavior from outside the event loop.

    Example:

    In the snippet below, we are creating a task using create_task (a built-in function of the asyncio library), and then we are running it.

    import asyncio
    async def async_func():
        print('Velotio ...')
        await asyncio.sleep(1)
        print('... Blog!')
    
    async def main():
        task = asyncio.create_task(async_func())
        await task
    asyncio.run(main())

    Output

    Velotio ...
    ... Blog!
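    The Task handle also lets us control the coroutine from outside the event loop, for example by cancelling it. Here is a minimal sketch of that behavior (long_job is a made-up name for illustration):

```python
import asyncio

async def long_job():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # the cancellation request is delivered as an exception inside the coroutine
        print('long_job: cancelled')
        raise

async def main():
    task = asyncio.create_task(long_job())
    await asyncio.sleep(0.1)  # let long_job start and reach its sleep
    task.cancel()             # request cancellation from outside the coroutine
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

print(asyncio.run(main()))
```

    The call to task.cancel() raises CancelledError inside the sleeping coroutine, and task.cancelled() then reports True.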

    3. Event Loops

    This mechanism runs coroutines until they complete. You can imagine it as a while(True) loop that monitors coroutines, takes feedback on what’s idle, and looks around for things that can be executed in the meantime.

    It can wake up an idle coroutine when whatever that coroutine is waiting on becomes available.

    Only one event loop can run at a time in Python.

    Example:

    In the snippet below, we are creating three tasks, appending them to a list, and executing all of them asynchronously using get_event_loop, create_task, and the await keyword from the asyncio library.

    import asyncio
    async def async_func(task_no):
        print(f'{task_no} :Velotio ...')
        await asyncio.sleep(1)
        print(f'{task_no}... Blog!')
    
    async def main():
        taskA = loop.create_task(async_func('taskA'))
        taskB = loop.create_task(async_func('taskB'))
        taskC = loop.create_task(async_func('taskC'))
        await asyncio.wait([taskA, taskB, taskC])
    
    
    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())

    Output

    taskA :Velotio ...
    taskB :Velotio ...
    taskC :Velotio ...
    taskA... Blog!
    taskB... Blog!
    taskC... Blog!

    Future

    A future is a special, low-level awaitable object that represents the eventual result of an asynchronous operation.

    When a Future object is awaited, the coroutine will wait until the Future is resolved in some other place.

    We will look into the sample code for Future objects in the next section.
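    Before that, here is a minimal standalone sketch of awaiting a Future that gets resolved elsewhere (set_after is a made-up helper name for illustration):

```python
import asyncio

async def set_after(fut, delay, value):
    # resolve the future from another coroutine
    await asyncio.sleep(delay)
    fut.set_result(value)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # schedule the resolver; awaiting fut suspends main() until set_result runs
    asyncio.ensure_future(set_after(fut, 0.1, 'resolved'))
    result = await fut
    print(result)
    return result

asyncio.run(main())
```

    Awaiting fut suspends main() until set_result is called in set_after, at which point the awaited value becomes available.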

    A Comparison Between Multithreading and Async IO

    Before we get to Async IO, let’s use multithreading as a benchmark and then compare the two approaches to see which is more efficient.

    For this benchmark, we will fetch data from a sample URL (the Velotio Careers webpage) 1, 10, 50, 100, and 500 times.

    We will then compare the time taken by both of these approaches to fetch the required data.

    Implementation

    Code of Multithreading:

    import requests
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    
    def fetch_url_data(pg_url):
        try:
            resp = requests.get(pg_url)
        except Exception as e:
            print(f"Error occurred while fetching data from url {pg_url}")
        else:
            return resp.content
            
    
    def get_all_url_data(url_list):
        with ProcessPoolExecutor() as executor:
            resp = executor.map(fetch_url_data, url_list)
        return resp
        
    
    if __name__=='__main__':
        url = "https://www.velotio.com/careers"
        for ntimes in [1,10,50,100,500]:
            start_time = time.time()
            responses = get_all_url_data([url] * ntimes)
            print(f'Fetch total {ntimes} urls and process takes {time.time() - start_time} seconds')

    Output

    Fetch total 1 urls and process takes 1.8822264671325684 seconds
    Fetch total 10 urls and process takes 2.3358211517333984 seconds
    Fetch total 50 urls and process takes 8.05638575553894 seconds
    Fetch total 100 urls and process takes 14.43302869796753 seconds
    Fetch total 500 urls and process takes 65.25404500961304 seconds

    ProcessPoolExecutor is a class in Python’s concurrent.futures module that implements the Executor interface using a pool of worker processes (for a strictly thread-based comparison, ThreadPoolExecutor from the same module can be swapped in). The fetch_url_data function fetches the data from the given URL using the requests package, and the get_all_url_data function maps fetch_url_data over the list of URLs.

    Async IO Programming Example:

    import asyncio
    import time
    from aiohttp import ClientSession, ClientResponseError
    
    
    async def fetch_url_data(session, url):
        try:
            async with session.get(url, timeout=60) as response:
                resp = await response.read()
        except Exception as e:
            print(e)
            return None
        return resp
    
    
    async def fetch_async(loop, r):
        url = "https://www.velotio.com/careers"
        tasks = []
        async with ClientSession() as session:
            for i in range(r):
                task = asyncio.ensure_future(fetch_url_data(session, url))
                tasks.append(task)
            responses = await asyncio.gather(*tasks)
        return responses
    
    
    if __name__ == '__main__':
        for ntimes in [1, 10, 50, 100, 500]:
            start_time = time.time()
            loop = asyncio.get_event_loop()
            future = asyncio.ensure_future(fetch_async(loop, ntimes))
            loop.run_until_complete(future)  # will run until it finishes or raises an error
            responses = future.result()
            print(f'Fetch total {ntimes} urls and process takes {time.time() - start_time} seconds')

    Output

    Fetch total 1 urls and process takes 1.3974951362609863 seconds
    Fetch total 10 urls and process takes 1.4191942596435547 seconds
    Fetch total 50 urls and process takes 2.6497368812561035 seconds
    Fetch total 100 urls and process takes 4.391665458679199 seconds
    Fetch total 500 urls and process takes 4.960426330566406 seconds

    We use the get_event_loop function to obtain the event loop and run the tasks. To fetch more than one URL, we use the ensure_future and gather functions.

    The fetch_async function adds the tasks to the event loop object, and the fetch_url_data function reads the data from the URL using the aiohttp session. The future.result() method returns the responses of all the tasks.

    Results:

    As you can see from the plot, async programming is much more efficient than multi-threading for the program above. 

    The multithreading program’s curve grows roughly linearly with the number of requests, while the asyncio curve flattens out, growing closer to logarithmically.

     

    Conclusion

    As we saw in our experiment above, Async IO outperformed multithreading through its efficient use of concurrency.

    Async IO can be beneficial in applications that can exploit concurrency. That said, whether Async IO is the pragmatic choice over other implementations depends on the kind of application we are dealing with; IO-bound workloads benefit the most.

    We hope this article helped further your understanding of the async feature in Python and gave you some quick hands-on experience using the code examples shared above.

  • Implementing gRPC In Python: A Step-by-step Guide

    In the last few years, we saw a great shift in technology, where projects are moving towards “microservice architecture” vs the old “monolithic architecture”. This approach has done wonders for us. 

    As we say, “smaller things are much easier to handle,” so here we have microservices that can be handled conveniently. These microservices need to interact with each other. I handled this with HTTP API calls, which seemed great and worked for me.

    But is this the perfect way to do things?

    The answer is a resounding “no,” because we compromise both speed and efficiency here.

    Then the gRPC framework came into the picture, and it has been a game-changer.

    What is gRPC?

    Quoting the official documentation:

    “gRPC or Google Remote Procedure Call is a modern open-source high-performance RPC framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication.”

     

    Credit: gRPC

    RPC, or remote procedure call, is a protocol that lets a client invoke a procedure (or subroutine) on a remote server as if it were a local call.

    Google’s RPC is designed to facilitate smooth and efficient communication between the services. It can be utilized in different ways, such as:

    • Efficiently connecting polyglot services in microservices style architecture
    • Connecting mobile devices, browser clients to backend services
    • Generating efficient client libraries

    Why gRPC? 

    HTTP/2 based transport – It uses the HTTP/2 protocol instead of HTTP/1.1, which provides multiple benefits. One major benefit is that multiple bidirectional streams can be created and sent over a TCP connection in parallel, making it swift.

    Auth, tracing, load balancing and health checking – gRPC provides all these features, making it a secure and reliable option to choose.

    Language independent communication– Two services may be written in different languages, say Python and Golang. gRPC ensures smooth communication between them.

    Use of Protocol Buffers – gRPC uses protocol buffers for defining the type of data (also called Interface Definition Language (IDL)) to be sent between the gRPC client and the gRPC server. It also uses it as the message interchange format. 

    Let’s dig a little more into what are Protocol Buffers.

    Protocol Buffers

    Protocol Buffers, like XML, are an efficient and automated mechanism for serializing structured data. They provide a way to define the structure of data to be transmitted. Google says that protocol buffers are better than XML, as they are:

    • simpler
    • three to ten times smaller
    • 20 to 100 times faster
    • less ambiguous
    • able to generate data access classes that make them easier to use programmatically

    Protobufs are defined in .proto files, and defining them is easy.

    Types of gRPC implementation

    1. Unary RPCs:- This is a simple gRPC which works like a normal function call. It sends a single request declared in the .proto file to the server and gets back a single response from the server.

    rpc HelloServer(RequestMessage) returns (ResponseMessage);

    2. Server streaming RPCs:- The client sends a message declared in the .proto file to the server and gets back a stream of messages to read. The client reads from that stream until there are no more messages.

    rpc HelloServer(RequestMessage) returns (stream ResponseMessage);

    3. Client streaming RPCs:- The client writes a message sequence using a write stream and sends the same to the server. After all the messages are sent to the server, the client waits for the server to read all the messages and return a response.

    rpc HelloServer(stream RequestMessage) returns (ResponseMessage);

    4. Bidirectional streaming RPCs:- Both gRPC client and the gRPC server use a read-write stream to send a message sequence. Both operate independently, so gRPC clients and gRPC servers can write and read in any order they like, i.e. the server can read a message then write a message alternatively, wait to receive all messages then write its responses, or perform reads and writes in any other combination.

    rpc HelloServer(stream RequestMessage) returns (stream ResponseMessage);

    Note: gRPC guarantees the ordering of messages within an individual RPC call. In the case of bidirectional streaming, the order of messages is preserved within each stream.

    Implementing gRPC in Python

    Currently, gRPC provides support for many languages like Golang, C++, Java, etc. I will be focussing on its implementation using Python.

    mkdir grpc_example
    cd grpc_example
    virtualenv -p python3 env
    source env/bin/activate
    pip install grpcio grpcio-tools

    This will install all the required dependencies to implement gRPC.

    Unary gRPC 

    For implementing gRPC services, we need to define three files:-

    • Proto file – The proto file comprises the declaration of the service and is used to generate stubs (<package_name>_pb2.py and <package_name>_pb2_grpc.py), which are used by both the gRPC client and the gRPC server.
    • gRPC client – The client makes a gRPC call to the server to get the response as per the proto file.
    • gRPC Server – The server is responsible for serving requests to the client.
    syntax = "proto3";
    
    package unary;
    
    service Unary{
      // A simple RPC.
      //
      // Obtains the MessageResponse at a given position.
     rpc GetServerResponse(Message) returns (MessageResponse) {}
    
    }
    
    message Message{
     string message = 1;
    }
    
    message MessageResponse{
     string message = 1;
     bool received = 2;
    }

    In the above code, we have declared a service named Unary, which consists of a collection of RPCs. For now, I have implemented a single RPC, GetServerResponse(). It takes an input of type Message and returns a MessageResponse. Below the service declaration, I have declared the Message and MessageResponse message types.

    Once we are done with the creation of the .proto file, we need to generate the stubs. For that, we will execute the below command:-

    python -m grpc_tools.protoc --proto_path=. ./unary.proto --python_out=. --grpc_python_out=.

    Two files are generated named unary_pb2.py and unary_pb2_grpc.py. Using these two stub files, we will implement the gRPC server and the client.

    Implementing the Server

    import grpc
    from concurrent import futures
    import time
    import unary.unary_pb2_grpc as pb2_grpc
    import unary.unary_pb2 as pb2
    
    
    class UnaryService(pb2_grpc.UnaryServicer):
    
        def __init__(self, *args, **kwargs):
            pass
    
        def GetServerResponse(self, request, context):
    
            # get the string from the incoming request
            message = request.message
            result = f'Hello I am up and running. Received "{message}" message from you'
            result = {'message': result, 'received': True}
    
            return pb2.MessageResponse(**result)
    
    
    def serve():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        pb2_grpc.add_UnaryServicer_to_server(UnaryService(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == '__main__':
        serve()

    In the gRPC server file, there is a GetServerResponse() method which takes `Message` from the client and returns a `MessageResponse` as defined in the proto file.

    The serve() function is called from the main block and keeps the server listening at all times. We will run unary_server.py to start the server:

    python3 unary_server.py

    Implementing the Client

    import grpc
    import unary.unary_pb2_grpc as pb2_grpc
    import unary.unary_pb2 as pb2
    
    
    class UnaryClient(object):
        """
        Client for gRPC functionality
        """
    
        def __init__(self):
            self.host = 'localhost'
            self.server_port = 50051
    
            # instantiate a channel
            self.channel = grpc.insecure_channel(
                '{}:{}'.format(self.host, self.server_port))
    
            # bind the client and the server
            self.stub = pb2_grpc.UnaryStub(self.channel)
    
        def get_url(self, message):
            """
            Client function to call the rpc for GetServerResponse
            """
            message = pb2.Message(message=message)
            print(f'{message}')
            return self.stub.GetServerResponse(message)
    
    
    if __name__ == '__main__':
        client = UnaryClient()
        result = client.get_url(message="Hello Server you there?")
        print(f'{result}')

    In the __init__ function, we have initialized the stub using self.stub = pb2_grpc.UnaryStub(self.channel). And we have a get_url function, which calls the server using the initialized stub.

    This completes the implementation of Unary gRPC service.

    Let’s check the output:-

    Run -> python3 unary_client.py 

    Output:-

    message: “Hello Server you there?”

    message: “Hello I am up and running. Received ‘Hello Server you there?’ message from you”

    received: true

    Bidirectional Implementation

    syntax = "proto3";
    
    package bidirectional;
    
    service Bidirectional {
      // A Bidirectional streaming RPC.
      //
      // Accepts a stream of Message sent while a route is being traversed,
       rpc GetServerResponse(stream Message) returns (stream Message) {}
    }
    
    message Message {
      string message = 1;
    }

    In the above code, we have declared a service named Bidirectional, which consists of a collection of RPCs. For now, I have implemented a single RPC, GetServerResponse(). It takes a stream of Message as input and returns a stream of Message. Below the service declaration, I have declared the Message type.

    Once we are done with the creation of the .proto file, we need to generate the stubs. To generate them, we execute the below command:-

    python -m grpc_tools.protoc --proto_path=. ./bidirectional.proto --python_out=. --grpc_python_out=.

    Two files are generated named bidirectional_pb2.py and bidirectional_pb2_grpc.py. Using these two stub files, we will implement the gRPC server and client.

    Implementing the Server

    from concurrent import futures
    
    import grpc
    import bidirectional.bidirectional_pb2_grpc as bidirectional_pb2_grpc
    
    
    class BidirectionalService(bidirectional_pb2_grpc.BidirectionalServicer):
    
        def GetServerResponse(self, request_iterator, context):
            for message in request_iterator:
                yield message
    
    
    def serve():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        bidirectional_pb2_grpc.add_BidirectionalServicer_to_server(BidirectionalService(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        server.wait_for_termination()
    
    
    if __name__ == '__main__':
        serve()

    In the gRPC server file, there is a GetServerResponse() method which takes a stream of `Message` from the client and returns a stream of `Message` responses, each independent of the others. The serve() function is called from the main block and keeps the server listening at all times.

    We will run the bidirectional_server to start the server:

    python3 bidirectional_server.py

    Implementing the Client

    from __future__ import print_function
    
    import grpc
    import bidirectional.bidirectional_pb2_grpc as bidirectional_pb2_grpc
    import bidirectional.bidirectional_pb2 as bidirectional_pb2
    
    
    def make_message(message):
        return bidirectional_pb2.Message(
            message=message
        )
    
    
    def generate_messages():
        messages = [
            make_message("First message"),
            make_message("Second message"),
            make_message("Third message"),
            make_message("Fourth message"),
            make_message("Fifth message"),
        ]
        for msg in messages:
            print("Hello Server Sending you the %s" % msg.message)
            yield msg
    
    
    def send_message(stub):
        responses = stub.GetServerResponse(generate_messages())
        for response in responses:
            print("Hello from the server received your %s" % response.message)
    
    
    def run():
        with grpc.insecure_channel('localhost:50051') as channel:
            stub = bidirectional_pb2_grpc.BidirectionalStub(channel)
            send_message(stub)
    
    
    if __name__ == '__main__':
        run()

    In the run() function, we have initialized the stub using stub = bidirectional_pb2_grpc.BidirectionalStub(channel).

    And we have a send_message function to which the stub is passed; it streams multiple messages to the server and receives the results from the server simultaneously.

    This completes the implementation of Bidirectional gRPC service.

    Let’s check the output:-

    Run -> python3 bidirectional_client.py 

    Output:-

    Hello Server Sending you the First message

    Hello Server Sending you the Second message

    Hello Server Sending you the Third message

    Hello Server Sending you the Fourth message

    Hello Server Sending you the Fifth message

    Hello from the server received your First message

    Hello from the server received your Second message

    Hello from the server received your Third message

    Hello from the server received your Fourth message

    Hello from the server received your Fifth message

    For code reference, please visit here.

    Conclusion

    gRPC is an emerging RPC framework that makes communication between microservices smooth and efficient. I believe gRPC is currently confined mostly to inter-microservice communication, but it has many other uses that we will see in the coming years. To know more about modern data communication solutions, check out this blog.

  • Building an Intelligent Recommendation Engine with Collaborative Filtering

    In this post, we will talk about building a collaborative recommendation system. For this, we will utilize patient ratings with a drug and medical condition dataset to generate treatment suggestions.

    Let’s take a practical scenario where multiple medical practitioners have treated patients with different medical conditions using the most suitable drugs available. Every patient is diagnosed and then prescribed a treatment plan; these diagnoses and outcomes form our body of experience.

    The purpose of the recommendation system is to understand and find patterns with the information provided by patients during the diagnosis, and then suggest a treatment plan, which most closely matches the pattern identified by the recommendation system. 

    At the end of this article, we will go deeper into how these recommendations work and how we can find one preferred suggestion and the next five closest suggestions for any treatment.

    Definitions

    A recommendation system suggests or predicts a user’s behaviour by observing patterns of their past behaviour compared to others.

    In simple terms, it is a filtering engine that picks more relevant information for specific users by using all the available information. It is often used in ecommerce platforms like Amazon and Flipkart, streaming services like YouTube and Netflix, and personalized products like Alexa and Google Home Mini.

    For the medical industry, where suggestions must be most accurate, a recommendation system will also take experiences into account. So, we must use all our experiences, and such applications will use every piece of information for any treatment. 

    Recommendation systems use information like various medical conditions and their effect on each patient. They compare these patterns to every new treatment to find the closest similarity.

    Concepts and Technology

    To design the recommendation system, we need a few concepts, which are listed below.

    1. Concepts: Pattern Recognition, Correlation, Cosine Similarity, Vector norms (L1, L2, L-Infinity)

    2. Language: Python (library: Numpy & Pandas), Scipy, Sklearn

    As far as the prototype development is concerned, we have support of a library (Scipy & Sklearn) that executes all the algorithms for us. All we need is a little Python and to use library functions.
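    To make the cosine-similarity concept listed above concrete, here is a minimal sketch; the two vectors are made-up rating rows for illustration, not values from the dataset:

```python
import numpy as np

def cosine_similarity(a, b):
    # cosine similarity = (a . b) / (||a||_2 * ||b||_2)
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Hypothetical rating rows: each entry is a rating a drug received
# for one of three conditions.
a = np.array([10.0, 0.0, 9.0])
b = np.array([8.0, 0.0, 9.0])

# A value close to 1.0 means the two rating patterns are very similar.
print(round(float(cosine_similarity(a, b)), 4))
```

    Because cosine similarity compares the direction of the rating vectors rather than their magnitude, two drugs rated in the same pattern score close to 1.0 even if one is rated consistently lower.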

    Different Approaches for Recommendation Systems

    Below I have listed a few filtering approaches and examples:

    • Collaborative filtering: It is based on the ratings or responses of users for items. Here, suggestions come from the preferences of similar users, e.g., movie or mobile suggestions.
    • Content-based filtering: It is based on item attributes and each user’s own past activity. Here, suggestions are items similar to what the user preferred before, e.g., food suggestions.
    • Popularity-based filtering: It is based on a pattern of popularity among all users, e.g., YouTube video suggestions.

    Based on these filtering approaches, there will be different approaches to recommender systems, which are explained below:

    • Multi-criteria recommender systems: Various conditions like age, gender, location, likes, and dislikes are used for categorization and then items are suggested. E.g., suggestion of apparel based on age and gender.
    • Risk-aware recommender systems: There is always uncertainty when users use Internet applications (website or mobile). Recommending any advertisement over the Internet must consider risk and users must be aware of this. E.g., advertisement display suggestion over Internet application. 
    • Mobile recommender systems: These are location-based suggestions that consist of users’ current location or future location and provide suggestions based on that. E.g., mostly preferred in traveling and tourism.
    • Hybrid recommender systems: These are the combination of multiple approaches for recommendations. E.g., suggestion of hotels and restaurants based on user preference and travel information.
    • Collaborative and content recommender systems: These are the combination of collaborative and content-based approaches. E.g., suggestion of the highest-rated movie of users’ preference along with their watch history.

    Practical Example with Implementation

    In this example, we have a sample dataset of drugs prescribed for various medical conditions and ratings given by patients. What we need here is for any medical condition we have to receive a suggestion for the most suitable prescribed drugs for treatment.

    Sample Dataset: 

    Below is the sample of the publicly available medical drug dataset used from the Winter 2018 Kaggle University Club Hackathon.

    drugName              | condition                     | rating | condition_id
    Mirtazapine           | Depression                    | 10     | 201
    Mesalamine            | Crohn’s Disease, Maintenance  | 8      | 185
    Bactrim               | Urinary Tract Infection       | 9      | 657
    Contrave              | Weight Loss                   | 9      | 677
    Cyclafem 1 / 35       | Birth Control                 | 9      | 122
    Zyclara               | Keratosis                     | 4      | 365
    Copper                | Birth Control                 | 6      | 122
    Amitriptyline         | Migraine Prevention           | 9      | 403
    Methadone             | Opiate Withdrawal             | 7      | 460
    Levora                | Birth Control                 | 2      | 122
    Paroxetine            | Hot Flashes                   | 1      | 310
    Miconazole            | Vaginal Yeast Infection       | 6      | 664
    Belviq                | Weight Loss                   | 1      | 677
    Seroquel              | Schizoaffective Disorde       | 10     | 575
    Ambien                | Insomnia                      | 2      | 347
    Nuvigil               | Narcolepsy                    | 9      | 424
    Chantix               | Smoking Cessation             | 10     | 597
    Microgestin Fe 1 / 20 | Acne                          | 3      | 49
    Klonopin              | Bipolar Disorde               | 6      | 121
    Ciprofloxacin         | Urinary Tract Infection       | 10     | 657
    Trazodone             | Insomnia                      | 1      | 347
    EnteraGam             | Irritable Bowel Syndrome      | 9      | 356
    Aripiprazole          | Bipolar Disorde               | 1      | 121
    Cyclosporine          | Keratoconjunctivitis Sicca    | 1      | 364

    Sample Code: 

    We will do this in 5 steps:

    1. Importing required libraries

    2. Reading the drugsComTest_raw.csv file and creating a pivot matrix.

    3. Creating a KNN model using the NearestNeighbors class with distance metric ‘cosine’ and algorithm ‘brute’. Other possible values for the distance metric are ‘cityblock’, ‘euclidean’, ‘l1’, ‘l2’ and ‘manhattan’. Possible values for the algorithm are ‘auto’, ‘ball_tree’, ‘kd_tree’ and ‘brute’.

    4. Selecting one medical condition randomly for which we have to suggest 5 drugs for treatment.

    5. Finding the 6 nearest neighbors for the sample, calling the kneighbors function with the trained KNN models created in step 3. The first k-neighbor for the sample medical condition is self with a distance of 0. The next 5 k-neighbors are drugs prescribed for the sample medical condition.

    #!/usr/bin/env python
    # coding: utf-8
    
    # Step 1
    import pandas as pd
    import numpy as np
    
    from scipy.sparse import csr_matrix
    from sklearn.neighbors import NearestNeighbors
    from sklearn.preprocessing import LabelEncoder
    encoder = LabelEncoder()
    
    
    # Step 2
    df = pd.read_csv('drugsComTest_raw.csv').fillna('NA')
    df['condition_id'] = pd.Series(encoder.fit_transform(df['condition'].values), index=df.index)
    df_medical = df.filter(['drugName', 'condition', 'rating', 'condition_id'], axis=1)
    df_medical_ratings_pivot=df_medical.pivot_table(index='drugName',columns='condition_id',values='rating').fillna(0)
    df_medical_ratings_pivot_matrix = csr_matrix(df_medical_ratings_pivot.values)
    
    
    # Step 3
    # distance = ['cityblock', 'cosine', 'euclidean', 'l1', 'l2', 'manhattan']
    # algorithm = ['auto', 'ball_tree', 'kd_tree', 'brute']
    model_knn = NearestNeighbors(metric = 'cosine', algorithm = 'brute')
    model_knn.fit(df_medical_ratings_pivot_matrix)
    
    
    # Step 4
    sample_index = np.random.choice(df_medical_ratings_pivot.shape[0])
    sample_condition = df_medical_ratings_pivot.iloc[sample_index,:].values.reshape(1, -1)
    
    
    # Step 5
    distances, indices = model_knn.kneighbors(sample_condition, n_neighbors = 6)
    for i in range(0, len(distances.flatten())):
        if i == 0:
            print('Recommendations for {0}:\n'.format(df_medical_ratings_pivot.index[sample_index]))
        else:
            recommendation = df_medical_ratings_pivot.index[indices.flatten()[i]]
            distanceFromSample = distances.flatten()[i]
            print('{0}: {1}, with distance of {2}:'.format(i, recommendation, distanceFromSample))

    Explanation:

    This is the collaborative-based recommendation system that uses the patients’ ratings of given drug treatments to find similarities in medical conditions. Here, we are matching the patterns for ratings given to drugs by patients. This system compares all the rating patterns and tries to find similarities (cosine similarity).

    Challenges of Recommendation System

    Any recommendation system requires a decent quantity of quality information to process, and we must be aware of the following challenges before developing such a system. Acknowledging and handling them improves the accuracy of recommendations.

    1. Cold Start: Recommending a new user or a user without any previous behavior is a problem. We can recommend the most popular options to them. E.g., YouTube videos suggestion for newly registered users.

    2. Not Enough Data: Having insufficient data provides recommendations with less certainty. E.g., suggestion of hotels or restaurants will not be accurate if systems are uncertain about users’ locations.

    3. Grey Sheep Problem: This problem occurs when the inconsistent behavior of a user makes it difficult to find a pattern. E.g., when multiple users share the same account, the activity pattern is broad, and the system will have difficulty mapping it.

    4. Similar items: In these cases, there is not enough data to separate similar items, so we can recommend among them randomly. E.g., apparel suggestions where items differ only by color and size; all shirts are similar.

    5. Shilling Attacks: Intentional negative behavior that leads to bad/unwanted recommendations. While immoral, we cannot deny the possibility of such attacks. E.g., user ratings and reviews over various social media platforms.

    Accuracy and Performance Measures

    Accuracy evaluation is important because we continually refine our algorithms. The most common evaluation approaches are user studies, online evaluations, and offline evaluations. Our recommendation models must be ready to learn from users’ activity daily, and for online evaluations, we have to test our recommendation system regularly.

    If we understand the challenges of the recommendation system, we can prepare such testing datasets to test its accuracy. With these variations of datasets, we can improve our approach of user studies and offline evaluations.

    1. Online Evaluations: In online evaluations, prediction models are updated frequently with unmonitored data, which can lead to unexpected accuracy. To verify this, the models are first exposed to unmonitored data with low uncertainty, and the uncertainty of that data is then gradually increased.

    2. Offline Evaluations: In offline evaluations, the prediction models are trained on a sample dataset that covers all expected kinds of uncertainty together with expected outcomes. To verify this, the sample dataset is gradually updated and the models’ predicted outcomes are compared against the actual ones. E.g., creating multiple users with known activity and expecting genuine suggestions for them.
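    Offline evaluation usually boils down to comparing predicted outcomes against held-out actual outcomes with some error metric. As a minimal sketch (the function and the sample ratings below are illustrative, not taken from the article's dataset), a root-mean-square error over held-out ratings might look like:

    ```python
    import math

    def rmse(predicted, actual):
        """Root-mean-square error between predicted and actual ratings:
        lower means the model's predictions track reality more closely."""
        assert len(predicted) == len(actual)
        return math.sqrt(sum((p - a) ** 2 for p, a in zip(predicted, actual)) / len(actual))

    # held-out actual ratings vs. what the model predicted for them
    actual = [4.0, 3.0, 5.0, 2.0]
    predicted = [3.5, 3.0, 4.5, 2.5]

    print(round(rmse(predicted, actual), 3))  # → 0.433
    ```

    Re-running such a metric whenever the sample dataset is updated gives a repeatable way to check whether a model change actually improved accuracy.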

    Conclusion

    As a part of this article, we have learned about the approaches, challenges, and evaluation methods, and then we created a practical example of the collaboration-based recommendation system. We also explored various types and filtering approaches with real-world scenarios.

    We have also executed sample code with a publicly available medical drug dataset with patient ratings. We can opt for various options for distance matrix and algorithm for the NearestNeighbors calculation. We have also listed various challenges for this system and understood the accuracy evaluation measures and things that affect and improve them.

  • Exploring OpenAI Gym: A Platform for Reinforcement Learning Algorithms

    Introduction 

    According to the OpenAI Gym GitHub repository “OpenAI Gym is a toolkit for developing and comparing reinforcement learning algorithms. This is the gym open-source library, which gives you access to a standardized set of environments.”

    OpenAI Gym has an agent-environment arrangement. It simply means Gym gives you access to an “agent” that can perform specific actions in an “environment”. In return, the agent gets an observation and a reward as a consequence of performing a particular action in the environment.

    There are four values that are returned by the environment for every “step” taken by the agent.

    1. Observation (object): an environment-specific object representing your observation of the environment. For example, the board state in a board game.
    2. Reward (float): the amount of reward/score achieved by the previous action. The scale varies between environments, but the goal is always to increase your total reward/score.
    3. Done (boolean): whether it’s time to reset the environment again. E.g., you lost your last life in the game.
    4. Info (dict): diagnostic information useful for debugging. However, official evaluations of your agent are not allowed to use this for learning.
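    To make this four-tuple concrete without installing gym, here is a hypothetical toy environment that follows the same step() contract as the Gym version used in this post (the class and its internals are made up purely for illustration):

    ```python
    class MiniEnv:
        """A toy environment mimicking Gym's API: each step()
        returns (observation, reward, done, info)."""

        def __init__(self, max_steps=5):
            self.max_steps = max_steps
            self.steps = 0

        def reset(self):
            self.steps = 0
            return 0.0  # initial observation

        def step(self, action):
            self.steps += 1
            observation = float(self.steps)       # environment-specific object
            reward = 1.0                          # +1 for surviving this step
            done = self.steps >= self.max_steps   # time to reset?
            info = {"steps": self.steps}          # debugging info only
            return observation, reward, done, info

    env = MiniEnv(max_steps=2)
    env.reset()
    print(env.step(0))  # → (1.0, 1.0, False, {'steps': 1})
    print(env.step(1))  # → (2.0, 1.0, True, {'steps': 2})
    ```

    Real Gym environments follow exactly this loop: reset(), then repeated step(action) calls until done is True.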

    Following are the available Environments in the Gym:

    1. Classic control and toy text
    2. Algorithmic
    3. Atari
    4. 2D and 3D robots

    Here you can find a full list of environments.

    Cart-Pole Problem

    Here we will try to solve a classic control problem from the Reinforcement Learning literature, “The Cart-Pole Problem”.

    The Cart-pole problem is defined as follows:
    “A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.”

    The following code will quickly show you what the problem looks like on your computer.

    import gym

    env = gym.make('CartPole-v0')
    env.reset()
    for _ in range(1000):
        env.render()
        # take a random action; reset once the episode ends
        observation, reward, done, info = env.step(env.action_space.sample())
        if done:
            env.reset()
    env.close()

    This is what the output will look like:

    Coding the neural network 

    #We first import the necessary libraries and define hyperparameters - 
    
    import gym
    import random
    import numpy as np
    import tflearn
    from tflearn.layers.core import input_data, dropout, fully_connected
    from tflearn.layers.estimator import regression
    from statistics import median, mean
    from collections import Counter
    
    LR = 2.33e-4
    env = gym.make("CartPole-v0")
    observation = env.reset()
    goal_steps = 500
    score_requirement = 50
    initial_games = 10000
    
    #Now we will define a function to generate training data - 
    
    def initial_population():
        # [OBS, MOVES]
        training_data = []
        # all scores:
        scores = []
        # scores above our threshold:
        accepted_scores = []
        # number of episodes
        for _ in range(initial_games):
            score = 0
            # moves specifically from this episode:
            episode_memory = []
            # previous observation that we saw
            prev_observation = []
            for _ in range(goal_steps):
                # choose random action left or right i.e (0 or 1)
                action = random.randrange(0,2)
                observation, reward, done, info = env.step(action)
                # since that the observation is returned FROM the action
                # we store previous observation and corresponding action
                if len(prev_observation) > 0 :
                    episode_memory.append([prev_observation, action])
                prev_observation = observation
                score+=reward
                if done: break
    
            # reinforcement methodology here.
            # IF our score is higher than our threshold, we save
            # all we're doing is reinforcing the score, we're not trying
            # to influence the machine in any way as to HOW that score is
            # reached.
            if score >= score_requirement:
                accepted_scores.append(score)
                for data in episode_memory:
                    # convert to one-hot (this is the output layer for our neural network)
                    if data[1] == 1:
                        output = [0,1]
                    elif data[1] == 0:
                        output = [1,0]
    
                    # saving our training data
                    training_data.append([data[0], output])
    
            # reset env to play again
            env.reset()
            # save overall scores
            scores.append(score)

        return training_data
    
    # Now using tflearn we will define our neural network 
    
    def neural_network_model(input_size):
    
        network = input_data(shape=[None, input_size, 1], name='input')
    
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 512, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 2, activation='softmax')
        network = regression(network, optimizer='adam', learning_rate=LR, loss='categorical_crossentropy', name='targets')
        model = tflearn.DNN(network, tensorboard_dir='log')
    
        return model
    
    #It is time to train the model now -
    
    def train_model(training_data, model=False):
    
        X = np.array([i[0] for i in training_data]).reshape(-1,len(training_data[0][0]),1)
        y = [i[1] for i in training_data]
    
        if not model:
            model = neural_network_model(input_size = len(X[0]))
    
        model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500, show_metric=True, run_id='openai_CartPole')
        return model
    
    training_data = initial_population()
    
    model = train_model(training_data)
    
    #Training complete, now we should play the game to see how the output looks like 
    
    scores = []
    choices = []
    for each_game in range(10):
        score = 0
        game_memory = []
        prev_obs = []
        env.reset()
        for _ in range(goal_steps):
            env.render()
    
            if len(prev_obs)==0:
                action = random.randrange(0,2)
            else:
                action = np.argmax(model.predict(prev_obs.reshape(-1,len(prev_obs),1))[0])
    
            choices.append(action)
    
            new_observation, reward, done, info = env.step(action)
            prev_obs = new_observation
            game_memory.append([new_observation, action])
            score+=reward
            if done: break
    
        scores.append(score)
    
    print('Average Score:',sum(scores)/len(scores))
    print('choice 1: {:.2f}%  choice 0: {:.2f}%'.format(
        100 * choices.count(1) / len(choices),
        100 * choices.count(0) / len(choices)))
    print(score_requirement)

    This is what the result will look like:

    Conclusion

    Though we haven’t used a Reinforcement Learning model in this blog, a plain fully connected neural network gave us a satisfactory accuracy of 60%. We used tflearn, a higher-level API on top of TensorFlow, to speed up experimentation. We hope this blog gives you a head start in using OpenAI Gym.

    We are waiting to see exciting implementations using Gym and Reinforcement Learning. Happy Coding!