Tag: data science

  • Machine Learning for your Infrastructure: Anomaly Detection with Elastic + X-Pack

    Introduction

    The world continues to go through digital transformation at an accelerating pace. Modern applications and infrastructure continues to expand and operational complexity continues to grow. According to a recent ManageEngine Application Performance Monitoring Survey:

    • 28 percent use ad-hoc scripts to detect issues in over 50 percent of their applications.
    • 32 percent learn about application performance issues from end users.
    • 59 percent trust monitoring tools to identify most performance deviations.

    Most enterprises and web-scale companies have instrumentation & monitoring capabilities with an ElasticSearch cluster. They have a high amount of collected data but struggle to use it effectively. This available data can be used to improve availability and effectiveness of performance and uptime along with root cause analysis and incident prediction

    IT Operations & Machine Learning

    Here is the main question: How to make sense of the huge piles of collected data? The first step towards making sense of data is to understand the correlations between the time series data. But only understanding will not work since correlation does not imply causation. We need a practical and scalable approach to understand the cause-effect relationship between data sources and events across complex infrastructure of VMs, containers, networks, micro-services, regions, etc.

    It’s very likely that due to one component something goes wrong with another component. In such cases, operational historical data can be used to identify the root cause by investigating through a series of intermediate causes and effects. Machine learning is particularly useful for such problems where we need to identify “what changed”, since machine learning algorithms can easily analyze existing data to understand the patterns, thus making easier to recognize the cause. This is known as unsupervised learning, where the algorithm learns from the experience and identifies similar patterns when they come along again.

    Let’s see how you can setup Elastic + X-Pack to enable anomaly detection for your infrastructure & applications.

    Anomaly Detection using Elastic’s machine learning with X-Pack

    Step I: Setup

    1. Setup Elasticsearch: 

    According to Elastic documentation, it is recommended to use the Oracle JDK version 1.8.0_131. Check if you have required Java version installed on your system. It should be at least Java 8, if required install/upgrade accordingly.

    • Download elasticsearch tarball and untar it
    $ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.1.tar.gz
    $ tar -xzvf elasticsearch-5.5.1.tar.gz

    • It will then create a folder named elasticsearch-5.5.1. Go into the folder.
    $ cd elasticsearch-5.5.1

    • Install X-Pack into Elasticsearch
    $ ./bin/elasticsearch-plugin install x-pack

    • Start elasticsearch
    $ bin/elasticsearch

    2. Setup Kibana

    Kibana is an open source analytics and visualization platform designed to work with Elasticsearch.

    • Download kibana tarball and untar it
    $ wget https://artifacts.elastic.co/downloads/kibana/kibana-5.5.1-linux-x86_64.tar.gz
    $ tar -xzf kibana-5.5.1-linux-x86_64.tar.gz

    • It will then create a folder named kibana-5.5.1. Go into the directory.
    $ cd kibana-5.5.1-linux-x86_64

    • Install X-Pack into Kibana
    $ ./bin/kibana-plugin install x-pack

    • Running kibana
    $ ./bin/kibana

    • Navigate to Kibana at http://localhost:5601/
    • Log in as the built-in user elastic and password changeme.
    • You will see the below screen:
    Kibana: X-Pack Welcome Page

     

    3. Metricbeat:

    Metricbeat helps in monitoring servers and the services they host by collecting metrics from the operating system and services. We will use it to get CPU utilization metrics of our local system in this blog.

    • Download Metric Beat’s tarball and untar it
    $ wget https://artifacts.elastic.co/downloads/beats/metricbeat/metricbeat-5.5.1-linux-x86_64.tar.gz
    $ tar -xvzf metricbeat-5.5.1-linux-x86_64.tar.gz

    • It will create a folder metricbeat-5.5.1-linux-x86_64. Go to the folder
    $ cd metricbeat-5.5.1-linux-x86_64

    • By default, Metricbeat is configured to send collected data to elasticsearch running on localhost. If your elasticsearch is hosted on any server, change the IP and authentication credentials in metricbeat.yml file.
     Metricbeat Config

     

    • Metric beat provides following stats:
    • System load
    • CPU stats
    • IO stats
    • Per filesystem stats
    • Per CPU core stats
    • File system summary stats
    • Memory stats
    • Network stats
    • Per process stats
    • Start Metricbeat as daemon process
    $ sudo ./metricbeat -e -c metricbeat.yml &

    Now, all setup is done. Let’s go to step 2 to create machine learning jobs. 

    Step II: Time Series data

    • Real-time data: We have metricbeat providing us the real-time series data which will be used for unsupervised learning. Follow below steps to define index pattern metricbeat-*  in Kibana to search against this pattern in Elasticsearch:
      – Go to Management -> Index Patterns  
      – Provide Index name or pattern as metricbeat-*
      – Select Time filter field name as @timestamp
      – Click Create

    You will not be able to create an index if elasticsearch did not contain any metric beat data. Make sure your metric beat is running and output is configured as elasticsearch.

    • Saved Historic data: Just to see quickly how machine learning detect the anomalies you can also use data provided by Elastic. Download sample data by clicking here.
    • Unzip the files in a folder: tar -zxvf server_metrics.tar.gz
    • Download this script. It will be used to upload sample data to elastic.
    • Provide execute permissions to the file: chmod +x upload_server-metrics.sh
    • Run the script.
    • As we created index pattern for metricbeat data, in same way create index pattern server-metrics*

    Step III: Creating Machine Learning jobs

    There are two scenarios in which data is considered anomalous. First, when the behavior of key indicator changes over time relative to its previous behavior. Secondly, when within a population behavior of an entity deviates from other entities in population over single key indicator.

    To detect these anomalies, there are three types of jobs we can create:

    1. Single Metric job: This job is used to detect Scenario 1 kind of anomalies over only one key performance indicator.
    2. Multimetric job: Multimetric job also detects Scenario 1 kind of anomalies but in this type of job we can track more than one performance indicators, such as CPU utilization along with memory utilization.
    3. Advanced job: This kind of job is created to detect anomalies of type 2.

    For simplicity, we are creating following single metric jobs:

    1. Tracking CPU Utilization: Using metric beat data
    2. Tracking total requests made on server: Using sample server data

    Follow below steps to create single metric jobs:

    Job1: Tracking CPU Utilization

    Job2: Tracking total requests made on server

    • Go to http://localhost:5601/
    • Go to Machine learning tab on the left panel of Kibana.
    • Click on Create new job
    • Click Create single metric job
    • Select index we created in Step 2 i.e. metricbeat-* and server-metrics* respectively
    • Configure jobs by providing following values:
    1. Aggregation: Here you need to select an aggregation function that will be applied to a particular field of data we are analyzing.
    2. Field: It is a drop down, will show you all field that you have w.r.t index pattern.
    3. Bucket span: It is interval time for analysis. Aggregation function will be applied on selected field after every interval time specified here.
    • If your data contains so many empty buckets i.e. data is sparse and you don’t want to consider it as anomalous check the checkbox named sparse data  (if it appears).
    • Click on Use full <index pattern=””> data to use all available data for analysis.</index>
    Metricbeats Description
    Server Description
    • Click on play symbol
    • Provide job name and description
    • Click on Create Job

    After creating job the data available will be analyzed. Click on view results, you will see a chart which will show the actual and upper & lower bound of predicted value. If actual value lies outside of the range, it will be considered as anomalous. The Color of the circles represents the severity level.

    Here we are getting a high range of prediction values since it just started learning. As we get more data the prediction will get better.
    You can see here predictions are pretty good since there is a lot of data to understand the pattern
    • Click on machine learning tab in the left panel. The jobs we created will be listed here.
    • You will see the list of actions for every job you have created.
    • Since we are storing every minute data for Job1 using metricbeat. We can feed the data to the job in real time. Click on play button to start data feed. As we get more and more data prediction will improve.
    • You see details of anomalies by clicking Anomaly Viewer.
    Anomaly in metricbeats data
    Server metrics anomalies  

    We have seen how machine learning can be used to get patterns among the different statistics along with anomaly detection. After identifying anomalies, it is required to find the context of those events. For example, to know about what other factors are contributing to the problem? In such cases, we can troubleshoot by creating multimetric jobs.

  • Explanatory vs. Predictive Models in Machine Learning

    My vision on Data Analysis is that there is continuum between explanatory models on one side and predictive models on the other side. The decisions you make during the modeling process depend on your goal. Let’s take Customer Churn as an example, you can ask yourself why are customers leaving? Or you can ask yourself which customers are leaving? The first question has as its primary goal to explain churn, while the second question has as its primary goal to predict churn. These are two fundamentally different questions and this has implications for the decisions you take along the way. The predictive side of Data Analysis is closely related to terms like Data Mining and Machine Learning.

    SPSS & SAS

    When we’re looking at SPSS and SAS, both of these languages originate from the explanatory side of Data Analysis. They are developed in an academic environment, where hypotheses testing plays a major role. This makes that they have significant less methods and techniques in comparison to R and Python. Nowadays, SAS and SPSS both have data mining tools (SAS Enterprise Miner and SPSS Modeler), however these are different tools and you’ll need extra licenses.

    I have spent some time to build extensive macros in SAS EG to seamlessly create predictive models, which also does a decent job at explaining the feature importance. While a Neural Network may do a fair job at making predictions, it is extremely difficult to explain such models, let alone feature importance. The macros that I have built in SAS EG does precisely the job of explaining the features, apart from producing excellent predictions.

    Open source TOOLS: R & PYTHON

    One of the major advantages of open source tools is that the community continuously improves and increases functionality. R was created by academics, who wanted their algorithms to spread as easily as possible. R has the widest range of algorithms, which makes R strong on the explanatory side and on the predictive side of Data Analysis.

    Python is developed with a strong focus on (business) applications, not from an academic or statistical standpoint. This makes Python very powerful when algorithms are directly used in applications. Hence, we see that the statistical capabilities are primarily focused on the predictive side. Python is mostly used in Data Mining or Machine Learning applications where a data analyst doesn’t need to intervene. Python is therefore also strong in analyzing images and videos. Python is also the easiest language to use when using Big Data Frameworks like Spark. With the plethora of packages and ever improving functionality, Python is a very accessible tool for data scientists.

    MACHINE LEARNING MODELS

    While procedures like Logistic Regression are very good at explaining the features used in a prediction, some others like Neural Networks are not. The latter procedures may be preferred over the former when it comes to only prediction accuracy and not explaining the models. Interpreting or explaining the model becomes an issue for Neural Networks. You can’t just peek inside a deep neural network to figure out how it works. A network’s reasoning is embedded in the behavior of numerous simulated neurons, arranged into dozens or even hundreds of interconnected layers. In most cases the Product Marketing Officer may be interested in knowing what are the factors that are most important for a specific advertising project. What can they concentrate on to get the response rates higher, rather than, what will be their response rate, or revenues in the upcoming year. These questions are better answered by procedures which can be interpreted in an easier way. This is a great article about the technical and ethical consequences of the lack of explanations provided by complex AI models.

    Procedures like Decision Trees are very good at explaining and visualizing what exactly are the decision points (features and their metrics). However, those do not produce the best models. Random Forests, Boosting are the procedures which use Decision Trees as the basic starting point to build the predictive models, which are by far some of the best methods to build sophisticated prediction models.

    While Random Forests use fully grown (highly complex) Trees, and by taking random samples from the training set (a process called Bootstrapping), then each split uses only a proper subset of features from the entire feature set to actually make the split, rather than using all of the features. This process of bootstrapping helps with lower number of training data (in many cases there is no choice to get more data). The (proper) subsetting of the features has a tremendous effect on de-correlating the Trees grown in the Forest (hence randomizing it), leading to a drop in Test Set error. A fresh subset of features is chosen at each step of splitting, making the method robust. The strategy also stops the strongest feature from appearing each time a split is considered, making all the trees in the forest similar. The final result is obtained by averaging the result over all trees (in case of Regression problems), or by taking a majority class vote (in case of classification problem).

    On the other hand, Boosting is a method where a Forest is grown using Trees which are NOT fully grown, or in other words, with Weak Learners. One has to specify the number of trees to be grown, and the initial weights of those trees for taking a majority vote for class selection. The default weight, if not specified is the average of the number of trees requested. At each iteration, the method fits these weak learners, finds the residuals. Then the weights of those trees which failed to predict the correct class is increased so that those trees can concentrate better on the failed examples. This way, the method proceeds by improving the accuracy of the Boosted Trees, stopping when the improvement is below a threshold. One particularly implementation of Boosting, AdaBoost has very good accuracy over other implementations. AdaBoost uses Trees of depth 1, known as Decision Stump as each member of the Forest. These are slightly better than random guessing to start with, but over time they learn the pattern and perform extremely well on test set. This method is more like a feedback control mechanism (where the system learns from the errors). To address overfitting, one can use the hyper-parameter Learning Rate (lambda) by choosing values in the range: (0,1]. Very small values of lambda will take more time to converge, however larger values may have difficulty converging. This can be achieved by a iterative process to select the correct value for lambda, plotting the test error rate against values of lambda. The value of lambda with the lowest test error should be chosen.

    In all these methods, as we move from Logistic Regression, to Decision Trees to Random Forests and Boosting, the complexity of the models increase, making it almost impossible to EXPLAIN the Boosting model to marketers/product managers. Decision Trees are easy to visualize, Logisitic Regression results can be used to demonstrate the most important factors in a customer acquisition model and hence will be well received by business leaders. On the other hand, the Random Forest and Boosting methods are extremely good predictors, without much scope for explaining. But there is hope: These models have functions for revealing the most important variables, although it is not possible to visualize why. 

    USING A BALANCED APPROACH

    So I use a mixed strategy: Use the previous methods as a step in Exploratory Data Analysis, present the importance of features, characteristics of the data to the business leaders in phase one, and then use the more complicated models to build the prediction models for deployment, after building competing models. That way, one not only gets to understand what is happening and why, but also gets the best predictive power. In most cases that I have worked, I have rarely seen a mismatch between the explanation and the predictions using different methods. After all, this is all math and the way of delivery should not change end results. Now that’s a happy ending for all sides of the business!

  • Exploring OpenAI Gym: A Platform for Reinforcement Learning Algorithms

    Introduction 

    According to the OpenAI Gym GitHub repository “OpenAI Gym is a toolkit for developing and comparing reinforcement learning algorithms. This is the gym open-source library, which gives you access to a standardized set of environments.”

    Open AI Gym has an environment-agent arrangement. It simply means Gym gives you access to an “agent” which can perform specific actions in an “environment”. In return, it gets the observation and reward as a consequence of performing a particular action in the environment.

    There are four values that are returned by the environment for every “step” taken by the agent.

    1. Observation (object): an environment-specific object representing your observation of the environment. For example, board state in a board game etc
    2. Reward (float): the amount of reward/score achieved by the previous action. The scale varies between environments, but the goal is always to increase your total reward/score.
    3. Done (boolean): whether it’s time to reset the environment again. E.g you lost your last life in the game.
    4. Info (dict): diagnostic information useful for debugging. However, official evaluations of your agent are not allowed to use this for learning.

    Following are the available Environments in the Gym:

    1. Classic control and toy text
    2. Algorithmic
    3. Atari
    4. 2D and 3D robots

    Here you can find a full list of environments.

    Cart-Pole Problem

    Here we will try to write a solve a classic control problem from Reinforcement Learning literature, “The Cart-pole Problem”.

    The Cart-pole problem is defined as follows:
    “A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.”

    The following code will quickly allow you see how the problem looks like on your computer.

    import gym
    env = gym.make('CartPole-v0')
    env.reset()
    for _ in range(1000):
        env.render()
        env.step(env.action_space.sample())

    This is what the output will look like:

    Coding the neural network 

    #We first import the necessary libraries and define hyperparameters - 
    
    import gym
    import random
    import numpy as np
    import tflearn
    from tflearn.layers.core import input_data, dropout, fully_connected
    from tflearn.layers.estimator import regression
    from statistics import median, mean
    from collections import Counter
    
    LR = 2.33e-4
    env = gym.make("CartPole-v0")
    observation = env.reset()
    goal_steps = 500
    score_requirement = 50
    initial_games = 10000
    
    #Now we will define a function to generate training data - 
    
    def initial_population():
        # [OBS, MOVES]
        training_data = []
        # all scores:
        scores = []
        # scores above our threshold:
        accepted_scores = []
        # number of episodes
        for _ in range(initial_games):
            score = 0
            # moves specifically from this episode:
            episode_memory = []
            # previous observation that we saw
            prev_observation = []
            for _ in range(goal_steps):
                # choose random action left or right i.e (0 or 1)
                action = random.randrange(0,2)
                observation, reward, done, info = env.step(action)
                # since that the observation is returned FROM the action
                # we store previous observation and corresponding action
                if len(prev_observation) > 0 :
                    episode_memory.append([prev_observation, action])
                prev_observation = observation
                score+=reward
                if done: break
    
            # reinforcement methodology here.
            # IF our score is higher than our threshold, we save
            # all we're doing is reinforcing the score, we're not trying
            # to influence the machine in any way as to HOW that score is
            # reached.
            if score >= score_requirement:
                accepted_scores.append(score)
                for data in episode_memory:
                    # convert to one-hot (this is the output layer for our neural network)
                    if data[1] == 1:
                        output = [0,1]
                    elif data[1] == 0:
                        output = [1,0]
    
                    # saving our training data
                    training_data.append([data[0], output])
    
            # reset env to play again
            env.reset()
            # save overall scores
            scores.append(score)
    
    # Now using tflearn we will define our neural network 
    
    def neural_network_model(input_size):
    
        network = input_data(shape=[None, input_size, 1], name='input')
    
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 512, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 2, activation='softmax')
        network = regression(network, optimizer='adam', learning_rate=LR, loss='categorical_crossentropy', name='targets')
        model = tflearn.DNN(network, tensorboard_dir='log')
    
        return model
    
    #It is time to train the model now -
    
    def train_model(training_data, model=False):
    
        X = np.array([i[0] for i in training_data]).reshape(-1,len(training_data[0][0]),1)
        y = [i[1] for i in training_data]
    
        if not model:
            model = neural_network_model(input_size = len(X[0]))
    
        model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500, show_metric=True, run_id='openai_CartPole')
        return model
    
    training_data = initial_population()
    
    model = train_model(training_data)
    
    #Training complete, now we should play the game to see how the output looks like 
    
    scores = []
    choices = []
    for each_game in range(10):
        score = 0
        game_memory = []
        prev_obs = []
        env.reset()
        for _ in range(goal_steps):
            env.render()
    
            if len(prev_obs)==0:
                action = random.randrange(0,2)
            else:
                action = np.argmax(model.predict(prev_obs.reshape(-1,len(prev_obs),1))[0])
    
            choices.append(action)
    
            new_observation, reward, done, info = env.step(action)
            prev_obs = new_observation
            game_memory.append([new_observation, action])
            score+=reward
            if done: break
    
        scores.append(score)
    
    print('Average Score:',sum(scores)/len(scores))
    print('choice 1:{}  choice 0:{}'.format(float((choices.count(1))/float(len(choices)))*100,float((choices.count(0))/float(len(choices)))*100))
    print(score_requirement)

    This is what the result will look like:

    Conclusion

    Though we haven’t used the Reinforcement Learning model in this blog, the normal fully connected neural network gave us a satisfactory accuracy of 60%. We used tflearn, which is a higher level API on top of Tensorflow for speeding-up experimentation. We hope that this blog will give you a head start in using OpenAI Gym.

    We are waiting to see exciting implementations using Gym and Reinforcement Learning. Happy Coding!

  • A Quick Introduction to Data Analysis With Pandas

    Python is a great language for doing data analysis, primarily because of the fantastic ecosystem of data-centric Python packages. Pandas is one of those packages and makes importing and analyzing data much easier.

    Pandas aims to integrate the functionality of NumPy and matplotlib to give you a convenient tool for data analytics and visualization. Besides the integration,  it also makes the usage far more better.

    In this blog, I’ll give you a list of useful pandas snippets that can be reused over and over again. These will definitely save you some time that you may otherwise need to skim through the comprehensive Pandas docs.

    The data structures in Pandas are capable of holding elements of any type: Series, DataFrame.

    Series

    A one-dimensional object that can hold any data type such as integers, floats, and strings

    A Series object can be created of different values. Series can be remembered similar to a Python list.

    In the below example, NaN is NumPy’s nan symbol which tells us that the element is not a number but it can be used as one numerical type pointing out to be not a number. The type of series is an object because the series has mixed contents of strings and numbers.

    >>> import pandas as pd
    >>> import numpy as np
    >>> series = pd.Series([12,32,54,2, np.nan, "a string", 6])
    >>> series
    0          12
    1          32
    2          54
    3           2
    4         NaN
    5    a string
    6           6
    dtype: object

    Now if we use only numerical values, we get the basic NumPy dtype – float for our series.

    >>> series = pd.Series([1,2,np.nan, 4])
    >>> series
    0    1.0
    1    2.0
    2    NaN
    3    4.0
    dtype: float64

    DataFrame

    A two-dimensional labeled data structure where columns can be of different types.

    Each column in a Pandas DataFrame represents a Series object in memory.

    In order to convert a certain Python object (dictionary, lists, etc) to a DataFrame, it is extremely easy. From the python dictionaries, the keys map to Column names while values correspond to a list of column values.

    >>> d = {
        "stats": pd.Series(np.arange(10,15,1.0)),
        "year": pd.Series(["2012","2007","2012","2003"]),
        "intake": pd.Series(["SUMMER","WINTER","WINTER","SUMMER"]),
    }
    >>> df = pd.DataFrame(d)
    >>> df

    Reading CSV files

    Pandas can work with various file types while reading any file you need to remember.

    pd.read_filetype()

    Now you will have to only replace “filetype” with the actual type of the file, like csv or excel. You will have to give the path of the file inside the parenthesis as the first argument. You can also pass in different arguments that relate to opening the file. (Reading a csv file? See this)

    >>> df = pd.read_csv('companies.csv')
    >>> df.head()
    view raw

    Accessing Columns and Rows

    DataFrame comprises of three sub-components, the indexcolumns, and the data (also known as values).

    The index represents a sequence of values. In the DataFrame, it always on the left side. Values in an index are in bold font. Each individual value of the index is called a label. Index is like positions while the labels are values at that particular index. Sometimes the index is also referred to as row labels. In all the examples below, the labels and indexes are the same and are just integers beginning from 0 up to n-1, where n is the number of rows in the table.

    Selecting rows is done using loc and iloc:

    • loc gets rows (or columns) with particular labels from the index. Raises KeyError when the items are not found.
    • iloc gets rows (or columns) at particular positions/index (so it only takes integers). Raises IndexError if a requested indexer is out-of-bounds.
    >>> df.loc[:5]              #similar to df.head()

    Accessing the data using column names

    Pandas takes an extra step and allows us to access data through labels in DataFrames.

    >>> df.loc[:5, ["name","vertical", "url"]]

    In Pandas, selecting data is very easy and similar to accessing an element from a dictionary or a list.

    You can select a column (df[col_name]) and it will return column with label col_name as a Series, because rows and columns are stored as Series in a DataFrame, If you need to access more columns (df[[col_name_1, col_name_2]]) and it returns columns as a new DataFrame.

    Filtering DataFrames with Conditional Logic

    Let’s say we want all the companies with the vertical as B2B, the logic would be:

    >>> df[(df['vertical'] == 'B2B')]

    If we want the companies for the year 2009, we would use:

    >>> df[(df['year'] == 2009)]

    Need to combine them both? Here’s how you would do it:

    >>> df[(df['vertical'] == 'B2B') & (df['year'] == 2009)]

    Get all companies with vertical as B2B for the year 2009

    Sort and Groupby

    Sorting

    Sort values by a certain column in ascending order by using:

    >>> df.sort_values(colname)

    >>> df.sort_values(colname,ascending=False)

    Furthermore, it’s also possible to sort values by multiple columns with different orders. colname_1 is being sorted in ascending order and colname_2 in descending order by using:

    >>> df.sort_values([colname_1,colname_2],ascending=[True,False])

    Grouping

    This operation involves 3 steps; splitting of the data, applying a function on each of the group, and finally combining the results into a data structure. This can be used to group large amounts of data and compute operations on these groups.

    df.groupby(colname) returns a groupby object for values from one column while df.groupby([col1,col2]) returns a groupby object for values from multiple columns.

    Data Cleansing

    Data cleaning is a very important step in data analysis.

    Checking missing values in the data

    Check null values in the DataFrame by using:

    >>> df.isnull()

    This returns a boolean array (an array of true for missing values and false for non-missing values).

    >>> df.isnull().sum()

    Check non null values in the DataFrame using pd.notnull(). It returns a boolean array, exactly converse of df.notnull()

    Removing Empty Values

    Dropping empty values can be done easily by using:

    >>> df.dropna()

    This drops the rows having empty values or df.dropna(axis=1) to drop the columns.

    Also, if you wish to fill the missing values with other values, use df.fillna(x). This fills all the missing values with the value x (here you can put any value that you want) or s.fillna(s.mean()) which replaces null values with the mean (mean can be replaced with any function from the arithmetic section).

    Operations on Complete Rows, Columns, or Even All Data

    >>> df["url_len"] = df["url"].map(len)

    The .map() operation applies a function to each element of a column.

    .apply() applies a function to columns. Use .apply(axis=1) to do it on the rows.

    Iterating over rows

    Very similar to iterating any of the python primitive types such as list, tuples, dictionaries.

    >>> for i, row in df.iterrows():
            print("Index {0}".format(i))
            print("Row {0}".format(row))

    The .iterrows() loops 2 variables together i.e, the index of the row and the row itself, variable is the index and variable row is the row in the code above.

    Tips & Tricks

    Using ufuncs (also known as Universal Functions). Python has the .apply() which applies a function to columns/rows. Similarly, Ufuncs can be used while preprocessing. What is the difference between ufuncs and .apply()?

    Ufuncs is a numpy library, implemented in C which is highly efficient (ufuncs are around 10 times faster).

    A list of common Ufuncs:

    isinf: Element-wise checks for positive or negative infinity.

    isnan: Element-wise checks for NaN and returns result as a boolean array.

    isnat: Element-wise checks for NaT (not time) and returns result as a boolean array.

    trunc: Return the truncated value of the input, element-wise.

    .dt commands: Element-wise processing for date objects.

    High-Performance Pandas

    Pandas performs various vectorized/broadcasted operations and grouping-type operations. These operations are efficient and effective.

    As of version 0.13, Pandas included tools that allow us to directly access C-speed operations without costly allocation of intermediate arrays. There are two functions, eval() and query().

    DataFrame.eval() for efficient operations:

    >>> import pandas as pd
    >>> nrows, ncols = 100000, 100
    >>> rng = np.random.RandomState(42)
    >>> df1, df2, df3, df4 = (pd.DataFrame(rng.rand(nrows, ncols))
                          for i in range(4))

    To compute the sum of df1, df2, df3, and df4 DataFrames using the typical Pandas approach, we can just write the sum:

    >>> %timeit df1 + df2 + df3 + df4
    
    10 loops, best of 3: 103.1 ms per loop

    A better and optimized approach for the same operation can be computed via pd.eval():

    >>> %timeit pd.eval('df1 + df2 + df3 + df4')
    
    10 loops, best of 3: 53.6 ms per loop

    %timeit — Measure execution time of small code snippets.

    The eval() expression is about 50% faster (it also consumes mush less memory).

    And it performs the same result:

    >>> np.allclose(df1 + df2 + df3 + df4,d.eval('df1 + df2 + df3 + df4'))
    
    True

    np.allclose() is a numpy function which returns True if two arrays are element-wise equal within a tolerance.

    Column-Wise & Assignment Operations Using df.eval()

    Normal expression to split the first character of a column and assigning it to the same column can be done by using:

    >>> df['batch'] = df['batch'].str[0]

    By using df.eval(), same expression can be performed much faster:

    >>> df.eval("batch=batch.str[0]")

    DataFrame.query() for efficient operations:

    Similar to performing filtering operations with conditional logic, to filter rows with vertical as B2B and year as 2009, we do it by using:

    >>> %timeit df[(df['vertical'] == 'B2B') & (df['year'] == 2009)]
    
    1.69 ms ± 57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    With .query() the same filtering can be performed about 50% faster.

    >>> %timeit df.query("vertical == 'B2B' and year == 2009")
    
    875 µs ± 24.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    When to use eval() and query()? 

    Two aspects: computation time and memory usage. 

    Memory usage: Every operation which involves NumPy/Pandas DataFrames results into implicit creation of temporary variables. In such cases, if the memory usage of these temporary variables is greater, using eval() and query() is an appropriate choice to reduce the memory usage.

    Computation time: Traditional method of performing NumPy/Pandas operations is faster for smaller arrays! The real benefit of eval()/query() is achieved mainly because of the saved memory, and also because of the cleaner syntax they offer.

    Conclusion

    Pandas is a powerful and fun library for data manipulation/analysis. It comes with easy syntax and fast operations. The blog highlights the most used pandas implementation and optimizations. Best way to master your skills over pandas is to use real datasets, beginning with Kaggle kernels to learning how to use pandas for data analysis. Check out more on real time text classification using Kafka and Scikit-learn and explanatory vs. predictive models in machine learning here.  

  • Lessons Learnt While Building an ETL Pipeline for MongoDB & Amazon Redshift Using Apache Airflow

    Recently, I was involved in building an ETL (Extract-Transform-Load) pipeline. It included extracting data from MongoDB collections, perform transformations and then loading it into Redshift tables. Many ETL solutions are available in the market which kind-of solves the issue, but the key part of an ETL process lies in its ability to transform or process raw data before it is pushed to its destination.

    Each ETL pipeline comes with a specific business requirement around processing data which is hard to be achieved using off-the-shelf ETL solutions. This is why a majority of ETL solutions are custom built manually, from scratch. In this blog, I am going to talk about my learning around building a custom ETL solution which involved moving data from MongoDB to Redshift using Apache Airflow.

    Background:

    I began by writing a Python-based command line tool which supported different phases of ETL, like extracting data from MongoDB, processing extracted data locally, uploading the processed data to S3, loading data from S3 to Redshift, post-processing and cleanup. I used the PyMongo library to interact with MongoDB and the Boto library for interacting with Redshift and S3.

    I kept each operation atomic so that multiple instances of each operation can run independently of each other, which will help to achieve parallelism. One of the major challenges was to achieve parallelism while running the ETL tasks. One option was to develop our own framework based on threads or developing a distributed task scheduler tool using a message broker tool like Celery combined with RabbitMQ. After doing some research I settled for Apache Airflow. Airflow is a Python-based scheduler where you can define DAGs (Directed Acyclic Graphs), which would run as per the given schedule and run tasks in parallel in each phase of your ETL. You can define DAG as Python code and it also enables you to handle the state of your DAG run using environment variables. Features like task retries on failure handling are a plus.

    We faced several challenges while getting the above ETL workflow to be near real-time and fault tolerant. We discuss the challenges faced and the solutions below:

    Keeping your ETL code changes in sync with Redshift schema

    While you are building the ETL tool, you may end up fetching a new field from MongoDB, but at the same time, you have to add that column to the corresponding Redshift table. If you fail to do so the ETL pipeline will start failing. In order to tackle this, I created a database migration tool which would become the first step in my ETL workflow.

    The migration tool would:

    • keep the migration status in a Redshift table and
    • would track all migration scripts in a code directory.

    In each ETL run, it would get the most recently ran migrations from Redshift and would search for any new migration script available in the code directory. If found it would run the newly found migration script after which the regular ETL tasks would run. This adds the onus on the developer to add a migration script if he is making any changes like addition or removal of a field that he is fetching from MongoDB.

    Maintaining data consistency

    While extracting data from MongoDB, one needs to ensure all the collections are extracted at a specific point in time else there can be data inconsistency issues. We need to solve this problem at multiple levels:

    • While extracting data from MongoDB define parameters like modified date and extract data from different collections with a filter as records less than or equal to that date. This will ensure you fetch point in time data from MongoDB.
    • While loading data into Redshift tables, don’t load directly to master table, instead load it to some staging table. Once you are done loading data in staging for all related collections, load it to master from staging within a single transaction. This way data is either updated in all related tables or in none of the tables.

    A single bad record can break your ETL

    While moving data across the ETL pipeline into Redshift, one needs to take care of field formats. For example, the Date field in the incoming data can be different than that in the Redshift schema design. Another example can be that the incoming data can exceed the length of the field in the schema. Redshift’s COPY command which is used to load data from files to redshift tables is very vulnerable to such changes in data types. Even a single incorrectly formatted record will lead to all your data getting rejected and effectively breaking the ETL pipeline.

    There are multiple ways in which we can solve this problem. Either handle it in one of the transform jobs in the pipeline. Alternately we put the onus on Redshift to handle these variances. Redshift’s COPY command has many options which can help you solve these problems. Some of the very useful options are

    • ACCEPTANYDATE: Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
    • ACCEPTINVCHARS: Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
    • TRUNCATECOLUMNS: Truncates data in columns to the appropriate number of characters so that it fits the column specification.

    Redshift going out of storage

    Redshift is based on PostgreSQL and one of the common problems is when you delete records from Redshift tables it does not actually free up space. So if your ETL process is deleting and creating new records frequently, then you may run out of Redshift storage space. VACUUM operation for Redshift is the solution to this problem. Instead of making VACUUM operation a part of your main ETL flow, define a different workflow which runs on a different schedule to run VACUUM operation. VACUUM operation reclaims space and resorts rows in either a specified table or all tables in the current database. VACUUM operation can be FULL, SORT ONLY, DELETE ONLY & REINDEX. More information on VACUUM can be found here.

    ETL instance going out of storage

    Your ETL will be generating a lot of files by extracting data from MongoDB onto your ETL instance. It is very important to periodically delete those files otherwise you are very likely to go out of storage on your ETL server. If your data from MongoDB is huge, you might end up creating large files on your ETL server. Again, I would recommend defining a different workflow which runs on a different schedule to run a cleanup operation.

    Making ETL Near Real Time

    Processing only the delta rather than doing a full load in each ETL run

    ETL would be faster if you keep track of the already processed data and process only the new data. If you are doing a full load of data in each ETL run, then the solution would not scale as your data scales. As a solution to this, we made it mandatory for the collection in our MongoDB to have a created and a modified date. Our ETL would check the maximum value of the modified date for the given collection from the Redshift table. It will then generate the filter query to fetch only those records from MongoDB which have modified date greater than that of the maximum value. It may be difficult for you to make changes in your product, but it’s worth the effort!

    Compressing and splitting files while loading

    A good approach is to write files in some compressed format. It saves your storage space on ETL server and also helps when you load data to Redshift. Redshift COPY command suggests that you provide compressed files as input. Also instead of a single huge file, you should split your files into parts and give all files to a single COPY command. This will enable Redshift to use it’s computing resources across the cluster to do the copy in parallel, leading to faster loads.

    Streaming mongo data directly to S3 instead of writing it to ETL server

    One of the major overhead in the ETL process is to write data first to ETL server and then uploading it to S3. In order to reduce disk IO, you should not store data to ETL server. Instead, use MongoDB’s handy stream API. For MongoDB Node driver, both the collection.find() and the collection.aggregate() function return cursors. The stream method also accepts a transform function as a parameter. All your custom transform logic could go into the transform function. AWS S3’s node library’s upload() function, also accepts readable streams. Use the stream from the MongoDB Node stream method, pipe it into zlib to gzip it, then feed the readable stream into AWS S3’s Node library. Simple! You will see a large improvement in your ETL process by this simple but important change.

    Optimizing Redshift Queries

    Optimizing Redshift Queries helps in making the ETL system highly scalable, efficient and also reduce the cost. Lets look at some of the approaches:

    Add a distribution key

    Redshift database is clustered, meaning your data is stored across cluster nodes. When you query for certain set of records, Redshift has to search for those records in each node, leading to slow queries. A distribution key is a single metric, which will decide the data distribution of all data records across your tables. If you have a single metric which is available for all your data, you can specify it as distribution key. When loading data into Redshift, all data for a certain value of distribution key will be placed on a single node of Redshift cluster. So when you query for certain records Redshift knows exactly where to search for your data. This is only useful when you are also using the distribution key to query the data.

    Source: Slideshare

     

    Generating a numeric primary key for string primary key

    In MongoDB, you can have any type of field as your primary key. If your Mongo collections are having a non-numeric primary key and you are using those same keys in Redshift, your joins will end up being on string keys which are slower. Instead, generate numeric keys for your string keys and joining on it which will make queries run much faster. Redshift supports specifying a column with an attribute as IDENTITY which will auto-generate numeric unique value for the column which you can use as your primary key.

    Conclusion:

    In this blog, I have covered the best practices around building ETL pipelines for Redshift  based on my learning. There are many more recommended practices which can be easily found in Redshift and MongoDB documentation. 

  • Real Time Analytics for IoT Data using Mosquitto, AWS Kinesis and InfluxDB

    Internet of things (IoT) is maturing rapidly and it is finding application across various industries. Every common device that we use is turning into the category of smart devices. Smart devices are basically IoT devices. These devices captures various parameters in and around their environment leading to generation of a huge amount of data. This data needs to be collected, processed, stored and analyzed in order to get actionable insights from them. To do so, we need to build data pipeline.  In this blog we will be building a similar pipeline using Mosquitto, Kinesis, InfluxDB and Grafana. We will discuss all these individual components of the pipeline and the steps to build it.

    Why the Analysis of IoT data is different

    In an IoT setup, the data is generated by sensors that are distributed across various locations. In order to use the data generated by them we should first get them to a common location from where the various applications which want to process them can read it.

    Network Protocol

    IoT devices have low computational and network resources. Moreover, these devices write data in very short intervals thus high throughput is expected on the network. For transferring IoT data it is desirable to use lightweight network protocols. A protocol like HTTP uses a complex structure for communication resulting in consumption of more resources making it unsuitable for IoT data transfer. One of the lightweight protocol suitable for IoT data is MQTT which we are using in our pipeline. MQTT is designed for machine to machine (M2M) connectivity. It uses a publisher/subscriber communication model and helps clients to distribute telemetry data with very low network resource consumption. Along with IoT MQTT has been found to be useful in other fields as well.

    Other similar protocols include Constrained Application Protocol (CoAP), Advanced Message Queuing Protocol (AMQP) etc.

    Datastore   

    IoT devices generally collect telemetry about its environment usually through sensors. In most of the IoT scenarios, we try to analyze how things have changed over a period of time. Storing these data in a time series database makes our analysis simpler and better. InfluxDB is popular time series database which we will use in our pipeline. More about time series databases can be read here.

    Pipeline Overview

    The first thing we need for a data pipeline is data. As shown in the image above the data generated by various sensors are written to a topic in the MQTT message broker. To mimic sensors we will use a program which uses the MQTT client to write data to the MQTT broker.

    The next component is Amazon Kinesis which is used for streaming data analysis. It closely resembles apache Kafka which is an open source tool used for similar purposes. Kinesis brings the data generated by a number of clients to a single location from where different consumers can pull it for processing. We are using Kinesis so that multiple consumers can read data from a single location. This approach scales well even if we have multiple message brokers.

    Once the data is written to the MQTT broker a Kinesis producer subscribes to it and pull the data from it and writes it to the Kinesis stream, from the Kinesis stream the data is pulled by Kinesis consumers which processes the data and writes it to an InfluxDB which is a time series database.

    Finally, we use Grafana which is a well-known tool for analytics and monitoring, we can connect it to many popular databases and perform analytics and monitoring. Another popular tool in this space is Kibana (the K of ELK stack)

    Setting up a MQTT Message Broker Server:

    For MQTT message broker we will use Mosquitto which is a popular open source message broker that implements MQTT. The details of downloading and installing mosquitto for various platforms are available here.

    For Ubuntu, it can be installed using the following commands

    sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
    sudo apt-get update
    sudo apt-get install mosquitto
    service mosquitto status

    Setting up InfluxDB and Grafana

    The simplest way to set up both these components is to use their docker image directly

    docker run --name influxdb -p 8083:8083 -p 8086:8086 influxdb:1.0
    docker run --name grafana -p 3000:3000 --link influxdb grafana/grafana:3.1.1

    In InfluxDB we have mapped two ports, port 8086 is the HTTP API endpoint port while 8083 is the administration web server’s port. We need to create a database where we will write our data.

    For creating a database we can directly go to the console at <influxdb-ip>:8083 and run the command: </influxdb-ip>

    CREATE DATABASE "iotdata"

    Or we can do it via HTTP request :

    curl -XPOST "http://localhost:8086/query" --data-urlencode "q=CREATE DATABASE iotdata

    Creating a Kinesis stream

    In Kinesis, we create streams where the Kinesis producers write the data coming from various sources and then the Kinesis consumers read the data from the stream. In the stream, the data is stored in various shards. For our purpose, one shard would be enough.

    Creating the MQTT client

    We will use the Golang client available in this repository to connect with our message broker server and write data to a specific topic. We will first create a new MQTT client. Here we can see the list of options we have for configuring our MQTT client.

    Once we create the options object we can pass it to the NewClient() method which will return us the MQTT client. Now we can write data to the MQTT server. We have defined the structure of the data in the struct sensor data. Now to mimic two sensors which are writing telemetry data to the MQTT broker we have two goroutines which push data to the MQTT server every five seconds.

    package publisher
    
    import (
    	"config"
    	"encoding/json"
    	"fmt"
    	"log"
    	"math/rand"
    	"os"
    	"time"
    
    	"github.com/eclipse/paho.mqtt.golang"
    )
    
    type SensorData struct {
    	Id          string  `json:"id"`
    	Temperature float64 `json:"temperature"`
    	Humidity    float64 `json:"humidity"`
    	Timestamp   int64   `json:"timestamp"`
    	City        string  `json:"city"`
    }
    
    func StartMQTTPublisher() {
    	fmt.Println("MQTT publisher Started")
    	mqtt.DEBUG = log.New(os.Stdout, "", 0)
    	mqtt.ERROR = log.New(os.Stdout, "", 0)
    	opts := mqtt.NewClientOptions().AddBroker(config.GetMqttServerurl()).SetClientID("MqttPublisherClient")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetPingTimeout(1 * time.Second)
    	c := mqtt.NewClient(opts)
    	if token := c.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	}
    
    	go func() {
    		t := 20.04
    		h := 32.06
    		for i := 0; i < 100; i++ {
    			sensordata := SensorData{
    				Id:          "CITIMUM",
    				Temperature: t,
    				Humidity:    h,
    				Timestamp:   time.Now().Unix(),
    				City:        "Mumbai",
    			}
    			requestBody, err := json.Marshal(sensordata)
    			if err != nil {
    				fmt.Println(err)
    			}
    			token := c.Publish(config.GetMQTTTopicName(), 0, false, requestBody)
    			token.Wait()
    			if i < 50 {
    				t = t + 1*rand.Float64()
    				h = h + 1*rand.Float64()
    			} else {
    				t = t - 1*rand.Float64()
    				h = h - 1*rand.Float64()
    			}
    			time.Sleep(5 * time.Second)
    		}
    	}()
    	go func() {
    		t := 16.02
    		h := 24.04
    		for i := 0; i < 100; i++ {
    			sensordata := SensorData{
    				Id:          "CITIPUN",
    				Temperature: t,
    				Humidity:    h,
    				Timestamp:   time.Now().Unix(),
    				City:        "Pune",
    			}
    			requestBody, err := json.Marshal(sensordata)
    			if err != nil {
    				fmt.Println(err)
    			}
    			token := c.Publish(config.GetMQTTTopicName(), 0, false, requestBody)
    			token.Wait()
    			if i < 50 {
    				t = t + 1*rand.Float64()
    				h = h + 1*rand.Float64()
    			} else {
    				t = t - 1*rand.Float64()
    				h = h - 1*rand.Float64()
    			}
    			time.Sleep(5 * time.Second)
    		}
    	}()
    	time.Sleep(1000 * time.Second)
    	c.Disconnect(250)
    
    }

    Create a Kinesis Producer

    Now we will create a Kinesis producer which subscribes to the topic to which our MQTT client writes data and pull the data from the broker and pushes it to the Kinesis stream. Just like in the previous section here also we first create an MQTT client which connects to the message broker and subscribe to the topic to which our clients/publishers are going to write data to. In the client option, we have the option to define a function which will be called when data is written to this topic. We have created a function postDataTokinesisStream() which connects Kinesis using the Kinesis client and then writes data to the Kinesis stream, every time a data is pushed to the topic.

    package producer
    
    import (
    	"config"
    	"fmt"
    
    	"os"
    	"time"
    
    	"github.com/aws/aws-sdk-go/service/kinesis"
    
    	mqtt "github.com/eclipse/paho.mqtt.golang"
    )
    
    func postDataTokinesisStream(client mqtt.Client, message mqtt.Message) {
    	fmt.Printf("Received message on topic: %snMessage: %sn", message.Topic(), message.Payload())
    	streamName := config.GetKinesisStreamName()
    	kclient := config.GetKinesisClient()
    	var putRecordInput kinesis.PutRecordInput
    	partitionKey := message.Topic()
    	putRecordInput.PartitionKey = &partitionKey
    	putRecordInput.StreamName = &streamName
    	putRecordInput.Data = message.Payload()
    	putRecordOutput, err := kclient.PutRecord(&putRecordInput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(putRecordOutput)
    	}
    
    }
    
    func StartKinesisProducer() {
    	fmt.Println("Kinesis Producer Started")
    	c := make(chan os.Signal, 1)
    	opts := mqtt.NewClientOptions().AddBroker(config.GetMqttServerurl()).SetClientID("MqttSubscriberClient")
    	opts.SetKeepAlive(2 * time.Second)
    	opts.SetPingTimeout(1 * time.Second)
    	opts.OnConnect = func(c mqtt.Client) {
    		if token := c.Subscribe(config.GetMQTTTopicName(), 0, postDataTokinesisStream); token.Wait() && token.Error() != nil {
    			panic(token.Error())
    		}
    	}
    
    	client := mqtt.NewClient(opts)
    	if token := client.Connect(); token.Wait() && token.Error() != nil {
    		panic(token.Error())
    	} else {
    		fmt.Printf("Connected to %sn", config.GetMqttServerurl())
    	}
    
    	<-c
    }

    Create a Kinesis Consumer

    Now the data is available in our Kinesis stream we can pull it for processing. In the Kinesis consumer section, we create a Kinesis client just like we did in the previous section and then pull data from it. Here we first make a call to the DescribeStream method which returns us the shardId, we then use this shardId to get the ShardIterator and then finally we are able to fetch the records by passing the ShardIterator to GetRecords() method. GetRecords() also returns the  NextShardIterator which we can use to continuously look for records in the shard until NextShardIterator becomes null.

    package consumer
    
    import (
    	"config"
    	"fmt"
    
    	"github.com/aws/aws-sdk-go/service/kinesis"
    	"velotio.com/dao"
    )
    
    func StartKinesisConsumer() {
    	fmt.Println("Kinesis Consumer Started")
    	client := config.GetKinesisClient()
    	streamName := config.GetKinesisStreamName()
    	var describeStreamInput kinesis.DescribeStreamInput
    	describeStreamInput.StreamName = &streamName
    	describeStreamOutput, err := client.DescribeStream(&describeStreamInput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(*describeStreamOutput.StreamDescription.Shards[0].ShardId)
    	}
    	var getShardIteratorInput kinesis.GetShardIteratorInput
    	getShardIteratorInput.ShardId = describeStreamOutput.StreamDescription.Shards[0].ShardId
    	getShardIteratorInput.StreamName = &streamName
    	shardIteratorType := "TRIM_HORIZON"
    	getShardIteratorInput.ShardIteratorType = &shardIteratorType
    	getShardIteratorOuput, err := client.GetShardIterator(&getShardIteratorInput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(*getShardIteratorOuput.ShardIterator)
    	}
    	var getRecordsInput kinesis.GetRecordsInput
    
    	getRecordsInput.ShardIterator = getShardIteratorOuput.ShardIterator
    	getRecordsOuput, err := client.GetRecords(&getRecordsInput)
    	//fmt.Println(getRecordsOuput)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		for *getRecordsOuput.NextShardIterator != "" {
    			i := 0
    			for i < len(getRecordsOuput.Records) {
    				//fmt.Println(len(getRecordsOuput.Records))
    				sdf := &dao.SensorDataFiltered{}
    				sdf.PostDataToInfluxDB(getRecordsOuput.Records[i].Data)
    				i++
    			}
    			getRecordsInput.ShardIterator = getRecordsOuput.NextShardIterator
    			getRecordsOuput, err = client.GetRecords(&getRecordsInput)
    		}
    
    	}
    }

    Processing the data and writing it to InfluxDB

    Now we do simple processing of filtering out data. The data that we got from the sensor is having fields sensorId, temperature, humidity, city, and timestamp but we are interested in only the values of temperature and humidity for a city so we have created a new structure ‘SensorDataFiltered’ which contains only the fields we need.

    For every record that the Kinesis consumer receives it creates an instance of the SensorDataFiltered type and calls the PostDataToInfluxDB() method where the record received from the Kinesis stream is unmarshaled into the SensorDataFiltered type and send to InfluxDB. Here we need to provide the name of the database we created earlier to the variable dbName and the InfluxDB host and port values to dbHost and dbPort respectively.

    In the InfluxDB request body, the first value that we provide is used as the measurement which is an InfluxDB struct to store similar data together. Then we have tags, we have used `city` as our tag so that we can filter the data based on them and then we have the actual values. For more details on InfluxDB data write format please refer here.

    package dao
    
    import (
    	"bytes"
    	"crypto/tls"
    	"encoding/json"
    	"fmt"
    	"net/http"
    )
    
    type SensorDataFiltered struct {
    	Temperature float64 `json:"temperature"`
    	Humidity    float64 `json:"humidity"`
    	City        string  `json:"city"`
    }
    
    var dbName = "iotdata"
    var dbHost = "184.73.62.30"
    var dbPort = "8086"
    
    func (sdf *SensorDataFiltered) PostDataToInfluxDB(Data []byte) {
    	err := json.Unmarshal(Data, &sdf)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println(sdf.Temperature, sdf.Humidity)
    	}
    	url := "http://" + dbHost + ":" + dbPort + "/write?db=" + dbName
    	humidity := fmt.Sprintf("%.2f", sdf.Humidity)
    	temperature := fmt.Sprintf("%.2f", sdf.Temperature)
    	city := sdf.City
    	requestBody := "sensordata,city=" + city + " humidity=" + humidity + ",temperature=" + temperature
    	req, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(requestBody)))
    	httpclient := &http.Client{
    		Transport: &http.Transport{
    			TLSClientConfig: &tls.Config{
    				InsecureSkipVerify: true,
    			},
    		},
    	}
    	resp, err := httpclient.Do(req)
    	if err != nil {
    		fmt.Println(err)
    	} else {
    		fmt.Println("Status code for influxdb data port request = ", resp.StatusCode)
    	}
    	defer resp.Body.Close()
    
    }

    Once the data is written to InfluxDB we can see it in the web console by querying the measurement create in our database.

    Putting everything together in our main function

    Now we need to simply call the functions we discussed above and run our main program. Note that we have used `go` before the first two function call which makes them goroutines and they execute concurrently.

    On running the code you will see the logs for all the stages of our pipeline getting written to the stdout and it very closely resembles real-life scenarios where data is written by IoT devices and gets processed in near real-time.

    package main
    
    import (
    	"time"
    
    	"velotio.com/consumer"
    	"velotio.com/producer"
    	"velotio.com/publisher"
    )
    
    func main() {
    
    	go producer.StartKinesisProducer()
    	go publisher.StartMQTTPublisher()
    	time.Sleep(5 * time.Second)
    	consumer.StartKinesisConsumer()
    
    }

    Visualization through Grafana

    We can access the Grafana web console at port 3000 of the machine on which it is running. First, we need to add our InfluxDB as a data source to it under the data sources option.

    For creating dashboard go to the dashboard option and choose new. Once the dashboard is created we can start by adding a panel.

    We need to add Influxdb data source that we added earlier as the panel data source and write queries as shown in the image below.

    We can repeat the same process for adding another panel to the dashboard this time choosing a different city in our query.

    Conclusion:

    IoT data analytics is a fast evolving and interesting space. The number of IoT devices are growing rapidly. There is a great opportunity to get valuable insights from the huge amount of data generated by these device. In this blog, I tried to help you grab that opportunity by building a near real time data pipeline for IoT data. If you like it please share and subscribe to our blog.

  • Real Time Text Classification Using Kafka and Scikit-learn

    Introduction:

    Text classification is one of the essential tasks in supervised machine learning (ML). Assigning categories to text, which can be tweets, Facebook posts, web page, library book, media articles, gallery, etc. has many applications like spam filtering, sentiment analysis, etc. In this blog, we build a text classification engine to classify topics in an incoming Twitter stream using Apache Kafka and scikit-learn – a Python based Machine Learning Library.

    Let’s dive into the details. Here is a diagram to explain visually the components and data flow. The Kafka producer will ingest data from Twitter and send it to Kafka broker. The Kafka consumer will ask the Kafka broker for the tweets. We convert the tweets binary stream from Kafka to human readable strings and perform predictions using saved models. We train the models using Twenty Newsgroups which is a prebuilt training data from Sci-kit. It is a standard data set used for training classification algorithms. 

    In this blog we will use the following machine learning models:

    We have used the following libraries/tools:

    • tweepy – Twitter library for python
    • Apache Kafka
    • scikit-learn
    • pickle – Python Object serialization library

    Let’s first understand the following key concepts:

    • Word to Vector Methodology (Word2Vec)
    • Bag-of-Words
    • tf-idf
    • Multinomial Naive Bayes classifier

    Word2Vec methodology

    One of the key ideas in Natural Language Processing(NLP) is how we can efficiently convert words into numeric vectors which can then be given as an input to machine learning models to perform predictions.

    Neural networks or any other machine learning models are nothing but mathematical functions which need numbers or vectors to churn out the output except tree based methods, they can work on words.

    For this we have an approach known as Word2Vec. A very trivial solution to this would be to use “one-hot” method of converting the word into a sparse matrix with only one element of the vector set to 1, the rest being zero.

    For example, “the apple a day the good” would have following representation

    Here we have transformed the above sentence into a 6×5 matrix, with the 5 being the size of the vocabulary as “the” is repeated. But what are we supposed to do when we have a gigantic dictionary to learn from say more than 100000 words? Here one hot encoding fails. In one hot encoding the relationship between the words is lost. Like “Lanka” should come after “Sri”.

    Here is where Word2Vec comes in. Our goal is to vectorize the words while maintaining the context. Word2vec can utilize either of two model architectures to produce a distributed representation of words: continuous bag-of-words (CBOW) or continuous skip-gram. In the continuous bag-of-words architecture, the model predicts the current word from a window of surrounding context words. The order of context words does not influence prediction (bag-of-words assumption). In the continuous skip-gram architecture, the model uses the current word to predict the surrounding window of context words. 

    Tf-idf (term frequency–inverse document frequency)

    TF-IDF is a statistic which determines how important is a word to the document in given corpus. Variations of tf-idf is used by search engines, for text summarizations etc. You can read more about tf-idf – here.

    Multinomial Naive Bayes classifier

    Naive Bayes Classifier comes from family of probabilistic classifiers based on Bayes theorem. We use it to classify spam or not spam, sports or politics etc. We are going to use this for classifying streams of tweets coming in. You can explore it – here.

    Lets how they fit in together.

    The data from the “20 newsgroups datasets” is completely in text format. We cannot feed it directly to any model to do mathematical calculations. We have to extract features from the datasets and have to convert them to numbers which a model can ingest and then produce an output.
    So, we use Continuous Bag of Words and tf-idf for extracting features from datasets and then ingest them to multinomial naive bayes classifier to get predictions.

    1. Train Your Model

    We are going to use this dataset. We create another file and import the needed libraries We are using sklearn for ML and pickle to save trained model. Now we define the model.

    from __future__ import division,print_function, absolute_import
    from sklearn.datasets import fetch_20newsgroups #built-in dataset
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_extraction.text import TfidfTransformer
    from sklearn.naive_bayes import MultinomialNB
    import pickle
    from kafka import KafkaConsumer
    
    #Defining model and training it
    categories = ["talk.politics.misc","misc.forsale","rec.motorcycles",
    "comp.sys.mac.hardware","sci.med","talk.religion.misc"] #http://qwone.com/~jason/20Newsgroups/ for reference
    
    def fetch_train_dataset(categories):
    twenty_train = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=42)
    return twenty_train
    
    def bag_of_words(categories):
    count_vect = CountVectorizer()
    X_train_counts = count_vect.fit_transform(fetch_train_dataset(categories).data)
    pickle.dump(count_vect.vocabulary_, open("vocab.pickle", 'wb'))
    return X_train_counts
    
    def tf_idf(categories):
    tf_transformer = TfidfTransformer()
    return (tf_transformer,tf_transformer.fit_transform(bag_of_words(categories)))
    
    def model(categories):
    clf = MultinomialNB().fit(tf_idf(categories)[1], fetch_train_dataset(categories).target)
    return clf
    
    model = model(categories)
    pickle.dump(model,open("model.pickle", 'wb'))
    print("Training Finished!")
    #Training Finished Here

    2. The Kafka Tweet Producer

    We have the trained model in place. Now lets get the real time stream of Twitter via Kafka. We define the Producer.

    # import required libraries
    from kafka import SimpleProducer, KafkaClient
    from tweepy.streaming import StreamListener
    from tweepy import OAuthHandler
    from tweepy import Stream
    from twitter_config import consumer_key, consumer_secret, access_token, access_token_secret
    import json

    Now we will define Kafka settings and will create KafkaPusher Class. This is necessary because we need to send the data coming from tweepy stream to Kafka producer.

    # Kafka settings
    topic = b'twitter-stream'
    
    # setting up Kafka producer
    kafka = KafkaClient('localhost:9092')
    producer = SimpleProducer(kafka)
    
    class KafkaPusher(StreamListener):
    
    def on_data(self, data):
    all_data = json.loads(data)
    tweet = all_data["text"]
    producer.send_messages(topic, tweet.encode('utf-8'))
    return True
    
    def on_error(self, status):
    print statusWORDS_TO_TRACK = ["Politics","Apple","Google","Microsoft","Bikes","Harley Davidson","Medicine"]
    
    if __name__ == '__main__':
    l = KafkaPusher()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, l)
    while True:
    try:
    stream.filter(languages=["en"], track=WORDS_TO_TRACK)
    except:
    pass

    Note – You need to start Kafka server before running this script.

    3. Loading your model for predictions

    Now we have the trained model in step 1 and a twitter stream in step 2. Lets use the model now to do actual predictions. The first step is to load the model:

    #Loading model and vocab
    print("Loading pre-trained model")
    vocabulary_to_load = pickle.load(open("vocab.pickle", 'rb'))
    count_vect = CountVectorizer(vocabulary=vocabulary_to_load)
    load_model = pickle.load(open("model.pickle", 'rb'))count_vect._validate_vocabulary()
    tfidf_transformer = tf_idf(categories)[0]

    Then we start the kafka consumer and begin predictions:

    #predicting the streaming kafka messages
    consumer = KafkaConsumer('twitter-stream',bootstrap_servers=['localhost:9092'])
    print("Starting ML predictions.")
    for message in consumer:
    X_new_counts = count_vect.transform([message.value])
    X_new_tfidf = tfidf_transformer.transform(X_new_counts)
    predicted = load_model.predict(X_new_tfidf)
    print(message.value+" => "+fetch_train_dataset(categories).target_names[predicted[0]])

    Following are some of the classification done by our model

    • RT @amazingatheist: Making fun of kids who survived a school shooting just days after the event because you disagree with their politics is… => talk.politics.misc
    • sci.med
    • RT @DavidKlion: Apropos of that D’Souza tweet; I think in order to make sense of our politics, you need to understand that there are some t… => talk.politics.misc
    • RT @BeauWillimon: These students have already cemented a place in history with their activism, and they’re just getting started. No one wil… => talk.politics.misc
    • RT @byedavo: Cause we ain’t got no president => talk.politics.misc
    • RT @appleinsider: .@Apple reportedly in talks to buy cobalt, key Li-ion battery ingredient, directly from miners … => comp.sys.mac.hardware

    Here is the link to the complete git repository

    Conclusion:

    In this blog, we were successful in creating a data pipeline where we were using the Naive Bayes model for doing classification of the streaming twitter data. We can classify other sources of data like news articles, blog posts etc. Do let us know if you have any questions, queries and additional thoughts in the comments section below.

    Happy coding!

  • Surviving & Thriving in the Age of Software Accelerations

    It has almost been a decade since Marc Andreessen made this prescient statement. Software is not only eating the world but doing so at an accelerating pace. There is no industry that hasn’t been challenged by technology startups with disruptive approaches.

    • Automakers are no longer just manufacturing companies: Tesla is disrupting the industry with their software approach to vehicle development and continuous over-the-air software delivery. Waymo’s autonomous cars have driven millions of miles and self-driving cars are a near-term reality. Uber is transforming the transportation industry into a service, potentially affecting the economics and incentives of almost 3–4% of the world GDP!
    • Social networks and media platforms had a significant and decisive impact on the US election results.
    • Banks and large financial institutions are being attacked by FinTech startups like WealthFront, Venmo, Affirm, Stripe, SoFi, etc. Bitcoin, Ethereum and the broader blockchain revolution can upend the core structure of banks and even sovereign currencies.
    • Traditional retail businesses are under tremendous pressure due to Amazon and other e-commerce vendors. Retail is now a customer ownership, recommendations, and optimization business rather than a brick and mortar one.

    Enterprises need to adopt a new approach to software development and digital innovation. At Velotio, we are helping customers to modernize and transform their business with all of the approaches and best practices listed below.

    Agility

    In this fast-changing world, your business needs to be agile and fast-moving. You need to ship software faster, at a regular cadence, with high quality and be able to scale it globally.

    Agile practices allow companies to rally diverse teams behind a defined process that helps to achieve inclusivity and drives productivity. Agile is about getting cross-functional teams to work in concert in planned short iterations with continuous learning and improvement.

    Generally, teams that work in an Agile methodology will:

    • Conduct regular stand-ups and Scrum/Kanban planning meetings with the optimal use of tools like Jira, PivotalTracker, Rally, etc.
    • Use pair programming and code review practices to ensure better code quality.
    • Use continuous integration and delivery tools like Jenkins or CircleCI.
    • Design processes for all aspects of product management, development, QA, DevOps and SRE.
    • Use Slack, Hipchat or Teams for communication between team members and geographically diverse teams. Integrate all tools with Slack to ensure that it becomes the central hub for notifications and engagement.

    Cloud-Native

    Businesses need software that is purpose-built for the cloud model. What does that mean? Software team sizes are now in the hundreds of thousands. The number of applications and software stacks is growing rapidly in most companies. All companies use various cloud providers, SaaS vendors and best-of-breed hosted or on-premise software. Essentially, software complexity has increased exponentially which required a “cloud-native” approach to manage effectively. Cloud Native Computing Foundation defines cloud native as a software stack which is:

    1. Containerized: Each part (applications, processes, etc) is packaged in its own container. This facilitates reproducibility, transparency, and resource isolation.
    2. Dynamically orchestrated: Containers are actively scheduled and managed to optimize resource utilization.
    3. Microservices oriented: Applications are segmented into micro services. This significantly increases the overall agility and maintainability of applications.

    You can deep-dive into cloud native with this blog by our CTO, Chirag Jog.

    Cloud native is disrupting the traditional enterprise software vendors. Software is getting decomposed into specialized best of breed components — much like the micro-services architecture. See the Cloud Native landscape below from CNCF.

    DevOps

    Process and toolsets need to change to enable faster development and deployment of software. Enterprises cannot compete without mature DevOps strategies. DevOps is essentially a set of practices, processes, culture, tooling, and automation that focuses on delivering software continuously with high quality.

    DevOps tool chains & process

    As you begin or expand your DevOps journey, a few things to keep in mind:

    • Customize to your needs: There is no single DevOps process or toolchain that suits all needs. Take into account your organization structure, team capabilities, current software process, opportunities for automation and goals while making decisions. For example, your infrastructure team may have automated deployments but the main source of your quality issues could be the lack of code reviews in your development team. So identify the critical pain points and sources of delay to address those first.
    • Automation: Automate everything that can be. The lesser the dependency on human intervention, the higher are the chances for success.
    • Culture: Align the incentives and goals with your development, ITOps, SecOps, SRE teams. Ensure that they collaborate effectively and ownership in the DevOps pipeline is well established.
    • Small wins: Pick one application or team and implement your DevOps strategy within it. That way you can focus your energies and refine your experiments before applying them broadly. Show success as measured by quantifiable parameters and use that to transform the rest of your teams.
    • Organizational dynamics & integrations: Adoption of new processes and tools will cause some disruptions and you may need to re-skill part of your team or hire externally. Ensure that compliance, SecOps & audit teams are aware of your DevOps journey and get their buy-in.
    • DevOps is a continuous journey: DevOps will never be done. Train your team to learn continuously and refine your DevOps practice to keep achieving your goal: delivering software reliably and quickly.

    Micro-services

    As the amount of software in an enterprise explodes, so does the complexity. The only way to manage this complexity is by splitting your software and teams into smaller manageable units. Micro-services adoption is primarily to manage this complexity.

    Development teams across the board are choosing micro services to develop new applications and break down legacy monoliths. Every micro-service can be deployed, upgraded, scaled, monitored and restarted independent of other services. Micro-services should ideally be managed by an automated system so that teams can easily update live applications without affecting end-users.

    There are companies with 100s of micro-services in production which is only possible with mature DevOps, cloud-native and agile practice adoption.

    Interestingly, serverless platforms like Google Functions and AWS Lambda are taking the concept of micro-services to the extreme by allowing each function to act like an independent piece of the application. You can read about my thoughts on serverless computing in this blog: Serverless Computing Predictions for 2017.

    Digital Transformation

    Digital transformation involves making strategic changes to business processes, competencies, and models to leverage digital technologies. It is a very broad term and every consulting vendor twists it in various ways. Let me give a couple of examples to drive home the point that digital transformation is about using technology to improve your business model, gain efficiencies or built a moat around your business:

    • GE has done an excellent job transforming themselves from a manufacturing company into an IoT/software company with Predix. GE builds airplane engines, medical equipment, oil & gas equipment and much more. Predix is an IoT platform that is being embedded into all of GE’s products. This enabled them to charge airlines on a per-mile basis by taking the ownership of maintenance and quality instead of charging on a one-time basis. This also gives them huge amounts of data that they can leverage to improve the business as a whole. So digital innovation has enabled a business model improvement leading to higher profits.
    • Car companies are exploring models where they can provide autonomous car fleets to cities where they will charge on a per-mile basis. This will convert them into a “service” & “data” company from a pure manufacturing one.
    • Insurance companies need to built digital capabilities to acquire and retain customers. They need to build data capabilities and provide ongoing value with services rather than interact with the customer just once a year.

    You would be better placed to compete in the market if you have automation and digital process in place so that you can build new products and pivot in an agile manner.

    Big Data / Data Science

    Businesses need to deal with increasing amounts of data due to IoT, social media, mobile and due to the adoption of software for various processes. And they need to use this data intelligently. Cloud platforms provide the services and solutions to accelerate your data science and machine learning strategies. AWS, Google Cloud & open-source libraries like Tensorflow, SciPy, Keras, etc. have a broad set of machine learning and big data services that can be leveraged. Companies need to build mature data processing pipelines to aggregate data from various sources and store it for quick and efficient access to various teams. Companies are leveraging these services and libraries to build solutions like:

    • Predictive analytics
    • Cognitive computing
    • Robotic Process Automation
    • Fraud detection
    • Customer churn and segmentation analysis
    • Recommendation engines
    • Forecasting
    • Anomaly detection

    Companies are creating data science teams to build long term capabilities and moats around their business by using their data smartly.

    Re-platforming & App Modernization

    Enterprises want to modernize their legacy, often monolithic apps as they migrate to the cloud. The move can be triggered due to hardware refresh cycles or license renewals or IT cost optimization or adoption of software-focused business models.

     Benefits of modernization to customers and businesses

    Intelligent Applications

    Software is getting more intelligent and to enable this, businesses need to integrate disparate datasets, distributed teams, and processes. This is best done on a scalable global cloud platform with agile processes. Big data and data science enables the creation of intelligent applications.

    How can smart applications help your business?

    • New intelligent systems of engagement: intelligent apps surface insights to users enabling the user to be more effective and efficient. For example, CRMs and marketing software is getting intelligent and multi-platform enabling sales and marketing reps to become more productive.
    • Personalisation: E-Commerce, social networks and now B2B software is getting personalized. In order to improve user experience and reduce churn, your applications should be personalized based on the user preferences and traits.
    • Drive efficiencies: IoT is an excellent example where the efficiency of machines can be improved with data and cloud software. Real-time insights can help to optimize processes or can be used for preventive maintenance.
    • Creation of new business models: Traditional and modern industries can use AI to build new business models. For example, what if insurance companies allow you to pay insurance premiums only for the miles driven?

    Security

    Security threats to governments, enterprises and data have never been greater. As business adopt cloud native, DevOps & micro-services practices, their security practices need to evolve.

    In our experience, these are few of the features of a mature cloud native security practice:

    • Automated: Systems are updated automatically with the latest fixes. Another approach is immutable infrastructure with the adoption of containers and serverless.
    • Proactive: Automated security processes tend to be proactive. For example, if a malware of vulnerability is found in one environment, automation can fix it in all environments. Mature DevOps & CI/CD processes ensure that fixes can be deployed in hours or days instead of weeks or months.
    • Cloud Platforms: Businesses have realized that the mega-clouds are way more secure than their own data centers can be. Many of these cloud platforms have audit, security and compliance services which should be leveraged.
    • Protecting credentials: Use AWS KMS, Hashicorp Vault or other solutions for protecting keys, passwords and authorizations.
    • Bug bounties: Either setup bug bounties internally or through sites like HackerOne. You want the good guys to work for you and this is an easy way to do that.

    Conclusion

    As you can see, all of these approaches and best practices are intertwined and need to be implemented in concert to gain the desired results. It is best to start with one project, one group or one application and build on early wins. Remember, that is is a process and you are looking for gradual improvements to achieve your final objectives.

    Please let us know your thoughts and experiences by adding comments to this blog or reaching out to @kalpakshah or RSI. We would love to help your business adopt these best practices and help to build great software together. Drop me a note at kalpak (at) velotio (dot) com.