Category: Data and AI

  • Best Practices for Kafka Security

    Overview

    We will cover the security concepts of Kafka and walk through the implementation of encryption, authentication, and authorization for a Kafka cluster.

    This article explains how to configure SASL_SSL security for your Kafka cluster and how to protect data in transit. SASL stands for Simple Authentication and Security Layer. SASL_SSL is a communication type in which clients authenticate using a SASL mechanism such as PLAIN or SCRAM, and the server uses SSL certificates to establish secure communication. We will use the SCRAM authentication mechanism here for the client to help establish mutual authentication between the client and server. We’ll also discuss authorization and ACLs, which are important for securing your cluster.

    Prerequisites

    A running Kafka cluster and a basic understanding of security components.

    Need for Kafka Security

    The primary reason is to protect the cluster and its data from unauthorized access, misuse, modification, disruption, and disclosure. To understand how to secure a Kafka cluster, we need to know three terms:

    • Authentication – The process by which a server verifies the identity of a client, i.e., that users or applications are who they claim to be.
    • Authorization – Applied after authentication, it determines which resources and operations an identified client may access. Basically, it gives each client limited access that is sufficient for its needs.
    • Encryption – The process of transforming data to make it unreadable without a decryption key. Encryption ensures that no other party can intercept and read the data.

    Here is the quick start guide by Apache Kafka, so check it out if you still need to set up Kafka.

    https://kafka.apache.org/quickstart

    We’ll not cover the theoretical aspects here, but you can find a ton of sources on how these three components work internally. For now, we’ll focus on the implementation and on how these pieces fit together in Kafka.

    This image illustrates SSL communication between the Kafka client and server.

    We are going to implement the steps in the below order:

    • Create a Certificate Authority
    • Create a Truststore & Keystore

    Certificate Authority – It is a trusted entity that issues SSL certificates. As such, a CA is an independent entity that acts as a trusted third party, issuing certificates for use by others. A certificate authority validates the credentials of a person or organization that requests a certificate before issuing one.

    Truststore – A truststore contains certificates from other parties with which you want to communicate or certificate authorities that you trust to identify other parties. In simple words, it is a list of CAs whose signatures you accept when validating a certificate.

    KeyStore – A KeyStore contains private keys and the certificates with their corresponding public keys. A KeyStore can also hold one or more CA certificates, depending on what’s needed.

    For the Kafka server, we need a server certificate, and this is where the KeyStore comes into the picture, since it stores the server certificate. The server certificate should be signed by the Certificate Authority (CA). A certificate signing request (CSR) is generated from the KeyStore, and in response, the CA sends back a signed certificate (CRT) to be imported into the KeyStore.

    We will create our own certificate authority for demonstration purposes. If you don’t want to create a private certificate authority, there are many certificate providers you can go with, like IdenTrust and GoDaddy. Since we are creating one, we need to tell our Kafka client to trust our private certificate authority using the Trust Store.

    This block diagram shows you how all the components communicate with each other and their role to generate the final certificate.

    So, let’s create our Certificate Authority. Run the below command in your terminal:

    openssl req -new -x509 -keyout <private_key_name> -out <public_certificate_name> -days <number_of_days>

    The -x509 option makes OpenSSL output a self-signed certificate rather than a certificate request. The command will ask for a passphrase; keep it safe for future use. After the command executes successfully, we should have two files named private_key_name and public_certificate_name.

    Now, let’s create a KeyStore and trust store for brokers; we need both because brokers also interact internally with each other. Let’s understand with the help of an example: Broker A wants to connect with Broker B, so Broker A acts as a client and Broker B as a server. We are using the SASL_SSL protocol, so A needs SASL credentials, and B needs a certificate for authentication. The reverse is also possible where Broker B wants to connect with Broker A, so we need both a KeyStore and a trust store for authentication.

    Now let’s create a trust store. Execute the below command in the terminal, and it should ask for the password. Save the password for future use:

    keytool -keystore <truststore_name.jks> -alias <alias name of the entry to process> -import -file <public_certificate_name>

    Here, we are using the .jks extension for the file, which stands for Java KeyStore. You can also use Public-Key Cryptography Standards #12 (pkcs12) instead of .jks, but that’s totally up to you. public_certificate_name is the same certificate we generated while creating the CA.

    For the KeyStore configuration, run the below command and store the password:

    keytool -genkey -keystore <keystore_name.jks> -validity <number_of_days> -storepass <store_password> -alias <alias_name> -keyalg <key algorithm name> -ext SAN=DNS:localhost

    This action creates the KeyStore file in the current working directory. The question “First and Last Name” requires you to enter a fully qualified domain name because some certificate authorities, such as VeriSign, expect this property to be a fully qualified domain name. Not all CAs require a fully qualified domain name, but I recommend using a fully qualified domain name for portability. All other information should be valid. If the information cannot be verified, a certificate authority such as VeriSign will not sign the CSR generated for that record. I’m using localhost for the domain name here, as seen in the above command itself.

    The KeyStore now has an entry with alias_name. It contains the private key and the information needed for generating a CSR. Now let’s create a certificate signing request (CSR), which will be used to get a signed certificate from the Certificate Authority.

    Execute the below command in your terminal:

    keytool -keystore <keystore_name.jks> -alias <alias_name> -certreq -file <file_name.csr>

    So, we have generated a certificate signing request using the KeyStore (the KeyStore name and alias name should be the same as the ones used above). It should ask for the KeyStore password, so enter the same one used while creating the KeyStore.

    Now, execute the below command. It will ask for a password, so enter the CA passphrase, and now we have a signed certificate:

    openssl x509 -req -CA <public_certificate_name> -CAkey <private_key_name> -in <csr file> -out <signed_file_name> -CAcreateserial

    Finally, we need to add both the public certificate of the CA and the signed certificate to the KeyStore. The below command adds the CA certificate to the KeyStore:

    keytool -keystore <keystore_name.jks> -alias <public_certificate_name> -import -file <public_certificate_name>

    Now, let’s run the below command; it will add the signed certificate to the KeyStore:

    keytool -keystore <keystore_name.jks> -alias <alias_name> -import -file <signed_file_name>

    As of now, we have generated all the security files for the broker. For internal broker communication, we are using SASL_SSL (see security.inter.broker.protocol in server.properties). Now we need to create a broker username and password using the SCRAM method.

    Run the below command:

    kafka-configs.sh --zookeeper <host:port> --entity-type users --entity-name <username> --alter --add-config 'SCRAM-SHA-512=[password=<password>]'

    NOTE: Credentials for inter-broker communication must be created before Kafka brokers are started.
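
    To make it clearer what gets stored when a SCRAM credential is created, here is a minimal Python sketch of the key derivation defined in RFC 5802, using SHA-512. It is only an illustration, not Kafka's implementation: the function name scram_keys, the demo salt, and the iteration count of 4096 are assumptions, and password normalization is skipped.

```python
import hashlib
import hmac


def scram_keys(password: str, salt: bytes, iterations: int = 4096) -> dict:
    """Derive the SCRAM keys described in RFC 5802, using SHA-512."""
    # SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC-SHA-512
    salted = hashlib.pbkdf2_hmac("sha512", password.encode("utf-8"), salt, iterations)
    # ClientKey := HMAC(SaltedPassword, "Client Key")
    client_key = hmac.new(salted, b"Client Key", hashlib.sha512).digest()
    # StoredKey := H(ClientKey); the server keeps this instead of the password
    stored_key = hashlib.sha512(client_key).digest()
    # ServerKey := HMAC(SaltedPassword, "Server Key"), used to prove the server's identity
    server_key = hmac.new(salted, b"Server Key", hashlib.sha512).digest()
    return {"stored_key": stored_key, "server_key": server_key}


# Hypothetical password and salt, for illustration only
keys = scram_keys("broker-secret", salt=b"fixed-demo-salt")
print(len(keys["stored_key"]))  # SHA-512 digests are 64 bytes long
```

    Because only salted, iterated hashes are kept, the plain password itself never needs to be stored on the server side.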

    Now, we need to configure the Kafka broker property file, so update the file as given below:

    listeners=SASL_SSL://localhost:9092
    advertised.listeners=SASL_SSL://localhost:9092
    ssl.truststore.location={path/to/truststore_name.jks}
    ssl.truststore.password={truststore_password}
    ssl.keystore.location={/path/to/keystore_name.jks}
    ssl.keystore.password={keystore_password}
    security.inter.broker.protocol=SASL_SSL
    ssl.client.auth=none
    ssl.protocol=TLSv1.2
    sasl.enabled.mechanisms=SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";
    super.users=User:{username}

    NOTE: If you are using an external JAAS config file, remove the ScramLoginModule line and set this environment variable before starting the broker: export KAFKA_OPTS=-Djava.security.auth.login.config={path/to/broker.conf}

    Now, if we run Kafka, the broker should be running on port 9092 without any failure. If you have multiple brokers in the cluster, the same config file can be replicated among them, but the port should be different for each broker running on the same host.

    Producers and consumers need a username and a password to access the broker, so let’s create their credentials and update respective configurations.

    Create a producer user by executing the below command in your terminal, and then update producer.properties inside the config directory:

    bin/kafka-configs.sh --zookeeper <host:port> --entity-type users --entity-name <producer_name> --alter --add-config 'SCRAM-SHA-512=[password=<password>]'

    We need a trust store file for our clients (producer and consumer), but as we already know how to create a trust store, this is a small task for you. It is suggested that producers and consumers should have separate trust stores because when we move Kafka to production, there could be multiple producers and consumers on different machines.

    security.protocol=SASL_SSL
    ssl.protocol=TLSv1.2
    ssl.truststore.location={path/to/client.truststore.jks}
    ssl.truststore.password={password}
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{producer_name}" password="{password}";

    The below command creates a consumer user; after running it, update consumer.properties inside the config directory:

    bin/kafka-configs.sh --zookeeper <host:port> --entity-type users --entity-name <consumer_name> --alter --add-config 'SCRAM-SHA-512=[password=<password>]'

    security.protocol=SASL_SSL
    ssl.protocol=TLSv1.2
    ssl.truststore.location={path/to/client.truststore.jks}
    ssl.truststore.password={password}
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{consumer_name}" password="{password}";
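
    Since the producer and consumer files differ only in their credentials, they can be generated from one template. The following Python sketch shows one way to do that; the helper name render_client_properties and all the sample values are hypothetical, and the JAAS values are wrapped in double quotes, which is what the JAAS syntax expects.

```python
def render_client_properties(username: str, password: str,
                             truststore_path: str, truststore_password: str) -> str:
    """Return the text of a SASL_SSL client properties file."""
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    lines = [
        "security.protocol=SASL_SSL",
        "ssl.protocol=TLSv1.2",
        f"ssl.truststore.location={truststore_path}",
        f"ssl.truststore.password={truststore_password}",
        "sasl.mechanism=SCRAM-SHA-512",
        f"sasl.jaas.config={jaas}",
    ]
    return "\n".join(lines) + "\n"


# Hypothetical values, for illustration only
print(render_client_properties("producer_name", "changeit",
                               "/path/to/client.truststore.jks", "changeit"))
```

    In a real setup, the passwords would come from a secret store rather than being hard-coded as they are here.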

    As of now, we have implemented encryption and authentication for Kafka brokers. To verify that our producer and consumer are working properly with SCRAM credentials, run the console producer and consumer on some topics.

    Authorization is not implemented yet. Kafka uses access control lists (ACLs) to specify which users can perform which actions on specific resources or groups of resources. Each ACL has a principal, a permission type, an operation, a resource type, and a name.

    The default authorizer is AclAuthorizer, provided by Kafka; Confluent also provides the Confluent Server Authorizer, which is totally different from AclAuthorizer. An authorizer is a server plugin used by Kafka to authorize actions. Specifically, the authorizer controls whether operations should be authorized based on the principal and resource being accessed.

    Format of ACLs – Principal P is [Allowed/Denied] Operation O from Host H on any Resource R matching ResourcePattern RP
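
    The ACL format above can be modelled in a few lines of Python. This is a simplified sketch and not Kafka's authorizer: the Acl class and is_authorized function are hypothetical names, only literal resource names are matched, and it assumes the usual rules that super users always pass, a matching DENY overrides any ALLOW, and access is denied when no ACL matches.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str      # e.g. "User:producer_name"
    permission: str     # "ALLOW" or "DENY"
    operation: str      # e.g. "READ", "WRITE"
    resource_type: str  # e.g. "TOPIC", "GROUP"
    resource_name: str  # literal resource name
    host: str = "*"     # "*" matches any host


def is_authorized(acls, super_users, principal, operation,
                  resource_type, resource_name, host):
    """Simplified model: super users pass, DENY beats ALLOW, no match means deny."""
    if principal in super_users:
        return True
    matching = [
        a for a in acls
        if a.principal == principal
        and a.operation == operation
        and a.resource_type == resource_type
        and a.resource_name == resource_name
        and a.host in ("*", host)
    ]
    if any(a.permission == "DENY" for a in matching):
        return False
    return any(a.permission == "ALLOW" for a in matching)


acls = [
    Acl("User:producer_name", "ALLOW", "WRITE", "TOPIC", "topic_name"),
    Acl("User:consumer_name", "ALLOW", "READ", "TOPIC", "topic_name"),
]
# The producer may write to the topic but has no ACL that lets it read
print(is_authorized(acls, set(), "User:producer_name", "WRITE", "TOPIC", "topic_name", "10.0.0.5"))
print(is_authorized(acls, set(), "User:producer_name", "READ", "TOPIC", "topic_name", "10.0.0.5"))
```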

    Execute the below command to create an ACL with write permission for the producer:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<host:port> --add --allow-principal User:<producer_name> --operation WRITE --topic <topic_name>

    The above command should create an ACL that allows the write operation for producer_name on topic_name.

    Now, execute the below command to create an ACL with read permission for the consumer:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<host:port> --add --allow-principal User:<consumer_name> --operation READ --topic <topic_name>

    Now we need to define the consumer group ID for this consumer, so the below command allows the consumer to read as part of a given consumer group ID.

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<host:port> --add --allow-principal User:<consumer_name> --operation READ --group <consumer_group_name>

    Now, we need to add some configuration in two files: server.properties (the broker configuration) and consumer.properties.

    # Authorizer class
    authorizer.class.name=kafka.security.authorizer.AclAuthorizer

    The above line indicates that AclAuthorizer class is used for authorization.

    # consumer group id
    group.id=<consumer_group_name>

    The consumer group ID is mandatory: if we do not specify a group, the consumer will not be able to read data from topics, so group.id must be provided when starting a consumer.

    Let’s test the producer and consumer one by one. Run the console producer, and run the console consumer in another terminal; both should run without errors.

    console-producer
    console-consumer

    Voila!! Your Kafka is secured.

    Summary

    In a nutshell, we have implemented security in our Kafka cluster using the SASL_SSL protocol and learned how to create ACLs and give different permissions to different users.

    Apache Kafka is the wild west without security. By default, there is no encryption, authentication, or access control list. Any client can communicate with the Kafka broker using the PLAINTEXT port. Access using this port should be restricted to trusted clients only. You can use network segmentation and/or authentication ACLs to restrict access to trusted IP addresses in these cases. If none of these are used, the cluster is wide open and available to anyone. A basic knowledge of Kafka authentication, authorization, encryption, and audit trails is required to safely move a system into production.

  • Building an Intelligent Recommendation Engine with Collaborative Filtering

    In this post, we will talk about building a collaborative recommendation system. For this, we will utilize patient ratings with a drug and medical condition dataset to generate treatment suggestions.

    Let’s take a practical scenario where multiple medical practitioners have treated patients with different medical conditions using the most suitable drugs available. In each case, the patient is diagnosed and then given a treatment plan; these past cases are our experiences.

    The purpose of the recommendation system is to understand and find patterns with the information provided by patients during the diagnosis, and then suggest a treatment plan, which most closely matches the pattern identified by the recommendation system. 

    Toward the end of this article, we go deeper into how these recommendations work and how we can find one preferred suggestion and the next five closest suggestions for any treatment.

    Definitions

    A recommendation system suggests or predicts a user’s behaviour by observing patterns of their past behaviour compared to others.

    In simple terms, it is a filtering engine that picks more relevant information for specific users by using all the available information. It is often used by e-commerce and streaming platforms like Amazon, Flipkart, YouTube, and Netflix, and in personalized user products like Alexa and Google Home Mini.

    For the medical industry, where suggestions must be most accurate, a recommendation system will also take experiences into account. So, we must use all our experiences, and such applications will use every piece of information for any treatment. 

    Recommendation systems use information like various medical conditions and their effect on each patient. They compare these patterns to every new treatment to find the closest similarity.

    Concepts and Technology

    To design the recommendation system, we need a few concepts, which are listed below.

    1. Concepts: Pattern Recognition, Correlation, Cosine Similarity, Vector norms (L1, L2, L-Infinity)

    2. Language: Python (libraries: NumPy, Pandas, SciPy, and scikit-learn)

    As far as the prototype development is concerned, we have the support of libraries (SciPy and scikit-learn) that execute all the algorithms for us. All we need is a little Python and the library functions.
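
    As a quick refresher on the vector norms listed among the concepts, here is a small pure-Python sketch. The function names and the sample vector are made up for illustration; in practice, the libraries compute these for us.

```python
import math


def l1_norm(v):
    """Sum of absolute values (Manhattan length)."""
    return sum(abs(x) for x in v)


def l2_norm(v):
    """Square root of the sum of squares (Euclidean length)."""
    return math.sqrt(sum(x * x for x in v))


def linf_norm(v):
    """Largest absolute value."""
    return max(abs(x) for x in v)


ratings = [3, -4, 0]  # made-up vector for illustration
print(l1_norm(ratings), l2_norm(ratings), linf_norm(ratings))  # 7 5.0 4
```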

    Different Approaches for Recommendation Systems

    Below I have listed a few filtering approaches and examples:

    • Collaborative filtering: It is based on the reviews or responses of many users for an entity. Here, the suggestion is based on the items rated highest by users with similar tastes. E.g., movie or mobile suggestions.
    • Content-based filtering: It is based on the pattern of each user’s own past activity. Here, the suggestion is based on items similar to those the user preferred before. E.g., food suggestions.
    • Popularity-based filtering: It is based on a pattern of popularity among all users. E.g., YouTube video suggestions.

    Based on these filtering approaches, there will be different approaches to recommender systems, which are explained below:

    • Multi-criteria recommender systems: Various conditions like age, gender, location, likes, and dislikes are used for categorization and then items are suggested. E.g., suggestion of apparel based on age and gender.
    • Risk-aware recommender systems: There is always uncertainty when users use Internet applications (website or mobile). Recommending any advertisement over the Internet must consider risk and users must be aware of this. E.g., advertisement display suggestion over Internet application. 
    • Mobile recommender systems: These are location-based suggestions that consist of users’ current location or future location and provide suggestions based on that. E.g., mostly preferred in traveling and tourism.
    • Hybrid recommender systems: These are the combination of multiple approaches for recommendations. E.g., suggestion of hotels and restaurants based on user preference and travel information.
    • Collaborative and content recommender systems: These are the combination of collaborative and content-based approaches. E.g., suggestion of the highest-rated movie of users’ preference along with their watch history.

    Practical Example with Implementation

    In this example, we have a sample dataset of drugs prescribed for various medical conditions and the ratings given by patients. What we need here is, for any medical condition, a suggestion of the most suitable prescribed drugs for treatment.

    Sample Dataset:

    Below is a sample of the publicly available medical drug dataset from the Winter 2018 Kaggle University Club Hackathon.

    drugName              | condition                    | rating | condition_id
    Mirtazapine           | Depression                   | 10     | 201
    Mesalamine            | Crohn’s Disease, Maintenance | 8      | 185
    Bactrim               | Urinary Tract Infection      | 9      | 657
    Contrave              | Weight Loss                  | 9      | 677
    Cyclafem 1 / 35       | Birth Control                | 9      | 122
    Zyclara               | Keratosis                    | 4      | 365
    Copper                | Birth Control                | 6      | 122
    Amitriptyline         | Migraine Prevention          | 9      | 403
    Methadone             | Opiate Withdrawal            | 7      | 460
    Levora                | Birth Control                | 2      | 122
    Paroxetine            | Hot Flashes                  | 1      | 310
    Miconazole            | Vaginal Yeast Infection      | 6      | 664
    Belviq                | Weight Loss                  | 1      | 677
    Seroquel              | Schizoaffective Disorde      | 10     | 575
    Ambien                | Insomnia                     | 2      | 347
    Nuvigil               | Narcolepsy                   | 9      | 424
    Chantix               | Smoking Cessation            | 10     | 597
    Microgestin Fe 1 / 20 | Acne                         | 3      | 49
    Klonopin              | Bipolar Disorde              | 6      | 121
    Ciprofloxacin         | Urinary Tract Infection      | 10     | 657
    Trazodone             | Insomnia                     | 1      | 347
    EnteraGam             | Irritable Bowel Syndrome     | 9      | 356
    Aripiprazole          | Bipolar Disorde              | 1      | 121
    Cyclosporine          | Keratoconjunctivitis Sicca   | 1      | 364

    Sample Code: 

    We will do this in 5 steps:

    1. Importing required libraries

    2. Reading the drugsComTest_raw.csv file and creating a pivot matrix with one row per drug and one column per medical condition.

    3. Creating a KNN model using the NearestNeighbors class with distance metric ‘cosine’ and algorithm ‘brute’. Other possible values for the distance metric include ‘cityblock’, ‘euclidean’, ‘l1’, ‘l2’, and ‘manhattan’. Possible values for the algorithm are ‘auto’, ‘ball_tree’, ‘kd_tree’, and ‘brute’.

    4. Selecting one sample at random from the rows of the pivot matrix. Because the matrix is indexed by drugName, the sample is a drug together with its ratings across medical conditions, and we will suggest 5 drugs related to it.

    5. Finding the 6 nearest neighbors for the sample by calling the kneighbors function on the trained KNN model created in step 3. The first neighbor is the sample itself, with a distance of 0. The next 5 neighbors are the drugs whose rating patterns across medical conditions are closest to the sample.

    #!/usr/bin/env python
    # coding: utf-8
    
    # Step 1
    import pandas as pd
    import numpy as np
    
    from scipy.sparse import csr_matrix
    from sklearn.neighbors import NearestNeighbors
    from sklearn.preprocessing import LabelEncoder
    encoder = LabelEncoder()
    
    
    # Step 2
    df = pd.read_csv('drugsComTest_raw.csv').fillna('NA')
    df['condition_id'] = pd.Series(encoder.fit_transform(df['condition'].values), index=df.index)
    df_medical = df.filter(['drugName', 'condition', 'rating', 'condition_id'], axis=1)
    df_medical_ratings_pivot=df_medical.pivot_table(index='drugName',columns='condition_id',values='rating').fillna(0)
    df_medical_ratings_pivot_matrix = csr_matrix(df_medical_ratings_pivot.values)
    
    
    # Step 3
    # distance = ['cityblock', 'cosine', 'euclidean', 'l1', 'l2', 'manhattan']
    # algorithm = ['auto', 'ball_tree', 'kd_tree', 'brute']
    model_knn = NearestNeighbors(metric = 'cosine', algorithm = 'brute')
    model_knn.fit(df_medical_ratings_pivot_matrix)
    
    
    # Step 4
    sample_index = np.random.choice(df_medical_ratings_pivot.shape[0])
    sample_condition = df_medical_ratings_pivot.iloc[sample_index,:].values.reshape(1, -1)
    
    
    # Step 5
    distances, indices = model_knn.kneighbors(sample_condition, n_neighbors = 6)
    for i in range(0, len(distances.flatten())):
        if i == 0:
            print('Recommendations for {0}:\n'.format(df_medical_ratings_pivot.index[sample_index]))
        else:
            recommendation = df_medical_ratings_pivot.index[indices.flatten()[i]]
            distanceFromSample = distances.flatten()[i]
            print('{0}: {1}, with distance of {2}:'.format(i, recommendation, distanceFromSample))

    Explanation:

    This is a collaborative filtering-based recommendation system that uses the patients’ ratings of given drug treatments to find drugs with similar rating patterns across medical conditions. This system compares all the rating patterns and tries to find the closest ones, measured by cosine similarity.
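
    To see what the ‘cosine’ metric measures, here is a small pure-Python sketch that computes the cosine distance (1 minus the cosine similarity) between rating rows. The rows below are made up for illustration and are not taken from the dataset, and the helper does not handle rows that are all zeros.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity of two equal-length rating vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


# Made-up rating rows: one column per condition, 0 means "no rating"
drug_a = [10, 0, 8]
drug_b = [5, 0, 4]   # same pattern as drug_a at half the scale
drug_c = [0, 9, 0]   # rated for a different condition only

print(cosine_distance(drug_a, drug_b))  # approximately 0: same direction
print(cosine_distance(drug_a, drug_c))  # 1.0: no overlap at all
```

    Note that the distance depends only on the direction of the rating vectors, not on their scale, which is why drug_a and drug_b come out as near-identical.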

    Challenges of Recommendation System

    Any recommendation system requires a decent quantity of quality information to process. Before developing such a system, we must be aware of the challenges below. Acknowledging and handling them improves the accuracy of recommendations.

    1. Cold Start: Recommending to a new user or a user without any previous behavior is a problem. We can recommend the most popular options to them. E.g., YouTube video suggestions for newly registered users.

    2. Not Enough Data: Having insufficient data provides recommendations with less certainty. E.g., suggestion of hotels or restaurants will not be accurate if systems are uncertain about users’ locations.

    3. Grey Sheep Problem: This problem occurs when the inconsistent behavior of a user makes it difficult to find a pattern. E.g., multiple users are using the same account, so user activity will be wide, and the system will have difficulty in mapping such patterns. 

    4. Similar items: In these cases, there is not enough data to separate similar items. For these situations, we can recommend all similar items randomly. E.g., apparel suggestions for users with color and sizes. All shirts are similar. 

    5. Shilling Attacks: Intentional negative behavior that leads to bad/unwanted recommendations. While immoral, we cannot deny the possibility of such attacks. E.g., user ratings and reviews over various social media platforms.

    Accuracy and Performance Measures

    Accuracy evaluation is important as we always follow and try to improve algorithms. The most preferred measures for improving algorithms are user studies, online evaluations, and offline evaluations. Our recommendation models must be ready to learn from users’ activity daily. For online evaluations, we have to regularly test our recommendation system.

    If we understand the challenges of the recommendation system, we can prepare such testing datasets to test its accuracy. With these variations of datasets, we can improve our approach of user studies and offline evaluations.

    1. Online Evaluations: In online evaluations, prediction models are updated frequently with the unmonitored data, which leads to the possibility of unexpected accuracy. To verify this, the prediction models are exposed to the unmonitored data with less uncertainty and then the uncertainty of unmonitored data is gradually increased. 

    2. Offline Evaluations: In offline evaluations, the prediction models are trained with a sample dataset that consists of all possible uncertainty with expected outcomes. To verify this, the sample dataset will be gradually updated and prediction models will be verified with predicted and actual outcomes. E.g., creating multiple users with certain activity and expecting genuine suggestions for them.

    Conclusion

    As a part of this article, we have learned about the approaches, challenges, and evaluation methods, and then we created a practical example of a collaborative filtering-based recommendation system. We also explored various types and filtering approaches with real-world scenarios.

    We have also executed sample code with a publicly available medical drug dataset with patient ratings. We can opt for various options for the distance metric and algorithm for the NearestNeighbors calculation. We have also listed various challenges for this system and understood the accuracy evaluation measures and the things that affect and improve them.

  • Elasticsearch – Basic and Advanced Concepts

    What is Elasticsearch?

    As we saw in our previous blog, Elasticsearch is a highly scalable open-source full-text search and analytics engine built on top of Apache Lucene. Elasticsearch allows you to store, search, and analyze huge volumes of data quickly and in near real-time.

    Basic Concepts –

    • Index – Large collection of JSON documents. Can be compared to a database in relational databases. Every document must reside in an index.
    • Shards – Since there is no limit on the number of documents that can reside in an index, indices are often horizontally partitioned into shards that reside on nodes in the cluster.
      Max documents allowed in a shard = 2,147,483,519 (as of now)
    • Type – Logical partition of an index. Similar to a table in relational databases. 
    • Fields – Similar to a column in relational databases. 
    • Analyzers – Used while indexing/searching the documents. These contain “tokenizers” that split phrases/text into tokens and “token-filters”, that filter/modify tokens during indexing & searching.
    • Mappings – Combination of Field + Analyzers. It defines how your fields can be stored & indexed.

    Inverted Index

    ES uses inverted indexes under the hood. An inverted index maps terms to the documents containing them.

    Let’s say we have 3 documents:

    1. Food is great
    2. It is raining
    3. Wind is strong

    An inverted index for these documents can be constructed as –

    The terms in the dictionary are stored in a sorted order to find them quickly.

    Searching multiple terms is done by performing a lookup on the terms in the index. It performs either UNION or INTERSECTION on them and fetches relevant matching documents.
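
    The idea can be sketched in a few lines of Python using the three documents above. This is a toy model, not how Lucene stores its index; the function names are made up, and tokenization is just lower-casing and splitting on whitespace.

```python
from collections import defaultdict

docs = {
    1: "Food is great",
    2: "It is raining",
    3: "Wind is strong",
}


def build_inverted_index(documents):
    """Map each lower-cased term to the set of document ids containing it."""
    index = defaultdict(set)
    for doc_id, text in documents.items():
        for term in text.lower().split():
            index[term].add(doc_id)
    return index


def search(index, terms, mode="and"):
    """Intersect (and) or union (or) the posting sets of the given terms."""
    postings = [index.get(term.lower(), set()) for term in terms]
    if not postings:
        return set()
    if mode == "and":
        return set.intersection(*postings)
    return set.union(*postings)


index = build_inverted_index(docs)
for term in sorted(index):  # print the dictionary in sorted term order
    print(term, sorted(index[term]))
print(search(index, ["is", "raining"], mode="and"))  # {2}
print(search(index, ["food", "wind"], mode="or"))    # {1, 3}
```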

    An ES index spans multiple shards. While indexing, each document is routed to a shard based on a hash of its routing value (by default, the document ID) modulo the number of primary shards. We can customize which shard a document is routed to and which shards search requests are sent to.

    An ES index is made of multiple Lucene indexes (each shard is one Lucene index), which in turn are made up of index segments. These are write-once, read-many indices, i.e., the index files Lucene writes are immutable (except for deletions).

    Analyzers –

    Analysis is the process of converting text into tokens or terms, which are added to the inverted index for searching. Analysis is performed by an analyzer, which can be either built-in or custom.

    We can define a single analyzer for both indexing and searching, or a separate index analyzer and search analyzer for a mapping.

    Building blocks of analyzer- 

    • Character filters – receive the original text as a stream of characters and can transform the stream by adding, removing, or changing characters.
    • Tokenizers – receive a stream of characters and break it up into individual tokens.
    • Token filters – receive the token stream and may add, remove, or change tokens.
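
    The three building blocks can be pictured as a simple pipeline. The Python sketch below only mimics the idea and does not use Elasticsearch; the function names are hypothetical, with an HTML-stripping character filter, a letter tokenizer, and a lower-case token filter chained in that order.

```python
import re


def strip_html(text):
    """Character filter: remove anything that looks like an HTML tag."""
    return re.sub(r"<[^>]+>", "", text)


def letter_tokenizer(text):
    """Tokenizer: emit runs of letters, splitting on every non-letter."""
    return re.findall(r"[^\W\d_]+", text)


def lowercase_filter(tokens):
    """Token filter: lower-case every token."""
    return [t.lower() for t in tokens]


def analyze(text):
    """Run the three stages in order: char filter, tokenizer, token filter."""
    return lowercase_filter(letter_tokenizer(strip_html(text)))


print(analyze("<b>The 2 QUICK Brown-Foxes</b> jumped over the lazy dog's bone."))
```

    Swapping any one stage (for example, a whitespace tokenizer instead of the letter tokenizer) changes the tokens that end up in the inverted index, which is why analyzer choice matters so much for search behavior.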

    Some Commonly used built-in analyzers –

    1. Standard –

    Divides text into terms on word boundaries. Lower-cases all terms. Removes punctuation and stopwords (if specified, default = None).

    Text:  The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [the, 2, quick, brown, foxes, jumped, over, the, lazy, dog’s, bone]

    2. Simple/Lowercase –

    Divides text into terms whenever it encounters a non-letter character. Lower-cases all terms.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [ the, quick, brown, foxes, jumped, over, the, lazy, dog, s, bone ]

    3. Whitespace –

    Divides text into terms whenever it encounters a white-space character.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [ The, 2, QUICK, Brown-Foxes, jumped, over, the, lazy, dog’s, bone.]

    4. Stop –

    Same as the simple analyzer, with stop-word removal enabled by default.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [ quick, brown, foxes, jumped, over, lazy, dog, s, bone]

    5. Keyword / NOOP –

    Returns the entire input string as it is.

    Text: The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.

    Output: [The 2 QUICK Brown-Foxes jumped over the lazy dog’s bone.]

    Some Commonly used built-in tokenizers –

    1. Standard –

    Divides text into terms on word boundaries, removes most punctuation.

    2. Letter –

    Divides text into terms whenever it encounters a non-letter character.

    3. Lowercase –

    Letter tokenizer which lowercases all tokens.

    4. Whitespace –

    Divides text into terms whenever it encounters any white-space character.

    5. UAX-URL-EMAIL –

    Standard tokenizer which recognizes URLs and email addresses as single tokens.

    6. N-Gram –

    Divides text into terms when it encounters anything from a list of specified characters (e.g. whitespace or punctuation), and returns n-grams of each word: a sliding window of continuous letters, e.g. quick → [qu, ui, ic, ck, qui, quic, quick, uic, uick, ick].

    7. Edge-N-Gram –

    It is similar to N-Gram tokenizer with n-grams anchored to the start of the word (prefix- based NGrams). e.g. quick → [q, qu, qui, quic, quick].
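
    The two tokenizers are easy to mimic in plain Python for a single word. The helper names and the min_gram/max_gram parameters below are assumptions made for this sketch, not the Elasticsearch API.

```python
def ngrams(word, min_gram, max_gram):
    """All substrings of length min_gram..max_gram, from every start position."""
    return [
        word[start:start + size]
        for start in range(len(word))
        for size in range(min_gram, max_gram + 1)
        if start + size <= len(word)
    ]


def edge_ngrams(word, min_gram, max_gram):
    """Only the n-grams anchored at the first character (prefixes)."""
    return [word[:size] for size in range(min_gram, min(max_gram, len(word)) + 1)]


print(ngrams("quick", 2, 5))
print(edge_ngrams("quick", 1, 5))  # ['q', 'qu', 'qui', 'quic', 'quick']
```

    The n-gram list grows much faster with word length than the edge n-gram list, so n-grams cost more index space in exchange for substring matching.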

    8. Keyword –

    Emits exact same text as a single term.

    Make your mappings right –

    Analyzers if not made right, can increase your search time extensively. 

    Avoid using regular expressions in queries as much as possible. Let your analyzers handle them.

    ES provides multiple tokenizers (standard, whitespace, ngram, edge-ngram, etc) which can be directly used, or you can create your own tokenizer. 

    Consider a simple use case: we had to search for users who have either “brad” in their name or “brad_pitt” in their email (a substring-based search). If no suitable analyzers are defined for this mapping, the obvious approach is to write a regex query:

    {
      "query": {
        "bool": {
          "should": [
            {
              "regexp": {
                "email.raw": ".*brad_pitt.*"
              }
            },
            {
              "regexp": {
                "name.raw": ".*brad.*"
              }
            }
          ]
        }
      }
    }

    This took 16s for us to fetch 100,000 (1 lakh) out of 60 million documents.

    Instead, we created an n-gram analyzer with a lowercase filter, which generates all the relevant tokens at index time.
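
    The article does not show the analyzer definition itself, so the following is only a sketch of what index settings for an n-gram analyzer with a lowercase filter could look like, written as a Python dict that can be serialized to a JSON request body. The names suggestion_analyzer and suggestion_tokenizer, the gram sizes, and the token_chars are assumptions, and the mappings layout assumes a recent ES version without mapping types.

```python
import json

# Hypothetical names: "suggestion_analyzer" and "suggestion_tokenizer" are
# placeholders, and the gram sizes are illustrative, not the authors' values.
index_settings = {
    "settings": {
        "analysis": {
            "tokenizer": {
                "suggestion_tokenizer": {
                    "type": "ngram",
                    "min_gram": 3,
                    "max_gram": 4,
                    "token_chars": ["letter", "digit", "punctuation"],
                }
            },
            "analyzer": {
                "suggestion_analyzer": {
                    "type": "custom",
                    "tokenizer": "suggestion_tokenizer",
                    "filter": ["lowercase"],
                }
            },
        }
    },
    "mappings": {
        "properties": {
            "email": {
                "type": "text",
                "fields": {
                    # sub-field queried as "email.suggestion"
                    "suggestion": {"type": "text", "analyzer": "suggestion_analyzer"}
                },
            }
        }
    },
}

print(json.dumps(index_settings, indent=2))
```

    Wider gram ranges generate more tokens and a larger index, so the sizes are worth tuning against your own data.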

    The above regex query was updated to –

    {
      "query": {
        "bool": {
          "must": [
            {
              "multi_match": {
                "query": "brad",
                "fields": [
                  "email.suggestion",
                  "full_name.suggestion"
                ]
              }
            }
          ]
        }
      },
      "size": 25
    }

    This took 109ms for us to fetch 100,000 (1 lakh) out of 60 million documents.

    Thus, the earlier search query, which took 10-25s, now fetches the same set of records in under 800-900ms.

    Had the use case been to find results where the name starts with “brad” or the email starts with “brad_pitt” (a prefix-based search), an edge-n-gram analyzer or suggesters would be a better fit.

    Performance Improvement with Filter Queries –

    Use Filter queries whenever possible. 

    ES usually scores documents and returns them sorted by score. This costs performance when scoring is not relevant to our use case. In such scenarios, use “filter” queries, which only answer whether a document matches (yes or no) and skip the score calculation.

    {
      "query": {
        "bool": {
          "must": [
            {
              "multi_match": {
                "query": "brad",
                "fields": [
                  "email.suggestion",
                  "full_name.suggestion"
                ]
              }
            }
          ]
        }
      },
      "size": 25
    }

    Above query can now be written as –

    {
      "query": {
        "bool": {
          "filter": {
            "bool": {
              "must": [
                {
                  "multi_match": {
                    "query": "brad",
                    "fields": [
                      "email.suggestion",
                      "full_name.suggestion"
                    ]
                  }
                }
              ]
            }
          }
        }
      },
      "size": 25
    }

    This will reduce query-time by a few milliseconds.

    Re-indexing made faster –

    Before creating any mappings, know your use-case well.

    ES does not allow us to alter existing mappings, unlike the “ALTER” command in relational databases, although we can keep adding new mappings to the index.

    The only way to change existing mappings is to create a new index, re-index the existing documents, and then alias the new index with the required name, which gives ZERO downtime on production. Note – This process can take days if you have millions of records to re-index.

    To re-index faster, we can change a few settings  –

    1. Disable swapping – Since no requests will be directed to the new index until indexing is done, we can safely disable swap.
    Command for Linux machines –

    sudo swapoff -a

    2. Disable refresh_interval for ES – The default refresh_interval is 1s, which can safely be disabled (set to -1) while documents are being re-indexed.

    3. Change the bulk size while indexing – ES usually indexes documents in chunks of 1k. It is preferable to increase this default to approximately 5-10k, although we need to find the sweet spot while re-indexing to avoid load on the current index.

    4. Reset the replica count to 0 – ES creates 1 replica per shard by default. We can set this to 0 while indexing and restore the required value once indexing is done.
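
    Settings 2 to 4 above can be expressed as request bodies. The sketch below only builds the JSON payloads in Python and does not talk to a cluster; the index names users_v1 and users_v2 are hypothetical, and the batch size of 5000 is an illustrative value to tune rather than a recommendation.

```python
import json

# Hypothetical index names used only for illustration.
OLD_INDEX = "users_v1"
NEW_INDEX = "users_v2"

# Body for PUT /users_v2/_settings before re-indexing starts:
# turn off periodic refreshes and drop replicas on the new index.
bulk_load_settings = {"index": {"refresh_interval": "-1", "number_of_replicas": 0}}

# Body for POST /_reindex: "size" under "source" is the batch size
# (1000 by default); 5000 is an assumed starting point for tuning.
reindex_body = {
    "source": {"index": OLD_INDEX, "size": 5000},
    "dest": {"index": NEW_INDEX},
}

# Body for PUT /users_v2/_settings once re-indexing has finished:
# restore the defaults mentioned in the text (1s refresh, 1 replica).
restore_settings = {"index": {"refresh_interval": "1s", "number_of_replicas": 1}}

for name, body in [("bulk load", bulk_load_settings),
                   ("reindex", reindex_body),
                   ("restore", restore_settings)]:
    print(name, json.dumps(body))
```

    Restoring the refresh interval and replica count afterwards matters: the new index is not searchable in near real time or redundant until you do.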

    Conclusion

    ElasticSearch is a very powerful database for text-based searches. The Elastic ecosystem is widely used for reporting, alerting, machine learning, etc. This article gives just an overview of ElasticSearch mappings and how creating relevant mappings can improve your query performance and accuracy. Giving your ElasticSearch cluster the right mappings and the right resources can do wonders.

  • Explanatory vs. Predictive Models in Machine Learning

    My view of Data Analysis is that there is a continuum between explanatory models on one side and predictive models on the other. The decisions you make during the modeling process depend on your goal. Take Customer Churn as an example: you can ask yourself why customers are leaving, or you can ask which customers are leaving. The first question has as its primary goal to explain churn, while the second has as its primary goal to predict churn. These are two fundamentally different questions, and this has implications for the decisions you take along the way. The predictive side of Data Analysis is closely related to terms like Data Mining and Machine Learning.

    SPSS & SAS

    SPSS and SAS both originate from the explanatory side of Data Analysis. They were developed in an academic environment, where hypothesis testing plays a major role. As a result, they have significantly fewer methods and techniques than R and Python. Nowadays, SAS and SPSS both have data mining tools (SAS Enterprise Miner and SPSS Modeler); however, these are separate tools and you’ll need extra licenses.

    I have spent some time building extensive macros in SAS EG to seamlessly create predictive models that also do a decent job of explaining feature importance. While a Neural Network may do a fair job at making predictions, it is extremely difficult to explain such models, let alone their feature importance. The macros that I have built in SAS EG do precisely the job of explaining the features, apart from producing excellent predictions.

    Open source TOOLS: R & PYTHON

    One of the major advantages of open source tools is that the community continuously improves and increases functionality. R was created by academics, who wanted their algorithms to spread as easily as possible. R has the widest range of algorithms, which makes R strong on the explanatory side and on the predictive side of Data Analysis.

    Python is developed with a strong focus on (business) applications, not from an academic or statistical standpoint. This makes Python very powerful when algorithms are directly used in applications. Hence, we see that the statistical capabilities are primarily focused on the predictive side. Python is mostly used in Data Mining or Machine Learning applications where a data analyst doesn’t need to intervene. Python is therefore also strong in analyzing images and videos. Python is also the easiest language to use when using Big Data Frameworks like Spark. With the plethora of packages and ever improving functionality, Python is a very accessible tool for data scientists.

    MACHINE LEARNING MODELS

    While procedures like Logistic Regression are very good at explaining the features used in a prediction, others like Neural Networks are not. The latter may be preferred over the former when only prediction accuracy matters and the model does not need to be explained. Interpreting or explaining the model becomes an issue for Neural Networks. You can’t just peek inside a deep neural network to figure out how it works. A network’s reasoning is embedded in the behavior of numerous simulated neurons, arranged into dozens or even hundreds of interconnected layers. In most cases, the Product Marketing Officer may be more interested in knowing which factors are most important for a specific advertising project, and what they can concentrate on to get response rates higher, than in what their response rate or revenue will be in the upcoming year. These questions are better answered by procedures that are easier to interpret. This is a great article about the technical and ethical consequences of the lack of explanations provided by complex AI models.

    Procedures like Decision Trees are very good at explaining and visualizing exactly what the decision points are (features and their metrics). However, they do not produce the best models. Random Forests and Boosting are procedures that use Decision Trees as the basic starting point for building predictive models, and they are among the best methods for building sophisticated prediction models.

    Random Forests use fully grown (highly complex) Trees, each built on a random sample drawn from the training set (a process called Bootstrapping). Each split then uses only a proper subset of the features, rather than all of them. Bootstrapping helps when training data is limited (in many cases there is no way to get more data). The (proper) subsetting of the features has a tremendous effect on de-correlating the Trees grown in the Forest (hence randomizing it), leading to a drop in Test Set error. A fresh subset of features is chosen at each split, making the method robust. The strategy also stops the strongest feature from appearing every time a split is considered, which would otherwise make all the trees in the forest similar. The final result is obtained by averaging the results over all trees (for Regression problems) or by taking a majority class vote (for classification problems).
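
    The three ingredients of a Random Forest described above (bootstrapping rows, picking a random feature subset per split, and aggregating the trees' outputs) can be sketched with the standard library alone. This is a simplified illustration with assumed function names, not a full Random Forest; tree growing itself is left out.

```python
import random
from collections import Counter


def bootstrap_sample(rows, rng):
    """Draw len(rows) rows at random WITH replacement (bootstrapping)."""
    return [rng.choice(rows) for _ in rows]


def feature_subset(n_features, k, rng):
    """Pick k distinct feature indices (k < n_features) to consider at one split."""
    return sorted(rng.sample(range(n_features), k))


def majority_vote(class_predictions):
    """Classification: return the class predicted by most trees."""
    return Counter(class_predictions).most_common(1)[0][0]


def average(value_predictions):
    """Regression: return the mean of the per-tree predictions."""
    return sum(value_predictions) / len(value_predictions)


if __name__ == "__main__":
    rng = random.Random(0)
    rows = list(range(10))                 # stand-in for 10 training rows
    print(bootstrap_sample(rows, rng))     # some rows repeat, some are left out
    print(feature_subset(9, 3, rng))       # 3 of 9 features for one split
    print(majority_vote(["churn", "stay", "churn"]))
    print(average([0.2, 0.4, 0.6]))
```

    Because each tree sees a different bootstrap sample and each split sees a different feature subset, the trees disagree in useful ways, and the aggregation step smooths out their individual errors.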

    On the other hand, Boosting is a method where a Forest is grown using Trees which are NOT fully grown, or in other words, with Weak Learners. One has to specify the number of trees to be grown and the initial weights of those trees for taking a majority vote for class selection. If not specified, the weights start out equal. At each iteration, the method fits a weak learner and finds the residuals. The weights of the training examples that were predicted incorrectly are then increased, so that the next tree can concentrate on the failed examples. This way, the method proceeds by improving the accuracy of the Boosted Trees, stopping when the improvement falls below a threshold. One particular implementation of Boosting, AdaBoost, is known for very good accuracy. AdaBoost uses Trees of depth 1, known as Decision Stumps, as the members of the Forest. These are only slightly better than random guessing to start with, but over time they learn the pattern and perform extremely well on the test set. This method is more like a feedback control mechanism (where the system learns from its errors). To address overfitting, one can use the Learning Rate hyper-parameter (lambda), choosing values in the range (0,1]. Very small values of lambda take more time to converge, whereas larger values may have difficulty converging. The right value can be found by an iterative process: plot the test error rate against values of lambda and choose the value with the lowest test error.
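
    As a rough illustration of the feedback loop described above, here is a small AdaBoost sketch with Decision Stumps on a single numeric feature, using only the standard library. It assumes labels of +1/-1 and applies the learning rate as a simple multiplier on each stump's vote, which is one common convention; it is a teaching sketch rather than a production implementation.

```python
import math


def best_stump(xs, ys, weights):
    """Find the threshold/polarity on a 1-D feature with the lowest weighted error."""
    best = None
    for threshold in sorted(set(xs)):
        for polarity in (1, -1):
            preds = [polarity if x >= threshold else -polarity for x in xs]
            err = sum(w for w, p, y in zip(weights, preds, ys) if p != y)
            if best is None or err < best[0]:
                best = (err, threshold, polarity)
    return best


def stump_predict(x, threshold, polarity):
    """A depth-1 tree: one comparison, two possible answers."""
    return polarity if x >= threshold else -polarity


def adaboost(xs, ys, n_rounds, learning_rate=1.0):
    """Train n_rounds decision stumps; labels must be +1 or -1."""
    n = len(xs)
    weights = [1.0 / n] * n          # every example starts with equal weight
    ensemble = []                    # list of (alpha, threshold, polarity)
    for _ in range(n_rounds):
        err, threshold, polarity = best_stump(xs, ys, weights)
        err = min(max(err, 1e-10), 1 - 1e-10)   # avoid log(0) and division by zero
        alpha = learning_rate * 0.5 * math.log((1 - err) / err)  # this stump's vote
        ensemble.append((alpha, threshold, polarity))
        # raise the weight of misclassified examples, lower the rest
        weights = [w * math.exp(-alpha * y * stump_predict(x, threshold, polarity))
                   for w, x, y in zip(weights, xs, ys)]
        total = sum(weights)
        weights = [w / total for w in weights]
    return ensemble


def predict(ensemble, x):
    """Weighted vote of all stumps."""
    score = sum(a * stump_predict(x, t, p) for a, t, p in ensemble)
    return 1 if score >= 0 else -1


if __name__ == "__main__":
    xs = list(range(10))
    ys = [1, 1, 1, -1, -1, -1, 1, 1, 1, -1]   # no single stump separates this
    model = adaboost(xs, ys, n_rounds=3)
    print([predict(model, x) for x in xs])
```

    On this toy data no single stump gets every example right, but three weighted stumps together do, which is the point of combining weak learners.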

    In all these methods, as we move from Logistic Regression to Decision Trees to Random Forests and Boosting, the complexity of the models increases, making it almost impossible to EXPLAIN the Boosting model to marketers/product managers. Decision Trees are easy to visualize, and Logistic Regression results can be used to demonstrate the most important factors in a customer acquisition model, so both will be well received by business leaders. On the other hand, the Random Forest and Boosting methods are extremely good predictors, without much scope for explaining. But there is hope: these models have functions for revealing the most important variables, although it is not possible to visualize why.

    USING A BALANCED APPROACH

    So I use a mixed strategy: use the previous methods as a step in Exploratory Data Analysis, present the importance of features and the characteristics of the data to the business leaders in phase one, and then use the more complicated models to build the prediction models for deployment, after building competing models. That way, one not only gets to understand what is happening and why, but also gets the best predictive power. In the cases that I have worked on, I have rarely seen a mismatch between the explanation and the predictions using different methods. After all, this is all math, and the way of delivery should not change the end results. Now that’s a happy ending for all sides of the business!

  • Building an ETL Workflow Using Apache NiFi and Hive

    The objective of this article is to design an ETL workflow using Apache NiFi that will scrape a web page with almost no code to get an endpoint, extract and transform the dataset, and load the transformed data into a Hive table.

    Problem Statement

    One potential use case where we need to create a data pipeline would be to capture the district level COVID-19 information from the COVID19-India API website, which gets updated daily. So, the aim is to create a flow that collates and loads a dataset into a warehouse system used by various downstream applications for further analysis, and the flow should be easily configurable for future changes.

    Prerequisites

    Before we start, we must have a basic understanding of Apache NiFi, and having it installed on a system would be a great start for this article. If you do not have it installed, please follow these quick steps. Apache Hive should be added to this architecture, which also requires a fully functional Hadoop framework. For this article, I am using Hive on a single cluster installed locally, but you can use a remote hive connection as well.

    Basic Terminologies

    Apache NiFi is an ETL tool with flow-based programming that comes with a web UI built to provide an easy way (drag & drop) to handle data flow in real-time. It also supports powerful and scalable means of data routing and transformation, which can be run on a single server or in a clustered mode across many servers.

    A NiFi workflow consists of processors, the rectangular components that can process, verify, filter, join, split, or adjust data. They exchange pieces of information called FlowFiles through queues named connections, and the Flow Controller helps to manage the resources between those components.

    Web scraping is a process to extract and collect structured web data with automation. It includes extracting and processing underlying HTML code using CSS selectors and the extracted data gets stored into a database.

    Apache Hive is a warehouse system built on top of Hadoop used for data summarization, query, and ad-hoc analysis.

    Steps for ETL Workflow

    Fig:- End-to-End NiFi WorkFlow

    The above flow comprises multiple processors, each performing a different task at a different stage of processing the data. The stages are Collect (InvokeHTTP – API Web Page, InvokeHTTP – Download District Data), Filter (GetHTMLElement, ExtractEndPoints, RouteOnAttribute – District API, QueryRecord), Transform (ReplaceHeaders, ConvertJSONToSQL), Load (PutHiveQL), and Logging (LogAttribute). Each processor is connected through different relationship connections and gets triggered on success until the data gets loaded into the table. The entire flow is scheduled to run daily.

    So, let’s dig into each step to understand the flow better.

    1. Get the HTML document using the Remote URL

    The flow starts with an InvokeHTTP processor that sends an HTTP GET request to the COVID19-India API URL and returns an HTML page in the response queue for further inspection. The processor can be used to invoke multiple HTTP methods (GET, PUT, POST, or PATCH) as well.

    Fig:- InvokeHTTP – API Web Page Configuration

    2. Extract listed endpoints

    The second step occurs when the GetHTMLElement processor targets the HTML table rows in the response, where all the endpoints are listed inside anchor tags, using the CSS selector tr > td > a, and extracts the data into FlowFiles.

    Fig:- GetHTMLElement Configuration

    After the success of the previous step, the ExtractText processor evaluates regular expressions against the content of the FlowFile to extract the URLs, which are then assigned to a FlowFile attribute named data_url.
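
    For readers who want to see what these two processors do in code terms, here is a rough standard-library equivalent: collect the anchors that sit inside table cells, then apply a regular expression to pull out the URL. The sample HTML and the example.org URLs are made up for illustration, and the regular expression is an assumption, not the one configured in the flow.

```python
import re
from html.parser import HTMLParser


class AnchorCollector(HTMLParser):
    """Collect href values of <a> tags that sit inside a table cell (<td>)."""

    def __init__(self):
        super().__init__()
        self.td_depth = 0
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "td":
            self.td_depth += 1
        elif tag == "a" and self.td_depth > 0:
            href = dict(attrs).get("href")
            if href:
                self.hrefs.append(href)

    def handle_endtag(self, tag):
        if tag == "td" and self.td_depth > 0:
            self.td_depth -= 1


def extract_data_urls(html):
    """Return the URLs found in anchors inside table cells."""
    parser = AnchorCollector()
    parser.feed(html)
    url_pattern = re.compile(r"https?://\S+")      # assumed pattern
    return [m.group(0) for m in map(url_pattern.search, parser.hrefs) if m]


# Made-up page standing in for the API listing page.
SAMPLE_HTML = """
<table>
  <tr><td><a href="https://example.org/csv/latest/states.csv">states</a></td></tr>
  <tr><td><a href="https://example.org/csv/latest/districts.csv">districts</a></td></tr>
</table>
<p><a href="https://example.org/about">about</a></p>
"""

if __name__ == "__main__":
    for data_url in extract_data_urls(SAMPLE_HTML):
        print(data_url)
```

    In the NiFi flow each extracted URL ends up in its own FlowFile attribute (data_url); the list returned here plays that role.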

    Fig:- ExtractEndPoints Configuration

    Note: The layout of the web page may change in the future. So, if you are reading this article later, configure the above processors according to any layout changes.

    3. Pick districts API and Download the dataset

    Here, the RouteOnAttribute processor uses the Apache NiFi Expression Language to pick out the API for district-level information and ignore the other APIs, since we are only interested in district.csv.

    Fig:- RouteOnAttribute – District API Configuration

    This time, the InvokeHTTP processor downloads the data using the extracted API endpoint, referenced through its attribute as ${data_url}, and the response data will be in CSV format.

    Fig:- InvokeHTTP – Download District Data Configuration

    4. Transform and Filter the dataset

    In this stage, the header of the response data is changed to lowercase using the ReplaceText processor with Literal Replace strategy, and the first field name is changed from date to recorded_date to avoid using reserved database keywords.

    Since the data is being updated daily on an incremental basis, we will only extract the data from the previous day using the QueryRecord processor. It will also convert the CSV data into JSON FlowFile using the CSVReader and JsonRecordSetWriter controller services.

    Please note that both the CSVReader and JsonRecordSetWriter services can have the default settings for our use. You can check out this blog for more reading on the controller services.

    And as mentioned, QueryRecord evaluates the below query to get data from the previous day out of the FlowFile and passes it to the next processor.

    select * from FlowFile where recorded_date='${now():toNumber():minus(86400000):format('yyyy-MM-dd')}'
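
    The transform-and-filter stage can be mirrored in a few lines of Python, which may help when checking what the processors should produce. This sketch assumes a tiny made-up CSV and a fixed "today" so the result is reproducible; the column set is illustrative and smaller than the real dataset.

```python
import csv
import io
import json
from datetime import date, timedelta


def transform(csv_text, today):
    """Lowercase the header, rename the first field date -> recorded_date,
    and keep only the rows recorded on the day before `today`."""
    header_line, _, body = csv_text.partition("\n")
    columns = [c.strip().lower() for c in header_line.split(",")]
    if columns and columns[0] == "date":
        columns[0] = "recorded_date"            # avoid the reserved keyword

    yesterday = (today - timedelta(days=1)).isoformat()   # yyyy-MM-dd
    reader = csv.DictReader(io.StringIO(body), fieldnames=columns)
    return [dict(row) for row in reader if row["recorded_date"] == yesterday]


# Made-up sample data for illustration only.
SAMPLE_CSV = (
    "Date,State,District,Confirmed\n"
    "2021-05-01,Kerala,Idukki,10\n"
    "2021-05-02,Kerala,Idukki,12\n"
)

if __name__ == "__main__":
    rows = transform(SAMPLE_CSV, today=date(2021, 5, 3))
    print(json.dumps(rows, indent=2))           # JSON records, like the writer's output
```

    Subtracting 86400000 milliseconds in the NiFi expression and subtracting one day here are the same idea: both resolve to yesterday's date in yyyy-MM-dd form.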

    Fig:- ReplaceHeaders Configuration

    Fig:- QueryRecord Configuration

    5. Establish JDBC connection pool for Hive and create a table

    Let’s set up the Hive JDBC driver for the NiFi flow using HiveConnectionPool with the required local/remote configuration (database connection URL, user, and password). The Hive Configuration Resources property expects the path to the Hive configuration file, i.e., hive-site.xml.

    Fig:- HiveConnectionPool Setup

    Now, we need an empty table to load the data from the NiFi flow, and to do so, you can use the DDL structure below:

    CREATE TABLE IF NOT EXISTS demo.districts (recorded_date string, state string, district string, confirmed string, recovered string, deceased string, other string, tested string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

    6. Load data into the Hive table

    In this step, the JSON-formatted FlowFile is converted into an SQL statement using ConvertJSONToSQL, which provides a SQL query as the output FlowFile. We can configure the HiveConnectionPool for the JDBC Connection Pool property, along with the table name and statement type, before running the processor. In this case, the statement type would be insert, since we need to load the data into the table.

    Also, please note that when preparing a SQL command, the SQL Parameter Attribute Prefix property should be hiveql. Otherwise, the very next processor will not be able to identify it and will throw an error.

    Then, on success, PutHiveQL executes the input SQL command and loads the data into the table. The success of this processor marks the end of the workflow and the data can be verified by fetching the target table.
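
    To picture what travels between these two processors, the sketch below turns one JSON record into a parameterised INSERT statement plus a set of attributes holding the argument values. The function name is hypothetical, and the attribute naming (prefix.args.N.value) is my approximation of how the values are passed along, so treat it as an illustration rather than the processor's exact output.

```python
def json_to_insert(record, table, prefix="hiveql"):
    """Build a parameterised INSERT and the attributes carrying its arguments."""
    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    # One attribute per positional argument, numbered from 1 (assumed naming).
    attributes = {
        f"{prefix}.args.{i}.value": str(record[col])
        for i, col in enumerate(columns, start=1)
    }
    return sql, attributes


if __name__ == "__main__":
    record = {"recorded_date": "2021-05-02", "state": "Kerala", "district": "Idukki"}
    sql, attributes = json_to_insert(record, "demo.districts")
    print(sql)
    for name, value in attributes.items():
        print(name, "=", value)
```

    This also shows why the prefix matters: if the attributes were written under a different prefix than the one the next processor reads, the statement would have placeholders with no values to bind.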

    Fig:- ConvertJSONToSQL Configurations

     

    Fig:- PutHiveQL Configuration

    7. Schedule the flow for daily updates

    You can schedule the entire flow to run at any given time using different NiFi scheduling strategies. Since the first InvokeHTTP is the initiator of this flow, we can configure it to run daily at 2 AM.

    Fig:- Scheduling Strategy

    8. Log Management

    Almost every processor has been directed to the LogAttribute processor through a failure/success queue, which writes the state and information of all used attributes to the NiFi log file, logs/nifi-app.log. By checking this file, we can debug and fix the issue in case of any failure. To extend it even further, we can also set up a flow that captures error logs with Apache Kafka and sends notifications over email.

    9. Consume data for analysis

    You can use various open-source visualization tools to start off with the exploratory data analysis on the data stored in the Hive table.

    You can download the template covid_etl_workflow.xml and run it on your machine for reference.

    Future Scope

    There are different ways to build any workflow, and this was one of them. You can take this further by allowing multiple datasets (state_wise, test_datasets) from the list with different combinations of various processors/controllers as a part of the flow. 

    You can also try scraping data from a product listing page of multiple e-commerce websites for a comparison between goods and price or you can even extract movie reviews and ratings from the IMDb website and use it as a recommendation for users. 

    Conclusion

    In this article, we discussed Apache NiFi and created a workflow to extract, filter, transform, and load the data for analysis purposes. If you are more comfortable building logic and want to focus on the architecture with less code, then Apache NiFi is the tool for you.

  • Exploring OpenAI Gym: A Platform for Reinforcement Learning Algorithms

    Introduction 

    According to the OpenAI Gym GitHub repository “OpenAI Gym is a toolkit for developing and comparing reinforcement learning algorithms. This is the gym open-source library, which gives you access to a standardized set of environments.”

    OpenAI Gym has an environment-agent arrangement. It simply means Gym gives you access to an “agent” which can perform specific actions in an “environment”. In return, the agent gets an observation and a reward as a consequence of performing a particular action in the environment.

    There are four values that are returned by the environment for every “step” taken by the agent.

    1. Observation (object): an environment-specific object representing your observation of the environment, for example the board state in a board game.
    2. Reward (float): the amount of reward/score achieved by the previous action. The scale varies between environments, but the goal is always to increase your total reward/score.
    3. Done (boolean): whether it’s time to reset the environment again, e.g. you lost your last life in the game.
    4. Info (dict): diagnostic information useful for debugging. However, official evaluations of your agent are not allowed to use this for learning.
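
    The four return values are easiest to see in a loop. The sketch below uses a made-up toy environment, CoinFlipEnv, that only mimics the reset()/step() interface described above so that it runs without Gym installed; it is not part of Gym.

```python
import random


class CoinFlipEnv:
    """Hypothetical toy environment that mimics the reset()/step() interface."""

    def __init__(self, max_steps=10, seed=0):
        self.max_steps = max_steps
        self.rng = random.Random(seed)
        self.steps = 0
        self.coin = 0

    def reset(self):
        self.steps = 0
        self.coin = self.rng.randrange(2)
        return self.coin                      # initial observation

    def step(self, action):
        reward = 1.0 if action == self.coin else 0.0   # did the agent match the coin?
        self.steps += 1
        self.coin = self.rng.randrange(2)              # next coin to match
        done = self.steps >= self.max_steps            # episode ends after max_steps
        info = {"steps": self.steps}                   # diagnostics only
        return self.coin, reward, done, info


env = CoinFlipEnv()
observation = env.reset()
total_reward, done = 0.0, False
while not done:
    action = observation          # policy: repeat the coin we were just shown
    observation, reward, done, info = env.step(action)
    total_reward += reward
print(total_reward, info)
```

    The same loop shape (reset, then step until done) is what the Cart-Pole code later in this post follows.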

    Following are the available Environments in the Gym:

    1. Classic control and toy text
    2. Algorithmic
    3. Atari
    4. 2D and 3D robots

    Here you can find a full list of environments.

    Cart-Pole Problem

    Here we will try to solve a classic control problem from the Reinforcement Learning literature, “The Cart-pole Problem”.

    The Cart-pole problem is defined as follows:
    “A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 15 degrees from vertical, or the cart moves more than 2.4 units from the center.”

    The following code quickly lets you see what the problem looks like on your computer.

    import gym
    env = gym.make('CartPole-v0')
    env.reset()
    for _ in range(1000):
        env.render()
        env.step(env.action_space.sample())

    This is what the output will look like:

    Coding the neural network 

    #We first import the necessary libraries and define hyperparameters - 
    
    import gym
    import random
    import numpy as np
    import tflearn
    from tflearn.layers.core import input_data, dropout, fully_connected
    from tflearn.layers.estimator import regression
    from statistics import median, mean
    from collections import Counter
    
    LR = 2.33e-4
    env = gym.make("CartPole-v0")
    observation = env.reset()
    goal_steps = 500
    score_requirement = 50
    initial_games = 10000
    
    #Now we will define a function to generate training data - 
    
    def initial_population():
        # [OBS, MOVES]
        training_data = []
        # all scores:
        scores = []
        # scores above our threshold:
        accepted_scores = []
        # number of episodes
        for _ in range(initial_games):
            score = 0
            # moves specifically from this episode:
            episode_memory = []
            # previous observation that we saw
            prev_observation = []
            for _ in range(goal_steps):
                # choose random action left or right i.e (0 or 1)
                action = random.randrange(0,2)
                observation, reward, done, info = env.step(action)
                # since that the observation is returned FROM the action
                # we store previous observation and corresponding action
                if len(prev_observation) > 0 :
                    episode_memory.append([prev_observation, action])
                prev_observation = observation
                score+=reward
                if done: break
    
            # reinforcement methodology here.
            # IF our score is higher than our threshold, we save
            # all we're doing is reinforcing the score, we're not trying
            # to influence the machine in any way as to HOW that score is
            # reached.
            if score >= score_requirement:
                accepted_scores.append(score)
                for data in episode_memory:
                    # convert to one-hot (this is the output layer for our neural network)
                    if data[1] == 1:
                        output = [0,1]
                    elif data[1] == 0:
                        output = [1,0]
    
                    # saving our training data
                    training_data.append([data[0], output])
    
            # reset env to play again
            env.reset()
            # save overall scores
            scores.append(score)
    
        # hand the collected [observation, one-hot action] pairs back to the caller
        return training_data
    
    # Now using tflearn we will define our neural network 
    
    def neural_network_model(input_size):
    
        network = input_data(shape=[None, input_size, 1], name='input')
    
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 512, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 256, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 128, activation='relu')
        network = dropout(network, 0.8)
    
        network = fully_connected(network, 2, activation='softmax')
        network = regression(network, optimizer='adam', learning_rate=LR, loss='categorical_crossentropy', name='targets')
        model = tflearn.DNN(network, tensorboard_dir='log')
    
        return model
    
    #It is time to train the model now -
    
    def train_model(training_data, model=False):
    
        X = np.array([i[0] for i in training_data]).reshape(-1,len(training_data[0][0]),1)
        y = [i[1] for i in training_data]
    
        if not model:
            model = neural_network_model(input_size = len(X[0]))
    
        model.fit({'input': X}, {'targets': y}, n_epoch=5, snapshot_step=500, show_metric=True, run_id='openai_CartPole')
        return model
    
    training_data = initial_population()
    
    model = train_model(training_data)
    
    #Training complete, now we should play the game to see how the output looks like 
    
    scores = []
    choices = []
    for each_game in range(10):
        score = 0
        game_memory = []
        prev_obs = []
        env.reset()
        for _ in range(goal_steps):
            env.render()
    
            if len(prev_obs)==0:
                action = random.randrange(0,2)
            else:
                action = np.argmax(model.predict(prev_obs.reshape(-1,len(prev_obs),1))[0])
    
            choices.append(action)
    
            new_observation, reward, done, info = env.step(action)
            prev_obs = new_observation
            game_memory.append([new_observation, action])
            score+=reward
            if done: break
    
        scores.append(score)
    
    print('Average Score:',sum(scores)/len(scores))
    print('choice 1:{}  choice 0:{}'.format(float((choices.count(1))/float(len(choices)))*100,float((choices.count(0))/float(len(choices)))*100))
    print(score_requirement)

    This is what the result will look like:

    Conclusion

    Though we haven’t used the Reinforcement Learning model in this blog, the normal fully connected neural network gave us a satisfactory accuracy of 60%. We used tflearn, which is a higher level API on top of Tensorflow for speeding-up experimentation. We hope that this blog will give you a head start in using OpenAI Gym.

    We are waiting to see exciting implementations using Gym and Reinforcement Learning. Happy Coding!

  • A Quick Introduction to Data Analysis With Pandas

    Python is a great language for doing data analysis, primarily because of the fantastic ecosystem of data-centric Python packages. Pandas is one of those packages and makes importing and analyzing data much easier.

    Pandas aims to integrate the functionality of NumPy and matplotlib to give you a convenient tool for data analytics and visualization. Besides the integration, it also makes them far easier to use.

    In this blog, I’ll give you a list of useful pandas snippets that can be reused over and over again. These will definitely save you some time that you may otherwise need to skim through the comprehensive Pandas docs.

    Pandas has two main data structures, Series and DataFrame, both capable of holding elements of any type.

    Series

    A one-dimensional object that can hold any data type such as integers, floats, and strings

    A Series object can be created from a set of different values. You can think of a Series as similar to a Python list.

    In the example below, NaN is NumPy’s nan symbol, which marks an element as “not a number” while still being usable as a numerical type. The dtype of the series is object because the series has mixed contents of strings and numbers.

    >>> import pandas as pd
    >>> import numpy as np
    >>> series = pd.Series([12,32,54,2, np.nan, "a string", 6])
    >>> series
    0          12
    1          32
    2          54
    3           2
    4         NaN
    5    a string
    6           6
    dtype: object

    Now if we use only numerical values, we get the basic NumPy dtype – float for our series.

    >>> series = pd.Series([1,2,np.nan, 4])
    >>> series
    0    1.0
    1    2.0
    2    NaN
    3    4.0
    dtype: float64

    DataFrame

    A two-dimensional labeled data structure where columns can be of different types.

    Each column in a Pandas DataFrame represents a Series object in memory.

    Converting a Python object (dictionary, list, etc.) to a DataFrame is straightforward. With a Python dictionary, the keys map to column names while the values correspond to the column values.

    >>> d = {
        "stats": pd.Series(np.arange(10,15,1.0)),
        "year": pd.Series(["2012","2007","2012","2003"]),
        "intake": pd.Series(["SUMMER","WINTER","WINTER","SUMMER"]),
    }
    >>> df = pd.DataFrame(d)
    >>> df
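
    The output of the snippet above is not reproduced here, so the following minimal sketch rebuilds the same dictionary and inspects the result. Note that "stats" holds five values while the other columns hold four; pandas aligns the Series on their index and fills the gap with a missing value.

```python
import numpy as np
import pandas as pd

# Same dictionary as above: "stats" has five values, the other columns four.
d = {
    "stats": pd.Series(np.arange(10, 15, 1.0)),
    "year": pd.Series(["2012", "2007", "2012", "2003"]),
    "intake": pd.Series(["SUMMER", "WINTER", "WINTER", "SUMMER"]),
}
df = pd.DataFrame(d)

print(df.shape)                  # (5, 3): one row per index label 0..4
print(df["year"].isna().sum())   # 1: the shorter columns are missing in the last row
```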

    Reading CSV files

    Pandas can read various file types. The general pattern to remember is:

    pd.read_filetype()

    You only have to replace “filetype” with the actual type of the file, like csv or excel. Give the path of the file inside the parentheses as the first argument. You can also pass in further arguments that control how the file is opened. (Reading a csv file? See this)

    >>> df = pd.read_csv('companies.csv')
    >>> df.head()

    Accessing Columns and Rows

    A DataFrame comprises three components: the index, the columns, and the data (also known as values).

    The index represents a sequence of values. In the DataFrame, it is always on the left side. Values in an index are shown in bold font. Each individual value of the index is called a label. An index position is where a row sits, while the label is the value stored at that position. Sometimes the index is also referred to as row labels. In all the examples below, the labels and positions are the same and are just integers beginning from 0 up to n-1, where n is the number of rows in the table.

    Selecting rows is done using loc and iloc:

    • loc gets rows (or columns) with particular labels from the index. Raises KeyError when the items are not found.
    • iloc gets rows (or columns) at particular positions/index (so it only takes integers). Raises IndexError if a requested indexer is out-of-bounds.

    >>> df.loc[:5]              # rows with labels 0 to 5 inclusive (one more row than df.head())
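
    To make the difference between loc and iloc concrete, here is a small sketch on made-up data (the column values and the index labels 10 to 40 are assumptions chosen so that labels and positions differ).

```python
import pandas as pd

# Hypothetical data; the index labels are deliberately not 0..n-1.
df = pd.DataFrame(
    {"name": ["a", "b", "c", "d"], "year": [2009, 2010, 2009, 2011]},
    index=[10, 20, 30, 40],
)

by_label = df.loc[10:30]     # label slice: both endpoints are included
by_position = df.iloc[0:2]   # position slice: the end is excluded, as with a list

print(len(by_label), len(by_position))  # 3 2

try:
    df.loc[99]               # no such label
except KeyError:
    print("KeyError: label 99 is not in the index")

try:
    df.iloc[99]              # position out of bounds
except IndexError:
    print("IndexError: position 99 is out of bounds")
```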

    Accessing the data using column names

    Pandas takes an extra step and allows us to access data through labels in DataFrames.

    >>> df.loc[:5, ["name","vertical", "url"]]

    In Pandas, selecting data is very easy and similar to accessing an element from a dictionary or a list.

    You can select a single column with df[col_name], which returns the column with label col_name as a Series, because columns are stored as Series in a DataFrame. If you need more than one column, use df[[col_name_1, col_name_2]], which returns the selected columns as a new DataFrame.

    Filtering DataFrames with Conditional Logic

    Let’s say we want all the companies with the vertical as B2B, the logic would be:

    >>> df[(df['vertical'] == 'B2B')]

    If we want the companies for the year 2009, we would use:

    >>> df[(df['year'] == 2009)]

    Need to combine them both? Here’s how you would do it:

    >>> df[(df['vertical'] == 'B2B') & (df['year'] == 2009)]

    This returns all companies with the vertical B2B for the year 2009.

    Sort and Groupby

    Sorting

    Sort values by a certain column in ascending order by using:

    >>> df.sort_values(colname)

    To sort in descending order instead, pass ascending=False:

    >>> df.sort_values(colname, ascending=False)

    Furthermore, it’s also possible to sort values by multiple columns with different orders. colname_1 is being sorted in ascending order and colname_2 in descending order by using:

    >>> df.sort_values([colname_1,colname_2],ascending=[True,False])

    Grouping

    This operation involves 3 steps: splitting the data, applying a function to each of the groups, and finally combining the results into a data structure. This can be used to group large amounts of data and compute operations on these groups.

    df.groupby(colname) returns a groupby object for values from one column while df.groupby([col1,col2]) returns a groupby object for values from multiple columns.
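
    The three steps can be sketched as follows. The company data here is hypothetical and only serves to show the split, apply, and combine stages.

```python
import pandas as pd

# Hypothetical company data for illustration.
df = pd.DataFrame({
    "vertical": ["B2B", "B2C", "B2B", "B2C", "B2B"],
    "year": [2009, 2009, 2010, 2010, 2009],
    "funding": [10.0, 5.0, 20.0, 15.0, 30.0],
})

# Split by one column, apply sum to each group, combine into a Series.
total_by_vertical = df.groupby("vertical")["funding"].sum()

# Split by two columns; the result is indexed by (vertical, year) pairs.
mean_by_vertical_year = df.groupby(["vertical", "year"])["funding"].mean()

print(total_by_vertical["B2B"])                # 60.0
print(mean_by_vertical_year[("B2B", 2009)])    # 20.0
```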

    Data Cleansing

    Data cleaning is a very important step in data analysis.

    Checking missing values in the data

    Check null values in the DataFrame by using:

    >>> df.isnull()

    This returns a boolean DataFrame of the same shape (True for missing values and False for non-missing values). To count the missing values in each column, chain .sum():

    >>> df.isnull().sum()

    Check non-null values in the DataFrame using pd.notnull(df) (or df.notnull()). It returns a boolean DataFrame that is the exact converse of df.isnull().

    Removing Empty Values

    Dropping empty values can be done easily by using:

    >>> df.dropna()

    This drops the rows that have empty values. Use df.dropna(axis=1) to drop the columns that have empty values instead.

    Also, if you wish to fill the missing values with other values, use df.fillna(x). This fills all the missing values with the value x (here you can put any value that you want). For a Series s, s.fillna(s.mean()) replaces null values with the mean (mean can be replaced with another summary statistic such as median).
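
    A short sketch of these calls on a tiny, made-up DataFrame. None of them modifies df in place; each returns a new object.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1.0, np.nan, 3.0], "b": [4.0, 5.0, 6.0]})

rows_kept = df.dropna()          # drops row 1, which has a missing value in "a"
cols_kept = df.dropna(axis=1)    # drops column "a", keeps "b"
zero_filled = df.fillna(0)       # every missing value becomes 0
mean_filled = df["a"].fillna(df["a"].mean())   # mean of 1.0 and 3.0 is 2.0

print(rows_kept.shape)           # (2, 2)
print(list(cols_kept.columns))   # ['b']
print(mean_filled.tolist())      # [1.0, 2.0, 3.0]
```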

    Operations on Complete Rows, Columns, or Even All Data

    >>> df["url_len"] = df["url"].map(len)

    The .map() operation applies a function to each element of a column.

    .apply() applies a function to each column. Use .apply(func, axis=1) to apply it to each row instead.

    Iterating over rows

    This is very similar to iterating over any of the built-in Python collections such as lists, tuples, and dictionaries.

    >>> for i, row in df.iterrows():
            print("Index {0}".format(i))
            print("Row {0}".format(row))

    .iterrows() yields two values on each iteration: the index of the row and the row itself. In the code above, the variable i is the index and the variable row is the row.

    Tips & Tricks

    Using ufuncs (also known as Universal Functions). Pandas has .apply(), which applies a function to columns/rows. Similarly, ufuncs can be used while preprocessing. What is the difference between ufuncs and .apply()?

    Ufuncs are NumPy functions implemented in C, which makes them highly efficient (around 10 times faster than .apply()).

    A list of common Ufuncs:

    isinf: Element-wise checks for positive or negative infinity.

    isnan: Element-wise checks for NaN and returns result as a boolean array.

    isnat: Element-wise checks for NaT (not a time) and returns result as a boolean array.

    trunc: Return the truncated value of the input, element-wise.

    .dt accessor (a Pandas feature rather than a NumPy ufunc): Element-wise processing for datetime values.
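
    As a rough illustration of the difference, the sketch below computes the same square roots with .apply() and with the np.sqrt ufunc. The data is made up and no timings are claimed here; measure with %timeit on your own data.

```python
import math

import numpy as np
import pandas as pd

s = pd.Series(np.arange(1, 100001, dtype="float64"))

via_apply = s.apply(math.sqrt)   # calls a Python function once per element
via_ufunc = np.sqrt(s)           # one vectorized call over the whole array

# Both approaches give the same values; only the execution path differs.
assert np.allclose(via_apply, via_ufunc)
```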

    High-Performance Pandas

    Pandas performs various vectorized/broadcasted operations and grouping-type operations. These operations are efficient and effective.

    As of version 0.13, Pandas included tools that allow us to directly access C-speed operations without costly allocation of intermediate arrays. There are two functions, eval() and query().

    DataFrame.eval() for efficient operations:

    >>> import numpy as np
    >>> import pandas as pd
    >>> nrows, ncols = 100000, 100
    >>> rng = np.random.RandomState(42)
    >>> df1, df2, df3, df4 = (pd.DataFrame(rng.rand(nrows, ncols))
                          for i in range(4))

    To compute the sum of df1, df2, df3, and df4 DataFrames using the typical Pandas approach, we can just write the sum:

    >>> %timeit df1 + df2 + df3 + df4
    
    10 loops, best of 3: 103.1 ms per loop

    A better and optimized approach for the same operation can be computed via pd.eval():

    >>> %timeit pd.eval('df1 + df2 + df3 + df4')
    
    10 loops, best of 3: 53.6 ms per loop

    %timeit — Measure execution time of small code snippets.

    The eval() expression takes about half the time (it also consumes much less memory).

    And it produces the same result:

    >>> np.allclose(df1 + df2 + df3 + df4, pd.eval('df1 + df2 + df3 + df4'))
    
    True

    np.allclose() is a numpy function which returns True if two arrays are element-wise equal within a tolerance.

    Column-Wise & Assignment Operations Using df.eval()

    The usual way to take the first character of each value in a column and assign it back to the same column is:

    >>> df['batch'] = df['batch'].str[0]

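    With df.eval(), the same assignment can be written as a string expression. df.eval() returns a new DataFrame unless inplace=True is passed, so assign the result back:

    >>> df = df.eval("batch = batch.str[0]")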

    DataFrame.query() for efficient operations:

    Similar to performing filtering operations with conditional logic, to filter rows with vertical as B2B and year as 2009, we do it by using:

    >>> %timeit df[(df['vertical'] == 'B2B') & (df['year'] == 2009)]
    
    1.69 ms ± 57 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    With .query() the same filtering can be performed in about half the time.

    >>> %timeit df.query("vertical == 'B2B' and year == 2009")
    
    875 µs ± 24.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

    When to use eval() and query()? 

    Two aspects: computation time and memory usage. 

    Memory usage: Every compound operation on NumPy arrays or Pandas DataFrames implicitly creates temporary intermediate arrays. When these temporaries are large relative to the available memory, eval() and query() are an appropriate choice to reduce memory usage.

    Computation time: The traditional way of performing NumPy/Pandas operations is faster for smaller arrays! The real benefit of eval()/query() comes mainly from the saved memory, and also from the cleaner syntax they offer.

    Conclusion

    Pandas is a powerful and fun library for data manipulation/analysis. It comes with easy syntax and fast operations. This blog highlights the most used pandas features and optimizations. The best way to master pandas is to use real datasets, beginning with Kaggle kernels, to learn how to use pandas for data analysis. Check out more on real time text classification using Kafka and Scikit-learn and explanatory vs. predictive models in machine learning here.

  • Building an Intelligent Chatbot Using Botkit and Rasa NLU

    Introduction

    Bots are the flavor of the season. Every day, we hear about a new bot being launched for domains like travel, social, legal, support, sales, etc. Facebook Messenger alone had more than 11,000 bots when I last checked and has probably added thousands more as I write this article.

    The first generation of bots were dumb since they could understand only a limited set of queries based on keywords in the conversation. But the commoditization of NLP (Natural Language Processing) and machine learning by services like Wit.ai, API.ai, Luis.ai, Amazon Lex, IBM Watson, etc. has resulted in the growth of intelligent bots like donotpay and chatShopper. I don’t know if bots are just hype or the real deal. But I can say with certainty that building a bot is fun and challenging at the same time. In this article, I would like to introduce you to some of the tools to build an intelligent chatbot.

    As the title says, we used Botkit and Rasa (NLU) to build our bot. Before getting into the technicalities, I would like to share the reason for choosing these two platforms and how they fit our use case. Also read: How to build a serverless chatbot with Amazon Lex.

    Bot development framework: Howdy’s Botkit and Microsoft (MS) Bot Framework were good contenders for this. Both these frameworks:
    – are open source
    – have integrations with popular messaging platforms like Slack, Facebook Messenger, Twilio etc
    – have good documentation
    – have an active developer community

    Due to compliance issues, we had chosen AWS to deploy all our services and we wanted the same for the bot as well. That led us to Botkit.

    NLU (Natural Language Understanding): API.ai (acquired by Google) and Wit.ai (acquired by Facebook) are two popular NLU tools in the bot industry which we first considered for this task. Both the solutions:
    – are hosted as a cloud service
    – have Node.js and Python SDKs and a REST interface
    – have good documentation
    – support state or contextual intents, which makes it very easy to build a conversational platform on top of them.

    As stated before, we couldn’t use any of these hosted solutions due to compliance. That is where we came across an open source NLU called Rasa, which was a perfect replacement for API.ai and Wit.ai and which we could host and manage on AWS.

    You might be wondering why I used the term NLU for API.ai and Wit.ai and not NLP (Natural Language Processing).
    * NLP refers to all the systems which handle the interactions with humans in the way humans find it natural. It means that we could converse with a system just the way we talk to other human beings. 
    * NLU is a subfield of NLP which handles a narrow but complex challenge of converting unstructured inputs into a structured form which a machine can understand and act upon. So when you say “Book a hotel for me in San Francisco on 20th April 2017”, the bot uses NLU to extract
    date=20th April 2017, location=San Francisco and action=book hotel
    which the system can understand.

    RASA NLU

    In this section, I would like to explain Rasa in detail and some terms used in NLP which you should be familiar with.
    * Intent: This tells us what the user would like to do. 
    Ex: Raise a complaint, request a refund, etc.

    * Entities: These are the attributes which give details about the user’s task. Ex: Complaint regarding service disruptions, refund cost, etc.

    * Confidence Score: This is a distance metric which indicates how closely the NLU could classify the result into the list of intents.

    Here is an example to help you understand the above-mentioned terms:
    Input: “My internet isn’t working since morning”.
    – intent: “service_interruption”
    – entities: “service=internet”, “duration=morning”
    – confidence score: 0.84 (this could vary based on your training)

    NLU’s job (Rasa in our case) is to accept a sentence/statement and give us the intent, entities and a confidence score which can be used by our bot. Rasa basically provides a high-level API over various NLP and ML libraries which do intent classification and entity extraction. These NLP and ML libraries are called backends in Rasa, and they bring the intelligence to Rasa. These are some of the backends used with Rasa:

    • MITIE: This is an all-inclusive library, meaning that it has an NLP library for entity extraction as well as an ML library for intent classification built into it.
    • spaCy + sklearn: spaCy is an NLP library which only does entity extraction. sklearn is used with spaCy to add ML capabilities for intent classification.
    • MITIE + sklearn: This uses the best of both worlds: the good entity recognition available in MITIE along with the fast and good intent classification in sklearn.

    I have used the MITIE backend to train Rasa. For the demo, I’ve taken a “Live Support ChatBot” which is trained for messages like this:
    * My phone isn’t working.
    * My phone isn’t turning on.
    * My phone crashed and isn’t working anymore.

    My training data looks like this:

    {
      "rasa_nlu_data": {
        "common_examples": [
          {
            "text": "hi",
            "intent": "greet",
            "entities": []
          },
          {
            "text": "my phone isn't turning on.",
            "intent": "device_failure",
            "entities": [
              {
                "start": 3,
                "end": 8,
                "value": "phone",
                "entity": "device"
              }
            ]
          },
          {
            "text": "my phone is not working.",
            "intent": "device_failure",
            "entities": [
              {
                "start": 3,
                "end": 8,
                "value": "phone",
                "entity": "device"
              }
            ]
          },
          {
            "text": "My phone crashed and isn’t working anymore.",
            "intent": "device_failure",
            "entities": [
              {
                "start": 3,
                "end": 8,
                "value": "phone",
                "entity": "device"
              }
            ]
          }
        ]
      }
    }
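
    In the training data above, start and end are character offsets into text, with end exclusive (“phone” occupies characters 3 to 7 of “my phone …”). Offsets are easy to get wrong when writing examples by hand, so a small check like the following can help. This is a hypothetical helper written for this article, not part of Rasa.

```python
def check_entity_offsets(examples):
    """Return (text, entity) pairs whose start/end do not match the value."""
    problems = []
    for example in examples:
        for entity in example["entities"]:
            span = example["text"][entity["start"]:entity["end"]]
            if span != entity["value"]:
                problems.append((example["text"], entity))
    return problems


# Two examples in the same shape as the training data above.
examples = [
    {"text": "hi", "intent": "greet", "entities": []},
    {
        "text": "my phone isn't turning on.",
        "intent": "device_failure",
        "entities": [{"start": 3, "end": 8, "value": "phone", "entity": "device"}],
    },
]

print(check_entity_offsets(examples))  # [] means every offset lines up
```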

    NOTE: We have observed that MITIE gives better accuracy than spaCy + sklearn for a small training set, but as you keep adding more intents, training on MITIE gets slower and slower. For a training set of 200+ examples with about 10–15 intents, MITIE takes about 35–45 minutes for us to train on a c4.4xlarge instance (16 cores, 30 GB RAM) on AWS.

    Botkit-Rasa Integration

    Botkit is an open source bot development framework designed by the creators of Howdy. It basically provides a set of tools for building bots on Facebook Messenger, Slack, Twilio, Kik and other popular platforms. They have also come up with an IDE for bot development called Botkit Studio. To summarize, Botkit is a tool which allows us to write the bot once and deploy it on multiple messaging platforms.

    Botkit also supports middleware, which can be used to extend its functionality. Integrations with databases, CRMs, NLU and statistical tools are provided via middleware, which makes the framework extensible. This design also allows us to easily add integrations with other tools and software by just writing middleware modules for them.

    I’ve integrated Slack and Botkit for this demo. You can use this boilerplate template to set up Botkit for Slack. We have extended the Botkit-Rasa middleware, which you can find here.

    Botkit-Rasa has 2 functions, receive and hears, which override the default Botkit behaviour.
    1. receive: This function is invoked when Botkit receives a message. It sends the user’s message to Rasa and stores the intent and entities in the Botkit message object.

    2. hears: This function overrides the default Botkit hears method, i.e. controller.hears. The default hears method uses regex to search for the given patterns in the user’s message, while the hears method from the Botkit-Rasa middleware searches for the intent.

    let Botkit = require('botkit');
    let rasa = require('./Middleware/rasa')({rasa_uri: 'http://localhost:5000'});
    
    let controller = Botkit.slackbot({
      clientId: process.env.clientId,
      clientSecret: process.env.clientSecret,
      scopes: ['bot'],
      json_file_store: __dirname + '/.db/'
    });
    
    // Override receive method in botkit
    controller.middleware.receive.use(rasa.receive);
    
    // Override hears method in botkit
    controller.changeEars(function (patterns, message) {
      return rasa.hears(patterns, message);
    });
    
    controller.setupWebserver(3000, function (err, webserver) {
      // Configure a route to receive webhooks from slack
      controller.createWebhookEndpoints(webserver);
    });

    Let’s try an example: “my phone is not turning on”.
    Rasa will return the following:
    1. Intent: device_failure
    2. Entities: device=phone

    If you look carefully, the input I gave, i.e. “my phone is not turning on”, is not present in my training file. Rasa has some intelligence built into it to identify the intent and entities correctly for such combinations.

    We need to add a hears method listening to intent “device_failure” to process this input. Remember that intent and entities returned by Rasa will be stored in the message object by the Botkit-Rasa middleware.

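    let Botkit = require('botkit');
    let rasa = require('./Middleware/rasa')({rasa_uri: 'http://localhost:5000'});
    
    let controller = Botkit.slackbot({
      clientId: process.env.clientId,
      clientSecret: process.env.clientSecret,
      scopes: ['bot'],
      json_file_store: __dirname + '/.db/'
    });
    
    // Override receive method in botkit
    controller.middleware.receive.use(rasa.receive);
    
    // Override hears method in botkit
    controller.changeEars(function (patterns, message) {
      return rasa.hears(patterns, message);
    });
    
    // Listen for the "device_failure" intent. The reply text is illustrative, and
    // the entity shape (objects with "entity" and "value") is assumed to follow
    // the training data format shown earlier.
    controller.hears(['device_failure'], 'direct_message,direct_mention,mention', function (bot, message) {
      let device = (message.entities || []).find(function (e) { return e.entity === 'device'; });
      bot.reply(message, 'Sorry to hear that your ' + (device ? device.value : 'device') + ' is not working.');
    });
    
    controller.setupWebserver(3000, function (err, webserver) {
      // Configure a route to receive webhooks from slack
      controller.createWebhookEndpoints(webserver);
    });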

    You should be able to run this bot with Slack and see the output as shown below (support_bot is the name of my bot).

    Conclusion

    You are now familiar with the process of building chatbots with a bot development framework and an NLU. I hope this helps you get started on your bot very quickly. If you have any suggestions, questions, or feedback, tweet me @harjun1601. Keep following our blogs for more articles on bot development, ML and AI.

  • Lessons Learnt While Building an ETL Pipeline for MongoDB & Amazon Redshift Using Apache Airflow

    Recently, I was involved in building an ETL (Extract-Transform-Load) pipeline. It included extracting data from MongoDB collections, performing transformations and then loading the data into Redshift tables. Many ETL solutions are available in the market which kind of solve the issue, but the key part of an ETL process lies in its ability to transform or process raw data before it is pushed to its destination.

    Each ETL pipeline comes with a specific business requirement around processing data, which is hard to achieve using off-the-shelf ETL solutions. This is why a majority of ETL solutions are custom built, from scratch. In this blog, I am going to talk about what I learned while building a custom ETL solution which involved moving data from MongoDB to Redshift using Apache Airflow.

    Background:

    I began by writing a Python-based command line tool which supported different phases of ETL, like extracting data from MongoDB, processing extracted data locally, uploading the processed data to S3, loading data from S3 to Redshift, post-processing and cleanup. I used the PyMongo library to interact with MongoDB and the Boto library for interacting with Redshift and S3.

    I kept each operation atomic so that multiple instances of each operation can run independently of each other, which helps to achieve parallelism. Achieving that parallelism while running the ETL tasks was one of the major challenges. One option was to develop our own framework based on threads, or a distributed task scheduler using a task queue like Celery combined with a message broker like RabbitMQ. After doing some research I settled on Apache Airflow. Airflow is a Python-based scheduler where you can define DAGs (Directed Acyclic Graphs), which run as per the given schedule and run tasks in parallel in each phase of your ETL. You define a DAG as Python code, and it also enables you to handle the state of your DAG run using environment variables. Features like task retries on failure are a plus.

    We faced several challenges while getting the above ETL workflow to be near real-time and fault tolerant. We discuss the challenges faced and the solutions below:

    Keeping your ETL code changes in sync with Redshift schema

    While you are building the ETL tool, you may end up fetching a new field from MongoDB, but at the same time, you have to add that column to the corresponding Redshift table. If you fail to do so, the ETL pipeline will start failing. In order to tackle this, I created a database migration tool which became the first step in my ETL workflow.

    The migration tool would:

    • keep the migration status in a Redshift table and
    • track all migration scripts in a code directory.

    In each ETL run, it would get the most recently run migrations from Redshift and search for any new migration script available in the code directory. If it found one, it would run the new migration script, after which the regular ETL tasks would run. This puts the onus on developers to add a migration script whenever they make a change such as adding or removing a field fetched from MongoDB.
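
    The idea can be sketched as below. This is not the actual tool: it uses an in-memory SQLite database as a stand-in for Redshift, a dictionary as a stand-in for the scripts directory, and the table name migration_status and the script names are assumptions made for illustration.

```python
import sqlite3


def run_pending_migrations(conn, migrations):
    """Apply migrations not yet recorded in the status table, in name order.

    `migrations` maps a script name to its SQL. In a real tool these would
    be files in the code directory and `conn` a Redshift connection.
    """
    conn.execute("CREATE TABLE IF NOT EXISTS migration_status (name TEXT PRIMARY KEY)")
    applied = {row[0] for row in conn.execute("SELECT name FROM migration_status")}
    newly_applied = []
    for name in sorted(migrations):
        if name in applied:
            continue
        conn.execute(migrations[name])
        conn.execute("INSERT INTO migration_status (name) VALUES (?)", (name,))
        newly_applied.append(name)
    conn.commit()
    return newly_applied


conn = sqlite3.connect(":memory:")
migrations = {
    "001_create_users.sql": "CREATE TABLE users (id INTEGER, name TEXT)",
    "002_add_email.sql": "ALTER TABLE users ADD COLUMN email TEXT",
}
first_run = run_pending_migrations(conn, migrations)    # both scripts run
second_run = run_pending_migrations(conn, migrations)   # nothing left to run
print(first_run, second_run)
```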

    Maintaining data consistency

    While extracting data from MongoDB, one needs to ensure all the collections are extracted as of a specific point in time, otherwise there can be data inconsistency issues. We need to solve this problem at multiple levels:

    • While extracting data from MongoDB, define a cut-off parameter such as a modified date and extract data from the different collections with a filter for records less than or equal to that date. This ensures you fetch point-in-time data from MongoDB.
    • While loading data into Redshift tables, don’t load directly into the master table; load it into a staging table instead. Once you are done loading data into staging for all related collections, load it into master from staging within a single transaction. This way data is either updated in all related tables or in none of them.
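
    The staging-to-master step can be sketched as follows, again with SQLite standing in for Redshift. The table names and the _staging suffix are assumptions for this example; the point is that all tables are promoted inside one transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE users_staging (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER);
    CREATE TABLE orders_staging (id INTEGER PRIMARY KEY, user_id INTEGER);
    INSERT INTO users_staging VALUES (1, 'asha');
    INSERT INTO orders_staging VALUES (100, 1);
""")


def promote_staging(conn, tables):
    """Copy every staging table into its master table, all or nothing."""
    with conn:  # commits on success, rolls back if any statement raises
        for table in tables:
            conn.execute(f"INSERT INTO {table} SELECT * FROM {table}_staging")
            conn.execute(f"DELETE FROM {table}_staging")


promote_staging(conn, ["users", "orders"])
```

    If any statement fails part-way through, the rollback leaves every master table as it was before the call.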

    A single bad record can break your ETL

    While moving data across the ETL pipeline into Redshift, one needs to take care of field formats. For example, the date format in the incoming data can differ from the one in the Redshift schema design. Another example is incoming data that exceeds the length of the field in the schema. Redshift’s COPY command, which is used to load data from files into Redshift tables, is very sensitive to such mismatches. Even a single incorrectly formatted record will lead to all your data getting rejected, effectively breaking the ETL pipeline.

    There are multiple ways to solve this problem. One is to handle it in one of the transform jobs in the pipeline. Alternatively, we can put the onus on Redshift to handle these variances. Redshift’s COPY command has many options which can help you solve these problems. Some of the very useful options are:

    • ACCEPTANYDATE: Allows any date format, including invalid formats such as 00/00/00 00:00:00, to be loaded without generating an error.
    • ACCEPTINVCHARS: Enables loading of data into VARCHAR columns even if the data contains invalid UTF-8 characters.
    • TRUNCATECOLUMNS: Truncates data in columns to the appropriate number of characters so that it fits the column specification.
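
    As a rough sketch, a COPY statement using these options could be assembled like this. The helper name, the table name, the S3 path, and the IAM role are all placeholders, and the inputs are assumed to come from trusted configuration rather than user input.

```python
def build_copy_statement(table, s3_path, iam_role, gzip=True):
    """Build a Redshift COPY statement with lenient data-conversion options.

    All argument values used below are placeholders for illustration.
    """
    options = ["ACCEPTANYDATE", "ACCEPTINVCHARS", "TRUNCATECOLUMNS"]
    if gzip:
        options.insert(0, "GZIP")  # input files are gzip-compressed
    return (
        f"COPY {table} FROM '{s3_path}' "
        f"IAM_ROLE '{iam_role}' "
        + " ".join(options)
        + ";"
    )


statement = build_copy_statement(
    "users_staging",
    "s3://example-bucket/etl/users/",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)
print(statement)
```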

    Redshift going out of storage

    Redshift is based on PostgreSQL, and one common problem is that deleting records from Redshift tables does not actually free up space. So if your ETL process is deleting and creating new records frequently, you may run out of Redshift storage space. The VACUUM operation for Redshift is the solution to this problem. Instead of making the VACUUM operation a part of your main ETL flow, define a separate workflow which runs VACUUM on its own schedule. VACUUM reclaims space and re-sorts rows in either a specified table or all tables in the current database. A VACUUM operation can be FULL, SORT ONLY, DELETE ONLY or REINDEX. More information on VACUUM can be found here.

    ETL instance going out of storage

    Your ETL will generate a lot of files by extracting data from MongoDB onto your ETL instance. It is very important to delete those files periodically, otherwise you are very likely to run out of storage on your ETL server. If your data from MongoDB is huge, you might end up creating large files on your ETL server. Again, I would recommend defining a separate workflow which runs the cleanup operation on its own schedule.

    Making ETL Near Real Time

    Processing only the delta rather than doing a full load in each ETL run

    ETL would be faster if you keep track of the already processed data and process only the new data. If you do a full load of data in each ETL run, the solution will not scale as your data scales. As a solution to this, we made it mandatory for every collection in our MongoDB to have a created and a modified date. Our ETL checks the maximum value of the modified date for the given collection in the Redshift table. It then generates the filter query to fetch only those records from MongoDB whose modified date is greater than that maximum value. It may be difficult for you to make changes in your product, but it’s worth the effort!
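
    A minimal sketch of generating such a filter is shown below. The field name modified and the helper names are assumptions, and the tiny matches function only stands in for MongoDB so that the example runs without a database.

```python
from datetime import datetime


def build_delta_filter(last_modified, field="modified"):
    """Return a MongoDB-style filter for records changed after `last_modified`.

    When nothing has been loaded yet (`last_modified` is None), the empty
    filter selects everything, which amounts to a full load.
    """
    if last_modified is None:
        return {}
    return {field: {"$gt": last_modified}}


def matches(record, query):
    """Tiny stand-in for MongoDB's matching, supporting only `$gt`."""
    return all(record[key] > cond["$gt"] for key, cond in query.items())


records = [
    {"_id": 1, "modified": datetime(2018, 1, 1)},
    {"_id": 2, "modified": datetime(2018, 3, 1)},
]
query = build_delta_filter(datetime(2018, 2, 1))
delta = [r for r in records if matches(r, query)]
print([r["_id"] for r in delta])  # [2]
```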

    Compressing and splitting files while loading

    A good approach is to write files in a compressed format. It saves storage space on the ETL server and also helps when you load data into Redshift. Redshift’s documentation for the COPY command recommends providing compressed files as input. Also, instead of a single huge file, you should split your files into parts and give all the files to a single COPY command. This enables Redshift to use its computing resources across the cluster to do the copy in parallel, leading to faster loads.
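
    Splitting and compressing can be sketched with the standard library alone. The function name, the part-naming scheme, and the sample rows are assumptions for illustration.

```python
import gzip
import os
import tempfile


def write_gzip_parts(lines, out_dir, prefix, lines_per_part):
    """Write `lines` into numbered gzip files of at most `lines_per_part` lines."""
    paths = []
    for start in range(0, len(lines), lines_per_part):
        path = os.path.join(out_dir, f"{prefix}.part{len(paths):04d}.gz")
        with gzip.open(path, "wt", encoding="utf-8") as handle:
            for line in lines[start:start + lines_per_part]:
                handle.write(line + "\n")
        paths.append(path)
    return paths


out_dir = tempfile.mkdtemp()
rows = [f"{i}|user{i}" for i in range(10)]
parts = write_gzip_parts(rows, out_dir, "users", lines_per_part=4)
print(len(parts))  # 10 rows in chunks of 4 give 3 files
```

    Because all parts share a common name prefix, uploading them under one S3 key prefix lets a single COPY command pick up every part.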

    Streaming mongo data directly to S3 instead of writing it to ETL server

    One of the major overheads in the ETL process is writing data to the ETL server first and then uploading it to S3. To reduce disk I/O, avoid storing data on the ETL server at all. Instead, use MongoDB’s handy stream API. With the MongoDB Node driver, both collection.find() and collection.aggregate() return cursors, and a cursor’s stream method accepts a transform function as a parameter. All your custom transform logic can go into the transform function. The upload() function in AWS S3’s Node library also accepts readable streams. Take the stream from the MongoDB Node stream method, pipe it into zlib to gzip it, then feed the resulting readable stream into AWS S3’s Node library. Simple! You will see a large improvement in your ETL process from this simple but important change.
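
    The same transform-then-gzip streaming idea can be sketched in Python with a generator. This is an analogue for illustration, not the Node implementation described above; the function name and the sample documents are assumptions, and the upload itself is left out.

```python
import gzip
import json
import zlib


def gzip_stream(documents, transform=lambda doc: doc):
    """Yield gzip-compressed chunks for an iterable of documents.

    Each document is transformed, serialized as one JSON line, and compressed
    incrementally, so the full dataset is never held in memory or on disk.
    """
    compressor = zlib.compressobj(wbits=zlib.MAX_WBITS | 16)  # 16 selects gzip framing
    for doc in documents:
        line = json.dumps(transform(doc)) + "\n"
        chunk = compressor.compress(line.encode("utf-8"))
        if chunk:
            yield chunk
    yield compressor.flush()


docs = [{"_id": 1, "name": "asha"}, {"_id": 2, "name": "ben"}]
payload = b"".join(
    gzip_stream(docs, transform=lambda d: {"id": d["_id"], "name": d["name"].upper()})
)
print(gzip.decompress(payload).decode("utf-8"))
```

    In a real pipeline each yielded chunk would be handed to the uploader as it is produced instead of being joined in memory.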

    Optimizing Redshift Queries

    Optimizing Redshift queries helps make the ETL system highly scalable and efficient, and also reduces cost. Let’s look at some of the approaches:

    Add a distribution key

    A Redshift database is clustered, meaning your data is stored across cluster nodes. When you query for a certain set of records, Redshift has to search for those records on each node, leading to slow queries. A distribution key is a single field (column) which decides how all data records are distributed across your cluster. If you have a single field which is available for all your data, you can specify it as the distribution key. When loading data into Redshift, all data for a certain value of the distribution key will be placed on a single node of the Redshift cluster. So when you query for certain records, Redshift knows exactly where to search for your data. This is only useful when you are also using the distribution key to query the data.

    Generating a numeric primary key for string primary key

    In MongoDB, you can have any type of field as your primary key. If your Mongo collections have a non-numeric primary key and you use those same keys in Redshift, your joins will end up being on string keys, which are slower. Instead, generate numeric keys for your string keys and join on them, which will make queries run much faster. Redshift supports marking a column as IDENTITY, which auto-generates a unique numeric value for the column that you can use as your primary key.

    Conclusion:

    In this blog, I have covered the best practices around building ETL pipelines for Redshift based on what I learned. There are many more recommended practices which can be easily found in the Redshift and MongoDB documentation.

  • Benefits of Using Chatbots: How Companies Are Using Them to Their Advantage

    Bots are the new black! The entire tech industry seems to be buzzing with “bot” fever. My co-founders and I often see a “bot” company and discuss its business model. Chirag Jog has always been enthusiastic about the bot wave while I have been mostly pessimistic, especially about B2C bots. We should consider that there are many types of “bots”: chat bots, voice bots, AI assistants, robotic process automation (RPA) bots, conversational agents within apps or websites, etc.

    Over the last year, we have been building some interesting chat and voice based bots, which has given me some interesting insights. I hope to lay down my thoughts on bots in some detail and with some structure.

    What are bots?

    Bots are software programs which automate tasks that humans would otherwise do themselves. Bots are developed using machine learning software and are expected to aggregate data to make the interface more intelligent and intuitive. There have always been simple rule-based bots which provide a very specific service with low utility. In the last couple of years, we have seen the emergence of intelligent bots that can serve more complex use-cases.

    Why now?

    Machine learning, NLP and AI technologies have matured, enabling practical applications where bots can actually do intelligent work more than 75% of the time. Has general AI been solved? No. But is it good enough to do the simple things well and give hope for more complex things? Yes.

    Secondly, there are billions of DAUs on WhatsApp & Facebook Messenger. There are tens of millions of users on enterprise messaging platforms like Slack, Skype & Microsoft Teams. Startups and enterprises want to use this distribution channel and will continue to experiment aggressively to find relevant use-cases. Millennials are very comfortable using chat and voice interfaces for a broad variety of use-cases, since they have used chat services ever since they came online. As millennials become a growing part of the workforce, the adoption of bots may increase.

    Thirdly, software is becoming more prevalent and more complex. Data is exploding and making sense of this data is getting harder and requiring more skill. Companies are experimenting with bots to provide an “easy to consume” interface to casual users. So non-experts can use the bot interface while experts can use the mobile or web application for the complex workflows. This is mostly true for B2B & enterprise. A good example is how Slack has become the system of engagement for many companies (including at @velotiotech). We require all the software we use (Gitlab, Asana, Jira, Google Docs, Zoho, Marketo, Zendesk, etc.) to provide notifications into Slack. Over time, we expect to start querying the respective Slack bots for information. Only domain experts will log into the actual SaaS applications.

    Types of Bots

    B2C Chat-Bots

    Consumer-focused bots use popular messaging and social platforms like Facebook, Telegram, Kik, WeChat, etc. Some examples of consumer bots include weather, e-commerce, travel bookings, personal finance, fitness, and news. These are mostly inspired by WeChat, which owns the China market and is the default gateway to various internet services. These bots show up as “contacts” in these messenger platforms.

    Strategically, the B2C bots are basically trying to get around the distribution monopoly of Apple & Google Android. As many studies have indicated, getting mobile users to install apps is getting extremely hard. Facebook, Skype, Telegram hope to become the system of engagement and distribution for various apps thereby becoming an alternate “App Store” or “Bot Store”.

    I believe that SMS is a great channel for basic chatbot functionality. Chatbots with an SMS interface can be used by all age groups and in remote parts of the world where data infrastructure is lacking. I do expect to see some interesting companies use SMS chatbots to build new business models. Also, mobile bots that sniff or integrate with as many of your mobile apps as possible to provide cross-platform and cross-app “intelligence” will succeed; Google Now is a good example.

    An often-cited example is the DoNotPay chatbot, which helps people contest parking tickets in the UK. In my opinion, the novelty is in the service and its efficiency and not in the chatbot interface as such. Also, I have not met anyone who uses a B2C chatbot even on a weekly or monthly basis.

    B2B Bots

    Enterprise bots are available through platforms and interfaces like Slack, Skype, Microsoft Teams, website chat windows, email assistants, etc. They are focused on collaboration, replacing/augmenting emails, information assistants, support, and speeding up decision-making/communications.

    Most of the enterprise bots solve niche and specific problems. This is a great advantage considering the current state of AI/ML technologies. Many of these enterprise bot companies are also able to augment their intelligence with human agents thereby providing better experiences to users.

    Some of the interesting bots and services in the enterprise space include:

    • x.ai and Clara Labs provide a virtual assistant to help you set up and manage your meetings.
    • Gong.io and Chorus provide a bot that listens in on sales calls and uses voice-to-text and other machine learning algorithms to help your sales teams get better and close more deals.
    • Astro is building an AI-assisted email app which will have multiple interfaces including voice (Echo).
    • Twyla is helping to make chatbots on websites more intelligent using ML. It integrates with your existing ZenDesk, LivePerson or Salesforce support.
    • Clarke.ai is a bot which uses AI to take notes for your meeting so you can focus better.
    • Smacc provides AI assisted automated book-keeping for SMBs.
    • Slack is one of the fastest growing SaaS companies and has the most popular enterprise bot store. Slack bots are great for pushing and pulling information & data. All SaaS services and apps should have bots that can emit useful updates, charts, data, links, etc. to a specific set of users. This is much better than sending emails to an email group. Simple decisions can be taken within a chat interface using something like Slack Buttons. Instead of receiving an email and opening a web page, most people would prefer approving a leave or an expense right within Slack. Slack/Skype/etc. will add the ability to embed “cards” or “webviews” or “interactive sections” within chats. This will enable some more complex use-cases to be served via bots. Most enterprise services have Slack bots and are allowing Slack to be a basic system of engagement.
    • Chatbots or even voice-based bots on websites will be a big deal. Imagine that each website has a virtual support rep or a sales rep available to you 24×7 in most popular languages. All businesses would want such “agents” or “bots” for greater sales conversions and better support.
    • Automation of back-office tasks can be a HUGE business. KPOs & BPOs are a huge market, so if you can build software or software-enabled processes to reduce costs, then you can build a significant-sized company. Some interesting examples here are Automation Anywhere and WorkFusion.

    Voice based Bots

    Amazon had a surprise hit in the consumer electronics space with their Amazon Echo device, which is a voice-based assistant. Google recently released their own voice-enabled apps to compete with Echo/Alexa. Voice assistants provide weather, music, searches, and e-commerce ordering via an NLP voice interface. Apple’s Siri should have been leading this market but, as usual, Apple is following rather than leading the market.

    Voice bots have one great advantage: with the miniaturization of devices (Apple Watch, Earpods, smaller wearables), the only practical interface is voice. The other option is pairing the device with your mobile phone, which is not a smooth and intuitive process. Echo is already a great device for listening to music with its Spotify integration; this feature alone is reason enough for most families to buy it.

    Conclusion

    Bots are useful and here to stay. I am not sure about the form or the distribution channel through which bots will become prevalent. In my opinion, bots are an additional interface to intelligence and application workflows. They are not disrupting any process or industry. Consumers will not shop more due to chat or voice interface bots, employees will not collaborate as desired due to bots, and information discovery within your company will not improve due to bots. Actually, existing software and SaaS services are getting more intelligent, predictive and prescriptive. So this move towards “intelligent interfaces” is the real disruption.

    So my concluding predictions:

    • B2C chatbots will turn out to be mostly hype and very few practical scalable use-cases will emerge.
    • Voice bots will see increasing adoption due to smaller device sizes. IoT, wearables and music are excellent use-cases for voice-based interfaces. Amazon’s Alexa will become the dominant platform for voice-controlled apps and devices. Google and Microsoft will invest aggressively to take on Alexa.
    • B2B bots can be intelligent interfaces on software platforms and SaaS products. Or they can be agents that solve very specific vertical use-cases. I am most bullish about these enterprise focused bots which are helping enterprises become more productive or to increase efficiency with intelligent assistants for specific job functions.

    If you’d like to chat about anything related to this article, what tools we use to build bots, or anything else, get in touch.

    PS: Velotio is helping to bring the power of machine learning to enterprise software businesses. Click here to learn more about Velotio.