
  • Building Type Safe Backend Apps with Typegoose and TypeGraphQL

    In this article, we will tackle the most common problems encountered when modeling a MongoDB backend schema with TypeScript and Mongoose. We will also address the difficulties of maintaining GraphQL types.

    Almost every serious JavaScript developer uses TypeScript. However, many older libraries do not support it natively, which becomes an increasing issue as a project grows. Add GraphQL on top, a great modern API development solution in its own right, and the boilerplate quickly piles up.

    Prerequisites

    This article assumes that you have working knowledge of TypeScript, MongoDB, and GraphQL. We’ll be using Mongoose for specifying models, which is the go-to Object Document Mapper (ODM) solution for MongoDB.

    Let’s consider a basic example of a Mongoose model written in TypeScript. This might look something like the one mentioned below, a user model with basic model properties (email, first name, last name, and password):

    import { Document, Model, Schema } from "mongoose";
    import { db } from "../util/database";
    
    export interface IUserProps {
      email: string;
      firstName: string;
      lastName: string;
      password: string;
    }
    
    export interface IUserDocument extends IUserProps, Document {
      createdAt: Date;
      updatedAt: Date;
      hashPassword(password: string): string;
    }
    
    export interface IUserModel extends Model<IUserDocument> {
    }
    
    const UserSchema: Schema = new Schema(
      {
        email: {
          type: String,
          unique: true,
        },
        firstName: {
          type: String,
        },
        lastName: {
          type: String,
        },
        password: {
          type: String,
        },
      },
      { timestamps: true }
    );
    
    const hashPassword = (_password: string) => {
      // logic to hash passwords
    }
    
    UserSchema.method("hashPassword", hashPassword);
    
    export const User: IUserModel = db.model<IUserDocument, IUserModel>(
      "User",
      UserSchema
    );

    As you can see, adding and maintaining these interfaces manually alongside Mongoose is cumbersome. We need at least two or three interfaces just to get model properties and methods working with proper typing.

    Moving on to our queries and mutations, we need to create resolvers for the model above, assuming we have a service that deals with the models. Here’s what our resolver looks like:

    import { ObjectId } from 'bson';
    import { IResolvers } from 'graphql-tools';
    import { IUserProps } from './user.model';
    import { UserService } from './user.service';
    
    const userService = new UserService();
    export const userResolvers: IResolvers = {
      Query: {
        User: (_root: unknown, args: { id: ObjectId }) => userService.get(args.id),
        //...
      },
      Mutation: {
        createUser: async (_root: unknown, args: IUserProps) => await userService.create(args),
        //...
      }
    };

    Not bad: we have our model and service, and the resolver looks good too. But wait, we still need to add the GraphQL types. (We are intentionally not including inputs here to keep it short.) Let’s do that:

    type Query {
      User(id: String): User
    }
    
    type Mutation {
      createUser(
        email: String,
        firstName: String,
        lastName: String,
        password: String,
      ): User
    }
    
    type User {
      id: String!
      email: String!
      firstName: String!
      lastName: String!
      password: String!
    }
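
    For completeness, the inputs we skipped would look something like this in SDL. Note that CreateUserInput is an illustrative name, not part of the original setup:

    ```graphql
    # Hypothetical input type; the createUser mutation could then accept a single
    # `data: CreateUserInput!` argument instead of four scalar arguments.
    input CreateUserInput {
      email: String!
      firstName: String!
      lastName: String!
      password: String!
    }
    ```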

    Now we have to combine the schema and resolvers and pass them to the GraphQL Express server (Apollo Server in this case):

    import * as path from 'path';
    import * as fs from 'fs';
    import { ApolloServer } from 'apollo-server'
    import { makeExecutableSchema } from 'graphql-tools';
    import { resolvers } from './src/resolvers';
    
    const userSchema = path.join(__dirname, 'src/user/user.schema.graphql');
    const schemaDef = fs.readFileSync(userSchema, 'utf8');
    
    const schema = makeExecutableSchema({ typeDefs: schemaDef, resolvers });
    
    const server = new ApolloServer({ schema });
    
    server.listen().then(({ url }) => {
      console.log(`🚀 Server ready at ${url}`);
    });

    With this setup, we have four files per model: the model, the resolver, the service, and the GraphQL schema file.

    That’s too many things to keep in sync in real life. Imagine that you need to add a new property to the model above after reaching production. You’ll end up doing at least the following:

    1. Add a migration to sync the DB
    2. Update the interfaces
    3. Update the model schema
    4. Update the GraphQL schema

    Possible Solution

    As we can see, after this setup we are mostly dealing with entity models and struggling to keep their types and relations in sync.
    
    If the model itself could handle this somehow, we could save real effort. In other words, things would be much simpler if a single entity class could represent both the database schema and its types.

    Adding TypeGoose

    Mongoose schema declarations with TypeScript can get tricky, but there is a better way. Let’s add Typegoose so that we (arguably) no longer have to maintain interfaces. Here’s what the same user model looks like:

    import { DocumentType, getModelForClass, prop as Property } from '@typegoose/typegoose';
    
    export class User {
      readonly _id: string;
    
      @Property({ required: true })
      firstName: string;
    
      @Property({ required: false })
      lastName: string;
    
      @Property({ required: true })
      password: string;
    
      @Property({ required: true, unique: true })
      email: string;
    
      hashPassword(this: DocumentType<User>, _password: string) {
        // logic to hash passwords
      }
    }
    
    export const UserModel = getModelForClass(User);

    Alright, there’s no need to add interfaces for the model and documents anymore. You could still keep an interface for the model implementation, but it’s not necessary.

    With Reflect Metadata, which Typegoose uses internally, we manage to skip the need for additional interfaces.

    If we want to add custom validations and messages, Typegoose allows us to do that too. The prop decorator accepts almost everything you would expect from a Mongoose schema definition, including validators with custom messages:
    
    @Property({ required: [true, 'Email is required'], unique: true })

    Adding TypeGraphQL

    Alright, Typegoose has helped us handle the Mongoose schema smoothly. But we still need to define types for GraphQL, and we still have to update those types whenever our models change.
    
    Let’s add TypeGraphQL:

    import { DocumentType, getModelForClass, prop as Property } from '@typegoose/typegoose';
    import { Field as GqlField, ObjectType as GqlType } from 'type-graphql';
    
    @GqlType()
    export class User {
      @GqlField(_type => String)
      readonly _id: string;
    
      @GqlField(_type => String)
      @Property({ required: true })
      firstName: string;
    
      @GqlField(_type => String, { nullable: true })
      @Property({ required: false })
      lastName: string;
    
      @GqlField(_type => String)
      @Property({ required: true })
      password: string;
    
      @GqlField(_type => String)
      @Property({ required: true, unique: true })
      email: string;
    
      hashPassword(this: DocumentType<User>, _password: string) {
        // logic to hash passwords
      }
    }
    
    export const UserModel = getModelForClass(User);

    What we just did is use the same TypeScript user class to define the schema as well as its GraphQL type—pretty neat.

    Because we have added TypeGraphQL, our resolvers no longer need extra interfaces. We can add input classes for parameter types; consider common input types such as CreateInput, UpdateInput, and FilterInput.

    import { Arg, Ctx, Mutation, Resolver } from 'type-graphql';
    import { User } from './user.model';
    import { UserService } from './user.service';
    
    @Resolver(_of => User)
    export class UserResolver {
      private __userService: UserService;
    
      constructor() {
        this.__userService = new UserService();
      }
    
      // UserCreateInput is an @InputType class (defined elsewhere) describing the mutation arguments
      @Mutation(_returns => User)
      async createUser(@Arg('data', _type => UserCreateInput) data: UserCreateInput, @Ctx() _ctx: any) {
        return this.__userService.create(data);
      }
    }

    You can learn more about the syntax and input definition in the official docs.

    That’s it. Our setup is ready, and we can now simply build a schema and pass it to the server entry point. No need to import schema files and merge resolvers; simply pass an array of resolvers to buildSchema:

    import { ApolloServer } from 'apollo-server';
    import { buildSchema } from 'type-graphql';
    
    import { resolvers } from './src/resolvers';
    
    async function bootstrap() {
      // buildSchema is asynchronous and expects an array of resolver classes
      const schema = await buildSchema({ resolvers });
    
      const server = new ApolloServer({ schema });
    
      const { url } = await server.listen();
      console.log(`🚀 Server ready at ${url}`);
    }
    
    bootstrap();

    Once implemented, this is how our custom demo project architecture might look:

    Fig:- Application Architecture

    Limitations and Alternatives

    Though these packages save us work, some teams may decide against them because they rely on experimental language features such as decorators. However, acceptance of these features keeps growing.

    TypeGoose:

    Though Typegoose offers a great extension to Mongoose, it has recently introduced some breaking changes, so upgrading across recent versions can be risky. One alternative to Typegoose for decorator-based schema definitions is TypeORM, though its MongoDB support is currently basic and experimental.

    TypeGraphQL:

    TypeGraphQL is a well-maintained library. There are other options, such as Nest.js and graphql-schema-decorators, which also support decorators for GraphQL schemas.
    
    However, Nest.js’s GraphQL support is framework-oriented and might be more than you need, while graphql-schema-decorators is no longer maintained. You can even integrate TypeGraphQL with Nest.js, with some caveats.

    Conclusion

    Unsurprisingly, both of these libraries use the experimental decorators API together with Reflect Metadata, which adds metadata support to classes and their members. The concept might look innovative, but it’s nothing new: languages like C# and Java have long supported attributes or annotations that attach metadata to types. With decorators and metadata in place, it becomes much easier to create and maintain well-typed applications.

    One thing to note: though this article shows the benefits of using TypeGraphQL and Typegoose together, that does not mean you can’t use them separately. Depending on your requirements, you may use either tool on its own or both in combination.

    This article covers a very basic setup to introduce the technologies mentioned. You can learn more about advanced, real-life usage of these tools and techniques from the articles mentioned below.

    Further Reading

    You can find the referenced code at this repo.

  • Building an Intelligent Chatbot Using Botkit and Rasa NLU

    Introduction

    Bots are the flavor of the season. Every day we hear about a new bot being launched, catering to domains like travel, social, legal, support, and sales. Facebook Messenger alone had more than 11,000 bots when I last checked, and has probably added thousands more as I write this article.

    The first generation of bots were dumb: they could understand only a limited set of queries based on keywords in the conversation. But the commoditization of NLP (Natural Language Processing) and machine learning by services like Wit.ai, API.ai, Luis.ai, Amazon Lex, and IBM Watson has resulted in the growth of intelligent bots like DoNotPay and chatShopper. I don’t know if bots are just hype or the real deal, but I can say with certainty that building a bot is fun and challenging at the same time. In this article, I would like to introduce you to some of the tools for building an intelligent chatbot.

    The title of this post makes it clear that we used Botkit and Rasa NLU to build our bot. Before getting into the technicalities, I would like to share the reasons for choosing these two platforms and how they fit our use case. Also read – How to build a serverless chatbot with Amazon Lex.

    Bot development framework — Botkit (from Howdy) and the Microsoft (MS) Bot Framework were good contenders for this. Both frameworks:
    – are open source
    – have integrations with popular messaging platforms like Slack, Facebook Messenger, Twilio, etc.
    – have good documentation
    – have an active developer community

    Due to compliance requirements, we had chosen AWS to deploy all our services, and we wanted the same for the bot.

    NLU (Natural Language Understanding) — API.ai (acquired by Google) and Wit.ai (acquired by Facebook) are two popular NLU tools in the bot industry, and the first ones we considered for this task. Both solutions:
    – are hosted as a cloud service
    – have Node.js and Python SDKs and a REST interface
    – have good documentation
    – support stateful or contextual intents, which makes it very easy to build a conversational platform on top of them

    As stated before, we couldn’t use either of these hosted solutions due to compliance. That is when we came across an open-source NLU called Rasa, which was a perfect replacement for API.ai and Wit.ai and which we could host and manage ourselves on AWS.

    You might now be wondering why I used the term NLU for API.ai and Wit.ai rather than NLP (Natural Language Processing).
    * NLP refers to all systems that handle interactions with humans in a way humans find natural. It means we can converse with a system just the way we talk to other human beings.
    * NLU is a subfield of NLP that handles the narrow but complex challenge of converting unstructured input into a structured form a machine can understand and act upon. So when you say “Book a hotel for me in San Francisco on 20th April 2017”, the bot uses NLU to extract date=20th April 2017, location=San Francisco, and action=book hotel, which the system can understand.
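
    As a toy illustration of the structured output described above (this is a keyword/regex sketch, nothing like a real NLU, which uses trained statistical models):

    ```javascript
    // Toy extractor: turns the example sentence into { action, location, date }.
    function naiveNlu(sentence) {
      const result = { action: null, location: null, date: null };
      if (/book a hotel/i.test(sentence)) result.action = 'book hotel';
      const loc = sentence.match(/in ([A-Z][a-zA-Z ]+?) on/);
      if (loc) result.location = loc[1];
      const date = sentence.match(/on (\d{1,2}(?:st|nd|rd|th) \w+ \d{4})/);
      if (date) result.date = date[1];
      return result;
    }

    console.log(naiveNlu('Book a hotel for me in San Francisco on 20th April 2017'));
    // → { action: 'book hotel', location: 'San Francisco', date: '20th April 2017' }
    ```

    A real NLU generalizes to phrasings it has never seen, which is exactly what a rule list like this cannot do.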

    RASA NLU

    In this section, I will explain Rasa in detail, along with some NLP terms you should be familiar with.
    * Intent: This tells us what the user would like to do. Ex: raise a complaint, request a refund, etc.
    
    * Entities: These are the attributes that give details about the user’s task. Ex: a complaint regarding service disruptions, the refund amount, etc.
    
    * Confidence Score: A metric indicating how confidently the NLU classified the input into one of the known intents.

    Here is an example to help you understand the terms above.
    Input: “My internet isn’t working since morning”.
    – intent: “service_interruption”
    – entities: “service=internet”, “duration=morning”
    – confidence score: 0.84 (this could vary based on your training)

    The NLU’s job (Rasa’s, in our case) is to accept a sentence and return the intent, entities, and a confidence score that our bot can use. Rasa basically provides a high-level API over various NLP and ML libraries that do intent classification and entity extraction. These NLP and ML libraries are called backends in Rasa, and they are what give Rasa its intelligence. These are some of the backends used with Rasa:

    • MITIE — an all-inclusive library, meaning it has both an NLP library for entity extraction and an ML library for intent classification built in.
    • spaCy + sklearn — spaCy is an NLP library that only does entity extraction; sklearn is used with spaCy to add ML capabilities for intent classification.
    • MITIE + sklearn — the best of both worlds: the good entity recognition available in MITIE along with fast, accurate intent classification from sklearn.

    I have used the MITIE backend to train Rasa. For the demo, I built a “Live Support ChatBot” which is trained on messages like these:
    * My phone isn’t working.
    * My phone isn’t turning on.
    * My phone crashed and isn’t working anymore.

    My training data looks like this:

    {
      "rasa_nlu_data": {
        "common_examples": [
          {
            "text": "hi",
            "intent": "greet",
            "entities": []
          },
          {
            "text": "my phone isn't turning on.",
            "intent": "device_failure",
            "entities": [
              {
                "start": 3,
                "end": 8,
                "value": "phone",
                "entity": "device"
              }
            ]
          },
          {
            "text": "my phone is not working.",
            "intent": "device_failure",
            "entities": [
              {
                "start": 3,
                "end": 8,
                "value": "phone",
                "entity": "device"
              }
            ]
          },
          {
            "text": "My phone crashed and isn’t working anymore.",
            "intent": "device_failure",
            "entities": [
              {
                "start": 3,
                "end": 8,
                "value": "phone",
                "entity": "device"
              }
            ]
          }
        ]
      }
    }
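
    The start and end values in the training examples above are plain character offsets into the text (zero-based, with end exclusive), which you can verify quickly:

    ```javascript
    // Entity offsets are zero-based character indices; `end` is exclusive.
    const text = "my phone isn't turning on.";
    const value = "phone";
    const start = text.indexOf(value);
    const end = start + value.length;
    console.log(start, end); // → 3 8
    ```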

    NOTE — We have observed that MITIE gives better accuracy than spaCy + sklearn on a small training set, but as you keep adding intents, training on MITIE gets slower and slower. For a training set of 200+ examples with about 10–15 intents, MITIE takes about 35–45 minutes to train on a c4.4xlarge instance (16 cores, 30 GB RAM) on AWS.

    Botkit-Rasa Integration

    Botkit is an open source bot development framework designed by the creators of Howdy. It basically provides a set of tools for building bots on Facebook Messenger, Slack, Twilio, Kik and other popular platforms. They have also come up with an IDE for bot development called Botkit Studio. To summarize, Botkit is a tool which allows us to write the bot once and deploy it on multiple messaging platforms.

    Botkit also supports middleware, which can be used to extend its functionality. Integrations with databases, CRMs, NLUs, and analytics tools are provided via middleware, which makes the framework extensible. This design also lets us add integrations with other tools and software simply by writing middleware modules for them.
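
    A minimal sketch of the middleware idea (the names and the shape of the parsed response here are illustrative, not the actual internals of the Botkit-Rasa middleware):

    ```javascript
    // A Botkit-style receive middleware is a function (bot, message, next) that
    // can annotate the message before the rest of the pipeline sees it.
    function fakeRasaReceive(bot, message, next) {
      // A real middleware would POST message.text to the Rasa server here;
      // we hardcode a fake response to show where intent/entities get attached.
      const parsed = {
        intent: { name: 'device_failure', confidence: 0.84 },
        entities: [{ entity: 'device', value: 'phone' }],
      };
      message.intent = parsed.intent;
      message.entities = parsed.entities;
      next();
    }

    // Usage: run the middleware over an incoming message object.
    const message = { text: "my phone isn't working" };
    fakeRasaReceive(null, message, () => {
      console.log(message.intent.name); // → device_failure
    });
    ```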

    I’ve integrated Botkit with Slack for this demo. You can use this boilerplate template to set up Botkit for Slack. We have extended the Botkit-Rasa middleware, which you can find here.

    Botkit-Rasa has two functions, receive and hears, which override the default Botkit behavior.
    1. receive — This function is invoked when Botkit receives a message. It sends the user’s message to Rasa and stores the returned intent and entities on the Botkit message object.
    
    2. hears — This function overrides the default Botkit hears method, i.e., controller.hears. The default hears method uses regex to search for the given patterns in the user’s message, while the hears method from the Botkit-Rasa middleware matches against the intent instead.

    let Botkit = require('botkit');
    let rasa = require('./Middleware/rasa')({rasa_uri: 'http://localhost:5000'});
    
    let controller = Botkit.slackbot({
      clientId: process.env.clientId,
      clientSecret: process.env.clientSecret,
      scopes: ['bot'],
      json_file_store: __dirname + '/.db/'
    });
    
    // Override receive method in botkit
    controller.middleware.receive.use(rasa.receive);
    
    // Override hears method in botkit
    controller.changeEars(function (patterns, message) {
      return rasa.hears(patterns, message);
    });
    
    controller.setupWebserver(3000, function (err, webserver) {
      // Configure a route to receive webhooks from slack
      controller.createWebhookEndpoints(webserver);
    });

    Let’s try an example: “my phone is not turning on”.
    Rasa will return the following:
    1. Intent — device_failure
    2. Entities — device=phone
    
    If you look carefully, the input I gave, “my phone is not turning on”, is not present in my training file. Rasa has enough intelligence built in to identify the intent and entities correctly for such variations.

    We need to add a hears handler listening for the intent “device_failure” to process this input. Remember that the intent and entities returned by Rasa are stored on the message object by the Botkit-Rasa middleware.

    // Add a hears handler for the "device_failure" intent (the reply text is illustrative)
    controller.hears(['device_failure'], 'direct_message,direct_mention,mention', function (bot, message) {
      // The Botkit-Rasa middleware has already attached intent and entities to the message
      bot.reply(message, 'Sorry to hear your device is not working. A support agent will get in touch with you shortly.');
    });

    You should now be able to run this bot with Slack and see output like that shown below (support_bot is the name of my bot).

    Conclusion

    You are now familiar with the process of building a chatbot with a bot development framework and an NLU. I hope this helps you get started on your own bot quickly. If you have any suggestions, questions, or feedback, tweet me @harjun1601. Keep following our blog for more articles on bot development, ML, and AI.

  • Automating test cases for text-messaging (SMS) feature of your application was never so easy

    Almost all the applications that you work on or deal with throughout the day use SMS (Short Message Service) as an efficient and effective way to communicate with end users.

    Some very common use-cases include: 

    • Receiving an OTP for authenticating your login
    • Getting deals from the likes of Flipkart and Amazon informing you about the latest sale
    • Getting reminder notifications for an upcoming doctor’s appointment
    • Getting details of your debit and credit card transactions

    The practical use cases for an SMS can be far-reaching. 

    Even though SMS integration forms an integral part of many applications, the limitations and complexities involved in automating it via web automation tools like Selenium mean these tests are often left unautomated.

    Teams often opt to verify these sets of test cases manually which, even though it helps catch bugs early, poses some real challenges.
    
    Pitfalls with Manual Testing
    
    Manual SMS verification is repetitive, time-consuming, and hard to scale across builds, devices, and carriers. With these limitations, you obviously do not want your application sending faulty text messages after that major release.

    Automation Testing … #theSaviour

    To overcome the limitations of manual testing, delegating your task to a machine comes in handy.

    Now that we have talked about the WHY, let’s look into HOW the feature can be automated.
    Technically, you can’t use Selenium to read an SMS on a mobile device.
    So we went looking for a third-party library that:
    
    • is easy to integrate with the existing code base
    • supports a range of languages
    • does not involve highly complex code and focuses on the problem at hand
    • supports both incoming and outgoing messages

    After a lot of research, we settled on Twilio.

    In this article, we will look at an example of working with Twilio APIs to Read SMS and eventually using it to automate SMS flows.

    Twilio supports a bunch of different languages. For this article, we stuck with Node.js.

    Account Setup

    Registration

    To start working with the service, you need to register.

    Once that is done, Twilio will ask you a few simple questions to understand why you want to use their service.

    Twilio Dashboard

    Upon signing up, you receive a trial balance of $15.50, which can be used for sending and receiving text messages. A unique Account SID and Auth Token are also generated for your account.
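
    Since the later snippets read these credentials from environment variables, you can export them once in your shell. The values below are placeholders, not real credentials:

    ```shell
    # Placeholder values; copy the real ones from your Twilio dashboard
    export TWILIO_ACCOUNT_SID=ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    export TWILIO_AUTH_TOKEN=your_auth_token_here
    ```

    Keeping credentials out of source code avoids accidentally committing them.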

    Buy a Number
    
    Navigate to the Buy a Number link under Phone Numbers > Manage and purchase a number that you will eventually use in your automation scripts to receive text messages from the application.

    Note – for the free trial, Twilio does not support Indian numbers (+91).

    Code Setup

    Install Twilio in your code base:
    
    npm install twilio
    
    Code snippet

    To keep things simple, just pass the accountSid and authToken that you receive from the dashboard console to the twilio library. This returns a client object through which you can list all the messages in your inbox.

    // Use environment variables rather than hardcoding your credentials
    const accountSid = process.env.TWILIO_ACCOUNT_SID
    const authToken = process.env.TWILIO_AUTH_TOKEN
    const client = require('twilio')(accountSid, authToken)
    
    describe('My Login application', () => {
      it('Read Text Message', async () => {
        const username = $('#login_field');
        const pass = $('#password');
        const signInBtn = $('input[type="submit"]');
        const otpField = $('#otp');
        const verifyBtn = $(
          'form[action="/sessions/two-factor"] button[type="submit"]'
        );
        browser.url('https://github.com/login');
        username.setValue('your_email@mail.com');
        pass.setValue('your_pass123');
        signInBtn.click();
        // client.messages.list returns an array; take the most recent message
        const [latestMsg] = await client.messages.list({ limit: 1 })
        // Extract the first run of digits in the message body as the OTP
        const OTP = latestMsg.body.match(/\d+/)[0]
        otpField.setValue(OTP);
        verifyBtn.click();
        expect(browser).toHaveUrl('https://github.com/');
      });
    })
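
    The digit-extraction step can be sanity-checked in isolation (the message body below is made up):

    ```javascript
    // Grab the first run of digits from an SMS body; this is how the OTP is parsed.
    const body = 'Your verification code is 123456. It expires in 10 minutes.';
    const otp = body.match(/\d+/)[0];
    console.log(otp); // → 123456
    ```

    Note that match(/\d+/) returns the first digit run only, so a body that leads with another number (say, a date) would need a more specific pattern.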

    List of other APIs to read an SMS provided by Twilio

    List all messages: using this API, you can retrieve all the messages from your account.

    const accountSid = process.env.TWILIO_ACCOUNT_SID;
    const authToken = process.env.TWILIO_AUTH_TOKEN;
    const client = require('twilio')(accountSid, authToken);
    
    client.messages.list({limit: 20})
                   .then(messages => messages.forEach(m => console.log(m.sid)));

    List messages matching filter criteria: if you’d like Twilio to narrow down the list of messages for you, you can do so by specifying a To number, a From number, and a DateSent date.

    const accountSid = process.env.TWILIO_ACCOUNT_SID;
    const authToken = process.env.TWILIO_AUTH_TOKEN;
    const client = require('twilio')(accountSid, authToken);
    
    client.messages
          .list({
             dateSent: new Date(Date.UTC(2016, 7, 31, 0, 0, 0)),
             from: '+15017122661',
             to: '+15558675310',
             limit: 20
           })
          .then(messages => messages.forEach(m => console.log(m.sid)));

    Get a message: if you know the message SID (i.e., the message’s unique identifier), you can retrieve that specific message directly.

    const accountSid = process.env.TWILIO_ACCOUNT_SID;
    const authToken = process.env.TWILIO_AUTH_TOKEN;
    const client = require('twilio')(accountSid, authToken);
    
    client.messages('MM800f449d0399ed014aae2bcc0cc2f2ec')
          .fetch()
          .then(message => console.log(message.to));

    Delete a message: if you want to delete a message from history, you can easily do so by deleting the Message instance resource.

    const accountSid = process.env.TWILIO_ACCOUNT_SID;
    const authToken = process.env.TWILIO_AUTH_TOKEN;
    const client = require('twilio')(accountSid, authToken);
    
    client.messages('MM800f449d0399ed014aae2bcc0cc2f2ec').remove();

    Limitations with a Trial Twilio Account

    • The trial version does not support Indian numbers (+91).
    • The trial version provides only an initial balance of $15.50.
      This is sufficient for a use case that involves only receiving messages on your Twilio number, but if the use case requires sending messages back from the Twilio number, a paid account serves the purpose.
    • Messages sent via a short code (e.g., 557766) are not received on the Twilio number; only long codes are accepted in the trial version.
    • You can buy only a single number with the trial version. If purchasing multiple numbers is required, you will have to switch to a paid account.

    Conclusion

    In a nutshell, we saw how important it is to thoroughly verify the SMS functionality of our application, since it serves as one of the primary ways of communicating with end users.
    We also saw the limitations of the traditional manual testing approach and how automating SMS scenarios helps us deliver high-quality products.
    Finally, we demonstrated a feasible, efficient, and easy-to-use way to automate SMS test scenarios using Twilio APIs.
    
    I hope this was a useful read and that you will now be able to easily automate SMS scenarios.
    Happy testing!

  • How to build High-Performance Flutter Apps using Streams

    Performance is a significant factor for any mobile app, and multiple factors such as architecture, logic, and memory management can cause low performance. When we develop an app in Flutter, the initial performance results are very good, but as development progresses, the effects of a bad codebase start showing up. This post is aimed at using an architecture that improves Flutter app performance. We will briefly cover the following points:

    1. What is High-Performance Architecture?

    1.1. Framework

    1.2. Motivation

    1.3. Implementation

    2. Sample Project

    3. Additional benefits

    4. Conclusion

    1. What is High-Performance Architecture?

    This architecture uses streams instead of the variable-based state management approach. Streams are the preferred approach for scenarios in which an app needs data in real time. Even with these benefits, why aren’t streams the first choice for developers? One reason is that streams are considered difficult and complicated, but that reputation is slightly overstated.
    Dart was designed with a reactive style of programming in mind, i.e., architectures built on observable streams, as noted by Flutter’s Director of Engineering, Eric Seidel, in this podcast. [Note: the podcast’s audio has been removed, but the part relevant to this architecture can be heard in Zaiste’s YouTube video.]

    1.1. Framework: 

      Figure 01

    As shown in figure 01, we have 3 main components:

    • Supervisor: The Supervisor wraps the complete application and is responsible for creating the singletons of all managers, as well as providing each manager’s singleton to the screens that require it.
    • Managers: Each manager has its own initialized streams, which any screen can access through the respective singleton. These streams can hold data that we can use anywhere in the application, and because they are streams, any update to this data is reflected everywhere at the same time.
    • Screens: Screens are on the receiving end. Each screen uses local streams for its own operations and, when a global action is required, accesses streams from the managers via their singletons.

    1.2. Motivation:

    Zaiste proposed this idea in 2019 and created a plugin for such an architecture. He named it “Sprinkle Architecture”, and his plugin, called sprinkle, made development easier to a certain level. But as of today, his plugin does not support the null safety features introduced in Dart 2.12.0. You can read more about his implementation here and try his sample with the following command:

    flutter run --no-sound-null-safety

    1.3. Implementation:

    We will be using the get and rxdart plugins in combination to create our high-performance architecture.

    The rxdart plugin will handle stream creation and manipulation, whereas the get plugin helps us with dependency injection, route management, and state management.

    2. Sample Project:

    We will create a sample project to understand how to implement this architecture.

    2.1. Create a project using following command:

    flutter create sprinkle_architecture

    2.2. Add these under the dependencies section of pubspec.yaml (and then run flutter pub get):

    get: ^4.6.5
    rxdart: ^0.27.4

    2.3. Create 3 directories, constants, managers, and views, inside the lib directory:

    2.4. First, we will start with a manager that holds the stream and increments the counter. Create a Dart file named counter_manager.dart under the managers directory:

    import 'package:get/get.dart';
    
    class CounterManager extends GetLifeCycle {
        final RxInt count = RxInt(0);
        int get getCounter => count.value;
        void increment() => count.value = count.value + 1;
    } 

    2.5. With this, we have a working manager. Next, we’ll create a Supervisor that creates a singleton of every available manager; in our case, there is only one. Create a supervisor.dart file in the lib directory:

    import 'package:get/get.dart';
    import 'package:sprinkle_architecture/managers/counter_manager.dart';
    
    abstract class Supervisor {
     static Future<void> init() async {
       _initManagers();
     }
    
     static void _initManagers() {
       Get.lazyPut<CounterManager>(() => CounterManager());
     }
    }

    2.6. This application has only one screen, but it is good practice to create constants related to routing, so let’s add route details. Create a Dart file route_paths.dart under the constants directory:

    abstract class RoutePaths {
      static const String counterPage = '/';
    }

    2.7. And route_pages.dart under constants directory:

    import 'package:get/get.dart';
    import 'package:sprinkle_architecture/constants/route_paths.dart';
    import 'package:sprinkle_architecture/managers/counter_manager.dart';
    import 'package:sprinkle_architecture/views/counter_page.dart';
    
    abstract class RoutePages {
      static final List<GetPage<dynamic>> pages = <GetPage<dynamic>>[
        GetPage<void>(
          name: RoutePaths.counterPage,
          page: () => const CounterPage(title: 'Flutter Demo Home Page'),
          binding: CounterPageBindings(),
        ),
      ];
    }
    
    // Registers CounterManager lazily when the route is visited.
    class CounterPageBindings extends Bindings {
      @override
      void dependencies() => Get.lazyPut<CounterManager>(() => CounterManager());
    }

    2.8. Now we have a routing constant that we can use, but we do not yet have a CounterPage class. Before creating it, let’s update our main file:

    import 'package:flutter/material.dart';
    import 'package:get/get.dart';
    import 'package:sprinkle_architecture/constants/route_pages.dart';
    import 'package:sprinkle_architecture/constants/route_paths.dart';
    import 'package:sprinkle_architecture/supervisor.dart';
    
    void main() {
     WidgetsFlutterBinding.ensureInitialized();
     Supervisor.init();
     runApp(
       GetMaterialApp(
         initialRoute: RoutePaths.counterPage,
         getPages: RoutePages.pages,
       ),
     );
    }

    2.9. Next, add the file counter_page_controller.dart under the views directory:

    import 'package:get/get.dart';
    import 'package:sprinkle_architecture/managers/counter_manager.dart';
    
    class CounterPageController extends GetxController {
     final CounterManager manager = Get.find();
    }

    2.10. Finally, create our landing page counter_page.dart, also under the views directory:

    import 'package:flutter/material.dart';
    import 'package:get/get.dart';
    import 'package:sprinkle_architecture/views/counter_page_controller.dart';
    
    class CounterPage extends GetWidget<CounterPageController> {
     const CounterPage({Key? key, required this.title}) : super(key: key);
     final String title;
    
     CounterPageController get c => Get.put(CounterPageController());
    
     @override
     Widget build(BuildContext context) {
       return Obx(() {
         return Scaffold(
           appBar: AppBar(title: Text(title)),
           body: Center(
             child: Column(
               mainAxisAlignment: MainAxisAlignment.center,
               children: <Widget>[
                 const Text('You have pushed the button this many times:'),
                 Text('${c.manager.getCounter}',
                     style: Theme.of(context).textTheme.headline4),
               ],
             ),
           ),
           floatingActionButton: FloatingActionButton(
             onPressed: c.manager.increment,
             tooltip: 'Increment',
             child: const Icon(Icons.add),
           ),
         );
       });
     }
    }

    2.11. The get plugin allows us to add one controller per screen by extending the GetxController class. In this controller, we can perform operations whose scope is limited to our screen. Here, CounterPageController provides CounterPage with the singleton of CounterManager.

    If everything is done as per the above commands, we will end up with the following tree structure:
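    For reference, here is the lib directory reconstructed from the steps above (your IDE may show other generated files alongside it):

```
lib/
├── constants/
│   ├── route_pages.dart
│   └── route_paths.dart
├── managers/
│   └── counter_manager.dart
├── views/
│   ├── counter_page.dart
│   └── counter_page_controller.dart
├── main.dart
└── supervisor.dart
```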

    2.12. Now we can test our project by running the following command:

    flutter run

    3. Additional Benefits:

    3.1. Self Aware UI:

    As all managers in our application use streams to share data, whenever one screen changes a manager’s data, every other screen that depends on that data also updates itself in real time. This happens through the listen() mechanism of streams.

    3.2. Modularization:

    We can have separate managers for handling REST APIs, preferences, app state info, etc., so modularization happens automatically. UI logic also stays separate from business logic, because we are using the GetxController provided by the get plugin.

    3.3. Small rebuild footprint:

    By default, Flutter rebuilds the whole widget tree to update the UI, but with the get and rxdart plugins, only the dependent widgets rebuild.

    4. Conclusion

    We can achieve good performance of a Flutter app with an appropriate architecture as discussed in this blog. 

  • Creating GraphQL APIs Using Elixir Phoenix and Absinthe

    Introduction

    GraphQL is the new hype in the field of API technologies. We have been building and consuming REST APIs for quite some time now and only recently started hearing about GraphQL. GraphQL is usually described as a frontend-directed API technology, as it allows front-end developers to request data far more simply than before. The objective of this query language is to let client applications describe their data requirements and interactions in an intuitive and flexible format.

    The Phoenix Framework runs on Elixir, which is built on top of Erlang. Elixir’s core strengths are scaling and concurrency. Phoenix is a powerful and productive web framework that does not compromise speed or maintainability, and it comes with built-in support for WebSockets, enabling you to build real-time apps.

    Prerequisites:

    1. Elixir & Erlang: Phoenix is built on top of these.
    2. Phoenix Web Framework: Used for writing the server application. (It’s a well-known, lightweight framework in Elixir.)
    3. Absinthe: A GraphQL library written for Elixir, used for writing queries and mutations.
    4. GraphiQL: A browser-based GraphQL IDE for testing your queries. Think of it as what Postman is to REST APIs.

    Overview:

    The application we will be developing is a simple blog application written using the Phoenix Framework, with two schemas, User and Post, defined in the Accounts and Blog contexts, respectively. We will design the application to support APIs for blog creation and management. We assume you have Erlang, Elixir, and mix installed.

    Where to Start:

    At first, we have to create a Phoenix web application using the following command:

    mix phx.new  --no-brunch --no-html

    • --no-brunch – do not generate Brunch files for static asset building. When choosing this option, you will need to manually handle JavaScript dependencies if building HTML apps.

    • --no-html – do not generate HTML views.

    Note: As we are mostly going to work with the API, we don’t need any web pages or HTML views, hence the --no-brunch and --no-html arguments.

    Dependencies:

    After we create the project, we need to add dependencies in mix.exs to make GraphQL available for the Phoenix application.

    defp deps do
      [
        {:absinthe, "~> 1.3.1"},
        {:absinthe_plug, "~> 1.3.0"},
        {:absinthe_ecto, "~> 0.1.3"}
      ]
    end

    Structuring the Application:

    We can use the following components to design and structure our GraphQL application:

    1. GraphQL schemas: These go inside lib/graphql_web/schema/schema.ex. The schema defines your queries and mutations.
    2. Custom types: Your schema may include some custom types, which should be defined inside lib/graphql_web/schema/types.ex.
    3. Resolvers: We have to write resolver functions that handle the business logic, mapped to their respective query or mutation. Resolvers should be defined in their own files; we define them inside lib/graphql/accounts/user_resolver.ex and lib/graphql/blog/post_resolver.ex.

    Also, we need to update the router in lib/graphql_web/router.ex to be able to make queries using a GraphQL client, and create a GraphQL pipeline (also in lib/graphql_web/router.ex) through which the API requests are routed:

    pipeline :graphql do
      plug Graphql.Context  # custom plug defined in lib/graphql_web/plug/context.ex
    end
    
    scope "/api" do
      pipe_through(:graphql)  #pipeline through which the request have to be routed
    
      forward("/",  Absinthe.Plug, schema: GraphqlWeb.Schema)
      forward("/graphiql", Absinthe.Plug.GraphiQL, schema: GraphqlWeb.Schema)
    end

    Writing GraphQL Queries:

    Let’s write some GraphQL queries, which can be considered equivalent to GET requests in REST. But before getting into queries, let’s take a look at the GraphQL schema we defined and its resolver mappings:

    defmodule GraphqlWeb.Schema do
      use Absinthe.Schema
      import_types(GraphqlWeb.Schema.Types)
    
      query do
        field :blog_posts, list_of(:blog_post) do
          resolve(&Graphql.Blog.PostResolver.all/2)
        end
    
        field :blog_post, type: :blog_post do
          arg(:id, non_null(:id))
          resolve(&Graphql.Blog.PostResolver.find/2)
        end
    
        field :accounts_users, list_of(:accounts_user) do
          resolve(&Graphql.Accounts.UserResolver.all/2)
        end
    
        field :accounts_user, :accounts_user do
          arg(:email, non_null(:string))
          resolve(&Graphql.Accounts.UserResolver.find/2)
        end
      end
    end

    You can see above that we have defined four queries in the schema. Let’s pick one and see what goes into it:

    field :accounts_user, :accounts_user do
      arg(:email, non_null(:string))
      resolve(&Graphql.Accounts.UserResolver.find/2)
    end

    Above, we retrieve a particular user by email address through a GraphQL query.

    1. arg(:email, non_null(:string)): defines a non-null incoming string argument, i.e., the user’s email.
    2. Graphql.Accounts.UserResolver.find/2: the resolver function mapped via the schema, which contains the core business logic for retrieving a user.
    3. :accounts_user: the custom type defined inside lib/graphql_web/schema/types.ex as follows:
    object :accounts_user do
      field(:id, :id)
      field(:name, :string)
      field(:email, :string)
      field(:posts, list_of(:blog_post), resolve: assoc(:blog_posts))
    end

    We need to write a separate resolver function for every query we define. Let’s go over the resolver functions for accounts_user, which live in the lib/graphql/accounts/user_resolver.ex file:

    defmodule Graphql.Accounts.UserResolver do
      alias Graphql.Accounts                    #import lib/graphql/accounts/accounts.ex as Accounts
    
      def all(_args, _info) do
        {:ok, Accounts.list_users()}
      end
    
      def find(%{email: email}, _info) do
        case Accounts.get_user_by_email(email) do
          nil -> {:error, "User email #{email} not found!"}
          user -> {:ok, user}
        end
      end
    end

    These functions list all users or retrieve a particular user by email address. Let’s run a query now using the GraphiQL browser. You need the server running on port 4000; to start the Phoenix server, use:

    mix deps.get #pulls all the dependencies
    mix deps.compile #compile your code
    mix phx.server #starts the phoenix server

    Let’s retrieve a user by email address via a query:
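    For instance, a query of that shape would look like the following in GraphiQL (the email address here is a made-up example; note that Absinthe exposes the snake_case field accounts_user to clients as accountsUser):

```graphql
query {
  accountsUser(email: "jane@example.com") {
    id
    name
    email
  }
}
```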

    Above, we have retrieved the id, email, and name fields by executing the accountsUser query with an email address. GraphQL also allows us to define variables, which we will show later when writing mutations.

    Let’s execute another query to list all blog posts that we have defined:

     Writing GraphQL Mutations:

    Let’s write some GraphQL mutations. If you have understood how GraphQL queries are written, mutations are just as simple: they are defined in the same form as queries, with a resolver function. The mutations we are going to write are as follows:

    1. create_post: create a new blog post
    2. update_post: update an existing blog post
    3. delete_post: delete an existing blog post

    The mutation looks as follows:

    defmodule GraphqlWeb.Schema do
      use Absinthe.Schema
      import_types(GraphqlWeb.Schema.Types)
    
      mutation do
        field :create_post, type: :blog_post do
          arg(:title, non_null(:string))
          arg(:body, non_null(:string))
          arg(:accounts_user_id, non_null(:id))
    
          resolve(&Graphql.Blog.PostResolver.create/2)
        end
    
        field :update_post, type: :blog_post do
          arg(:id, non_null(:id))
          arg(:post, :update_post_params)
    
          resolve(&Graphql.Blog.PostResolver.update/2)
        end
    
        field :delete_post, type: :blog_post do
          arg(:id, non_null(:id))
          resolve(&Graphql.Blog.PostResolver.delete/2)
        end
      end
    end

    Let’s run some mutations to create a post in GraphQL:
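    As a sketch, a createPost mutation document sent from GraphiQL would look roughly like this (the title, body, and user id values are made-up examples):

```graphql
mutation {
  createPost(title: "Hello Absinthe", body: "My first post", accountsUserId: 1) {
    id
    title
  }
}
```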

    Notice that the HTTP method here is POST, not GET.

    Let’s dig into the update mutation:

    field :update_post, type: :blog_post do
      arg(:id, non_null(:id))
      arg(:post, :update_post_params)
    
      resolve(&Graphql.Blog.PostResolver.update/2)
    end

    Here, update_post takes two arguments as input: a non-null id, and a post parameter of type update_post_params that holds the input values to update. The mutation is defined in lib/graphql_web/schema/schema.ex, while the input type is defined in lib/graphql_web/schema/types.ex:

    input_object :update_post_params do
      field(:title, :string)
      field(:body, :string)
      field(:accounts_user_id, :id)
    end

    The difference from the previous type definitions is that it is defined as an input_object instead of an object.

    The corresponding resolver function is defined as follows :

    def update(%{id: id, post: post_params}, info) do
      case find(%{id: id}, info) do
        {:ok, post} -> post |> Blog.update_post(post_params)
        {:error, _} -> {:error, "Post id #{id} not found"}
      end
    end

    Here we have defined a query parameter to specify the id of the blog post to be updated.

    Conclusion

    This is all you need to write a basic GraphQL server for any Phoenix application using Absinthe.

  • Getting Started With Kubernetes Operators (Ansible Based) – Part 2

    Introduction

    In the first part of this blog series, Getting Started with Kubernetes Operators (Helm Based), we learned the basics of operators and built a Helm-based operator. In this blog post, we will try out an Ansible-based operator. Ansible is a very popular tool used by organizations across the globe for configuration management, deployment, and automation of other operational tasks. This makes Ansible an ideal tool for building operators, since with operators we also intend to eliminate or minimize the manual intervention required while running and managing our applications on Kubernetes. Ansible-based operators allow us to use Ansible playbooks and roles to manage our application on Kubernetes.

    Operator Maturity Model

    Image source: Github

    Before we start building the operator, let’s spend some time understanding the operator maturity model. The operator maturity model gives an idea of the kinds of application management capabilities the different types of operators can have. As we can see in the diagram above, the model describes five generic phases of maturity/capability for operators. The minimum expectation from an operator is that it should be able to deploy/install and upgrade an application, and that is provided by all operator types. Helm-based operators are the simplest of all, as Helm is a chart manager and we can only do installs and upgrades with it. Ansible-based operators can be more mature, because Ansible has modules to perform a wide variety of operational tasks; we can use these modules in the Ansible roles/playbooks our operator runs and make them handle more complex applications or use cases. In the case of Golang-based operators, we write the operational logic ourselves, so we have the liberty to customize it as per our requirements.

    Building an Ansible Based Operator

    1. Let’s first install the operator SDK:

    go get -d github.com/operator-framework/operator-sdk
    cd $GOPATH/src/github.com/operator-framework/operator-sdk
    git checkout master
    make dep
    make install

    Now we will have the operator-sdk binary in the $GOPATH/bin folder.

    2. Set up the project:

    operator-sdk new bookstore-operator --api-version=blog.velotio.com/v1alpha1 --kind=BookStore --type=ansible

    In the above command, we have set the operator type to ansible, as we want an Ansible-based operator. It creates a folder structure as shown below:

    bookstore-operator/
    |- build/        # Contains the Dockerfile to build the operator image.
    |- deploy/       # Contains the CRD, CR, and manifest files for deploying the operator.
    |- roles/        # Contains the Ansible role the operator executes.
    |- molecule/     # Molecule is used for testing the Ansible roles.
    |- watches.yaml  # Specifies the resource the operator watches (maintains the state of).

    Inside the roles folder, it creates an Ansible role named `bookstore`. This role is bootstrapped with all the directories and files that are part of a standard Ansible role.

    Now let’s take a look at the watches.yaml file:

    ---
    - version: v1alpha1
      group: blog.velotio.com
      kind: BookStore
      role: /opt/ansible/roles/bookstore

    Here we can see that the operator is going to watch events related to objects of the BookStore kind and execute the Ansible role bookstore. Drawing parallels with our Helm-based operator, the behavior in both cases is similar; the only difference is that the Helm-based operator executed the specified Helm chart in response to events on the object it was watching, whereas here we execute an Ansible role.

    With Ansible-based operators, we can also have the operator execute an Ansible playbook rather than a role.
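    For example, the watches.yaml entry can point at a playbook instead of a role (the playbook path below is an assumption; it should mirror wherever your Dockerfile copies the playbook inside the operator image):

```yaml
---
- version: v1alpha1
  group: blog.velotio.com
  kind: BookStore
  # Hypothetical path inside the operator image.
  playbook: /opt/ansible/playbook.yml
```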

    3. Building the bookstore Ansible role

    Now we need to modify the bookstore Ansible roles created for us by the operator-framework.

    First, we will update the custom resource (CR) file (blog_v1alpha1_bookstore_cr.yaml) available at the deploy/crds/ location. In this CR, we can configure all the values we want to pass to the bookstore Ansible role. By default, the CR contains only the size field; we will update it to include the other fields we need in our role. To keep things simple, we will just include some basic variables like image name, tag, etc. in our spec.

    apiVersion: blog.velotio.com/v1alpha1
    kind: BookStore
    metadata:
      name: my-bookstore
    spec:
      image:
        app:
          repository: akash125/pyapp
          tag: latest
          pullPolicy: IfNotPresent
        mongodb:
          repository: mongo
          tag: latest
          pullPolicy: IfNotPresent
        
      service:
        app:
          type: LoadBalancer
        mongodb:
          type: ClusterIP

    The Ansible operator passes the key-value pairs listed in the spec of the CR as variables to Ansible. The operator converts the variable names to snake_case before running Ansible, so when we use the variables in our role, we refer to them in snake case (e.g., pullPolicy becomes pull_policy).
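    The renaming can be illustrated with a small Python helper (illustrative only; the actual conversion is performed by the Ansible operator itself):

```python
import re

def to_snake_case(name):
    # Insert an underscore before each capital letter (except at the start),
    # then lowercase everything: "pullPolicy" -> "pull_policy".
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

# A fragment of the CR spec above, as the operator would see it.
spec_fragment = {"repository": "mongo", "pullPolicy": "IfNotPresent"}
ansible_vars = {to_snake_case(key): value for key, value in spec_fragment.items()}
print(ansible_vars)  # {'repository': 'mongo', 'pull_policy': 'IfNotPresent'}
```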

    Next, we need to create the tasks the bookstore role will execute, updating them to define our deployment. By default, an Ansible role executes the tasks defined in tasks/main.yml. For defining our deployment, we will leverage the k8s module of Ansible, creating a Kubernetes Deployment and Service for our app as well as for MongoDB.

    ---
    # tasks file for bookstore
    
    - name: Create the mongodb deployment
      k8s:
        definition:
          kind: Deployment
          apiVersion: apps/v1beta1
          metadata:
            name: mongodb-deployment
            namespace: '{{ meta.namespace }}'
          spec:
            replicas: 1
            selector:
              matchLabels:
                app: book-store-mongodb
            template:
              metadata:
                labels:
                  app: book-store-mongodb
              spec:
                containers:
                - name: mongodb
                  image: "{{image.mongodb.repository}}:{{image.mongodb.tag}}"
                  imagePullPolicy: "{{ image.mongodb.pull_policy }}"
                  ports:
                  - containerPort: 27017
    
    - name: Create the mongodb service
      k8s:
        definition:
          apiVersion: v1
          kind: Service
          metadata:
            name: mongodb-service
            namespace: '{{ meta.namespace }}'
            labels:
              app: book-store-mongodb
          spec:
            type: "{{service.mongodb.type}}"
            ports:
            - name: elb-port
              port: 27017
              protocol: TCP
              targetPort: 27017
            selector:
              app: book-store-mongodb

    - name: Create the bookstore deployment
      k8s:
        definition: 
          kind: Deployment
          apiVersion: apps/v1beta1
          metadata:
            name: book-store
            namespace: '{{ meta.namespace }}'
          spec:
            replicas: 1
            selector:
              matchLabels:
                app: book-store
            template:
              metadata:
                labels:
                  app: book-store
              spec:
                containers:
                - name: book-store
                  image: "{{image.app.repository}}:{{image.app.tag}}"
                  imagePullPolicy: "{{image.app.pull_policy}}"         
                  ports:
                  - containerPort: 3000
    
    - name: Create the bookstore service
      k8s:
        definition:
          apiVersion: v1
          kind: Service
          metadata:
            name: book-store
            namespace: '{{ meta.namespace }}'
            labels:
              app: book-store
          spec:
            type: "{{service.app.type}}"
            ports:
            - name: elb-port
              port: 80
              protocol: TCP
              targetPort: 3000
            selector:
              app: book-store

    In the above file, we can see that the pullPolicy field defined in our CR spec is used as pull_policy in our tasks. Here we have used inline definitions to create our k8s objects, as our app is quite simple. For large applications, creating objects from separate definition files is a better approach.
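    As a sketch of the separate-file approach, the same k8s module call can load a definition from a file shipped with the role (the path here is an assumption):

```yaml
- name: Create the bookstore deployment from a definition file
  k8s:
    state: present
    # Hypothetical file under the role's files/ directory.
    src: /opt/ansible/roles/bookstore/files/bookstore-deployment.yml
```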

    4. Build the bookstore-operator image

    The Dockerfile for building the operator image is already in our build folder; we need to run the below command from the root folder of our operator project to build the image.

    operator-sdk build akash125/bookstore-operator:ansible

    You can use your own Docker repository instead of akash125/bookstore-operator.

    5. Run the bookstore-operator

    As we have our operator image ready, we can now go ahead and run it. The deployment file for the operator (operator.yaml under the deploy folder) was created as part of our project setup; we just need to set the image for this deployment to the one we built in the previous step.

    After updating the image in the operator.yaml we are ready to deploy the operator.

    kubectl create -f deploy/service_account.yaml
    kubectl create -f deploy/role.yaml
    kubectl create -f deploy/role_binding.yaml
    kubectl create -f deploy/operator.yaml

    Note: The role created might have more permissions than actually required for the operator, so it is always a good idea to review it and trim down the permissions in production setups.

    Verify that the operator pod is in running state.

    Here, two containers have been started as part of the operator deployment: one is the operator and the other is ansible. The ansible container exists only to make the logs available on stdout in Ansible format.

    6. Deploy the bookstore app

    Now we have the bookstore-operator running in our cluster we just need to create the custom resource for deploying our bookstore app.

    Before we can create the bookstore CR, we need to register its CRD:

    kubectl create -f deploy/crds/blog_v1alpha1_bookstore_crd.yaml

    Now we can create the bookstore object:

    kubectl create -f deploy/crds/blog_v1alpha1_bookstore_cr.yaml

    Now we can see that our operator has deployed our book-store app:

    Now let’s grab the external IP of the app and make some requests to store details of books.

    Let’s hit the external IP on the browser and see if it lists the books we just stored:

    We can see that our ‘book-store’ app is up and running.

    The operator build is available here.

    Conclusion

    In this blog post, we learned how to create an Ansible-based operator using the Operator Framework. Ansible-based operators are a great way to combine the power of Ansible and Kubernetes, as they allow us to deploy our applications using Ansible roles and playbooks and to pass parameters to them (i.e., control them) using custom Kubernetes resources. If Ansible is heavily used across your organization and you are migrating to Kubernetes, then Ansible-based operators are an ideal choice for managing deployments. In the next blog, we will learn about Golang-based operators.

  • Kubernetes Migration: How To Move Data Freely Across Clusters

    This blog focuses on migrating Kubernetes clusters from one cloud provider to another. We will be migrating our entire data from Google Kubernetes Engine to Azure Kubernetes Service using Velero.

    Prerequisite

    • A Kubernetes cluster > 1.10

    Setup Velero with Restic Integration

    Velero consists of a client installed on your local computer and a server that runs in your Kubernetes cluster, much like Helm.

    Installing Velero Client

    You can find the latest release corresponding to your OS and architecture and download Velero from there:

    $ wget https://github.com/vmware-tanzu/velero/releases/download/v1.3.1/velero-v1.3.1-linux-amd64.tar.gz

    Extract the tarball (change the version and platform to match yours) and move the Velero binary to /usr/local/bin:

    $ tar -xvzf velero-v1.3.1-linux-amd64.tar.gz
    $ sudo mv velero-v1.3.1-linux-amd64/velero /usr/local/bin/
    $ velero help

    Create a Bucket for Velero on GCP

    Velero needs an object storage bucket where it will store the backup. Create a GCS bucket using:

    gsutil mb gs://<bucket-name>

    Create a Service Account for Velero

    # Create a Service Account
    gcloud iam service-accounts create velero --display-name "Velero service account"
    SERVICE_ACCOUNT_EMAIL=$(gcloud iam service-accounts list --filter="displayName:Velero service account" --format 'value(email)')
    
    #Define Permissions for the Service Account
    ROLE_PERMISSIONS=(
    compute.disks.get
    compute.disks.create
    compute.disks.createSnapshot
    compute.snapshots.get
    compute.snapshots.create
    compute.snapshots.useReadOnly
    compute.snapshots.delete
    compute.zones.get
    )
    
    # Create a Role for Velero
    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud iam roles create velero.server \
      --project $PROJECT_ID \
      --title "Velero Server" \
      --permissions "$(IFS=","; echo "${ROLE_PERMISSIONS[*]}")"
    
    # Create a Role Binding for Velero
    gcloud projects add-iam-policy-binding $PROJECT_ID \
      --member serviceAccount:$SERVICE_ACCOUNT_EMAIL \
      --role projects/$PROJECT_ID/roles/velero.server
    
    # Grant the Service Account access to the bucket
    gsutil iam ch serviceAccount:$SERVICE_ACCOUNT_EMAIL:objectAdmin gs://<bucket-name>
    
    # Generate Service Key file for Velero and save it for later
    gcloud iam service-accounts keys create credentials-velero \
      --iam-account $SERVICE_ACCOUNT_EMAIL

    Install Velero Server on GKE and AKS

    Use the --use-restic flag on the velero install command to enable the restic integration.

    $ velero install \
      --use-restic \
      --bucket <bucket-name> \
      --provider gcp \
      --secret-file ./credentials-velero \
      --use-volume-snapshots=false
    $ velero plugin add velero/velero-plugin-for-gcp:v1.0.1
    $ velero plugin add velero/velero-plugin-for-microsoft-azure:v1.0.0

    After that, you can see a restic DaemonSet and a Velero Deployment in your Kubernetes cluster:

    $ kubectl get po -n velero

    Restic Components

    In addition, there are three more Custom Resource Definitions and their associated controllers to provide restic support.

    Restic Repository

    • Maintains the complete lifecycle of Velero’s restic repositories.
    • Restic lifecycle commands such as restic init, check, and prune are handled by this CRD’s controller.

    PodVolumeBackup

    • This CRD backs up persistent volumes based on the annotated pods in the selected namespaces.
    • Its controller executes backup commands on the pods to initialize backups.

    PodVolumeRestore

    • This controller restores the pod volumes that were included in restic backups, and it is responsible for executing the restore commands.

    Backup an application on GKE

    For this blog post, we assume the Kubernetes cluster already runs an application that uses persistent volumes. Alternatively, you can install WordPress as an example, as explained here.

    We will perform GKE Persistent disk migration to Azure Persistent Disk using Velero. 

    Follow the below steps:

    1. To back up a particular persistent volume, check the Deployment or StatefulSet for the name of the volume that mounts it. For example, here the pod’s volume is named “data”:
    volumes:
      - name: data
        persistentVolumeClaim:
          claimName: mongodb

    2. Annotate the pods with the volume names you’d like to back up; only those volumes will be backed up: 
    $ kubectl -n NAMESPACE annotate pod/POD_NAME backup.velero.io/backup-volumes=VOLUME_NAME1,VOLUME_NAME2

    For example, 

    $ kubectl -n application annotate pod/wordpress-pod backup.velero.io/backup-volumes=data

    3. Take a backup of the namespace in which the application is running. You can also specify multiple namespaces, or skip this flag to back up all namespaces by default. We are going to back up only one namespace in this blog.
    $ velero backup create testbackup --include-namespaces application

    4. Monitor the progress of the backup:
    $ velero backup describe testbackup --details

       

    Once the backup is complete, you can list it using:

    $ velero backup get

    You can also check the backup on GCP Portal under Storage.
    Select the bucket you created and you should see a similar directory structure:

    Restore the application to AKS

    Follow the below steps to restore the backup:

    1. Make sure the same StorageClass used by the GKE Persistent Volumes is available in Azure. For example, if the StorageClass of the PVs is “persistent-ssd”, create the same on AKS using the below template:
    kind: StorageClass
    apiVersion: storage.k8s.io/v1
    metadata:
      name: persistent-ssd # same name as the GKE StorageClass
    provisioner: kubernetes.io/azure-disk
    parameters:
      storageaccounttype: Premium_LRS
      kind: Managed

    2. Run the Velero restore:
    $ velero restore create testrestore --from-backup testbackup

    You can monitor the progress of restore:

    $ velero restore describe testrestore --details

    You can also check on GCP Portal, a new folder “restores” is created under the bucket.

    In some time, you should be able to see that the application namespace is back and WordPress and MySQL pods are running again.

    Troubleshooting

    For any errors/issues related to Velero, you may find below commands helpful for debugging purposes:

    # Describe the backup to see the status
    $ velero backup describe testbackup --details
    
    # Check backup logs, and look for errors if any
    $ velero backup logs testbackup
    
    # Describe the restore to see the status
    $ velero restore describe testrestore --details
    
    # Check restore logs, and look for errors if any
    $ velero restore logs testrestore
    
    # Check velero and restic pod logs, and look for errors if any
    $ kubectl -n velero logs VELERO_POD_NAME/RESTIC_POD_NAME
    # NOTE: You can change the default log level to debug by adding --log-level=debug
    # as an argument to the container command in the Velero pod template spec.
    
    # Describe the BackupStorageLocation resource and look for any errors in Events
    $ kubectl describe BackupStorageLocation default -n velero

    Conclusion

    Migrating persistent workloads across Kubernetes clusters on different cloud providers is difficult. The restic integration in the Velero backup tool makes it possible, although the integration is still described as beta quality on the official site. I performed a GKE-to-AKS migration and it went successfully; you can try other combinations of cloud providers for migrations.

    The only drawback of using Velero to migrate data is that, if your dataset is very large, the migration can take a while; it took me almost a day to migrate a 350 GB disk from GKE to AKS. But if your data is comparatively small, this is a very efficient and hassle-free way to migrate it.

  • Parallelizing Heavy Read and Write Queries to SQL Datastores using Spark and more!

    The amount of data in our world has been exploding day by day, and processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified distributed data processing engine for Big Data. It lets you process data faster by splitting the work into chunks and assigning those chunks to computation resources across nodes. It can handle up to petabytes of data, and it processes its data in memory, which makes it fast.

    We talked about processing Big Data in Spark, but Spark does not store any data the way file systems do. So, to process data in Spark, we must read it from a data source, clean or process it, and store the result in a target data source. Data sources can be files, APIs, databases, or streams.

    Database management systems have been around for decades. Many applications generate huge amounts of data and store it in database management systems, and we often need to connect Spark to such a database to process that data.

    In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on different methods of reading and writing, which will help us move terabytes of data efficiently.

    Reading / Writing data from/to Database using Spark:

    To read data or write data from/to the database, we will need to perform a few basic steps regardless of any programming language or framework we are using. What follows is an overview of the steps to read data from databases.

    Step 1: Register Driver or Use Connector

    Get the respective driver of your database and register the driver, or use the connector to connect to the database.

    Step 2: Make a connection

    Next, the driver or connector makes a connection to the database.

    Step 3: Run query statement

    Using the connection created in the previous step, execute the query, which will return the result.

    Step 4: Process result

    Process the result from the previous step as per your requirement.

    Step 5: Close the connection
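    The five steps above can be sketched in plain Python. The snippet below is only an illustration: it uses the standard-library sqlite3 driver (so it needs no running server), and the covid_data table and its rows are made up for the example; with psycopg2 or a JDBC driver, the calls look almost identical.

```python
import sqlite3

# Steps 1 & 2: the sqlite3 driver ships with Python; connect() makes the connection.
conn = sqlite3.connect(":memory:")

# A throwaway table so the example is self-contained.
conn.execute("CREATE TABLE covid_data (state TEXT, confirmed INTEGER)")
conn.executemany("INSERT INTO covid_data VALUES (?, ?)",
                 [("Uttarakhand", 3259), ("West Bengal", 7705)])

# Step 3: run the query statement.
cursor = conn.execute("SELECT state, confirmed FROM covid_data ORDER BY confirmed")

# Step 4: process the result (here we just fetch it).
rows = cursor.fetchall()
print(rows)  # [('Uttarakhand', 3259), ('West Bengal', 7705)]

# Step 5: close the connection.
conn.close()
```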

    Dataset we are using:

    Covid data

    This dataset contains details of COVID patients across all states. It has different information such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.

    You can load this dataset in any of the databases you work with and can try out the entire discussion practically.

    The following image shows ten records of the entire dataset.

    Single-partition Spark program:

    from pyspark.sql import SparkSession
    
    ## Creating a spark session and adding the Postgres driver to Spark.
    spark_session = SparkSession.builder \
        .master("local") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()
    
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "aniket"
    username = "postgres"
    password = "pass@123"
    
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)
    
    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "aniket") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .load()
    
    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "spark_schema.zipcode_table") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .save()

    Spark provides an API to read data from a database, and it is very simple to use. First, we create a Spark session and add the driver to Spark; the driver can be added through the program itself, or via the shell.

    The first line of code imports the SparkSession class, the entry point to programming Spark with the Dataset and DataFrame API.

    Next, we create a Spark session on the local system, which will be used for interaction with our Spark application. We specify the name for our application using appName(), which in our case is ‘Databases’; this app name will be shown on the Web UI for our cluster. We can then specify any configuration for the Spark application using config(). In our case, we have specified the driver package for the Postgres database, which will be used to create a connection to Postgres. You can specify the driver of any of the available databases.

    To connect to the database, we must have a hostname, port, database name, username, and password. Those details come right after the session setup in the code above.

    Next comes the read. With our Spark session and all the connection details in place, we read the data from the database using the Spark read API. This creates a connection to the Postgres database from one of the cores allocated to the Spark application and reads the data into the table_data_df dataframe. Even if we have multiple cores for our application, it will still create only one connection from one of the cores; the rest of the cores will not be utilized. We will discuss how to utilize all the cores shortly.

    Finally, the write. Now that we have the data, let’s write it back to the database using the Spark write API. This also creates only one connection to the database from one of the cores allocated to the Spark application, even if more cores are available.

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 113ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/5ms)
    22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    Multiple-partition Spark program:

    from pyspark.sql import SparkSession
    
    ## Creating a spark session and adding Postgres Driver to spark.
    spark_session = SparkSession.builder \
        .master("local[4]") \
        .appName("Databases") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8") \
        .getOrCreate()
    
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    
    jdbc_url = "jdbc:postgresql://{0}:{1}/{2}".format(hostname, jdbc_port, dbname)
    
    partition_column = 'date'
    lower_bound = '2021-02-20'
    upper_bound = '2021-02-28'
    num_partitions = 4
    
    ## reading data
    table_data_df = spark_session.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("partitionColumn", partition_column) \
        .option("lowerBound", lower_bound) \
        .option("upperBound", upper_bound) \
        .option("numPartitions", num_partitions) \
        .load()
    
    table_data_df.show()
    
    ## writing data
    table_data_df.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "covid_data_output") \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("numPartitions", num_partitions) \
        .save()

    As promised in the last section, let’s discuss how to optimize resource utilization. Previously we had only one connection, using very limited resources and leaving the rest idle. To get over this, the Spark read and write APIs accept a few extra options: partitionColumn, lowerBound, and upperBound. These options must all be specified if any of them is specified, and numPartitions must be specified as well. They describe how to partition the table when reading in parallel from multiple workers. Each partition gets an individual core with its own connection performing the reads or writes, thus making the database operations parallel.

    This is a much more efficient way of reading and writing data from databases in Spark than doing it with a single partition.

    Partitions are decided by the Spark API in the following way.

    Let’s consider an example where:

    lowerBound: 0

    upperBound: 1000

    numPartitions: 10

    The stride is (1000 - 0) / 10 = 100, and the partitions correspond to queries like:

    SELECT * FROM table WHERE partitionColumn < 100

    SELECT * FROM table WHERE partitionColumn >= 100 AND partitionColumn < 200

    ...

    SELECT * FROM table WHERE partitionColumn >= 900

    Each range is inclusive on its lower bound and exclusive on its upper bound, and the first and last partitions also pick up any rows below the lower bound or above the upper bound, so no rows are lost.

    Now we have data in multiple partitions. Each executor can have one or more partitions based on cluster configuration. Suppose we have 10 cores and 10 partitions. One partition of data can be fetched from one executor using one core. So, 10 partitions of data can be fetched from 10 executors. Each of these executors will create the connection to the database and will read the data.

    Note: lowerBound and upperBound do not filter the data; they just help Spark decide the stride. Also, partitionColumn must be a numeric, date, or timestamp column from the table.
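    To make the stride arithmetic concrete, here is a small, hypothetical helper that reproduces (approximately) the per-partition WHERE clauses described above. The exact SQL Spark generates can vary slightly by version, so treat this as a sketch rather than Spark’s actual implementation:

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Approximate the WHERE clauses Spark's JDBC reader builds from
    lowerBound/upperBound/numPartitions (a sketch, not Spark's real code)."""
    stride = (upper - lower) // num_partitions
    preds = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lo + stride
        if i == 0:
            # First partition is unbounded below (and catches NULLs),
            # so rows under lowerBound are not lost.
            preds.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last partition is unbounded above.
            preds.append(f"{column} >= {lo}")
        else:
            preds.append(f"{column} >= {lo} AND {column} < {hi}")
    return preds

for pred in partition_predicates("id", 0, 1000, 10):
    print(pred)
```

Running it with the example values from above yields ten predicates, from "id < 100 OR id IS NULL" up to "id >= 900".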

    There are also some options that can optimize the write operation. One is batchsize, the JDBC batch size, which determines how many rows to insert per round trip; it can help the performance of JDBC drivers and applies only to writing. Another helpful option is truncate, a JDBC writer-related option: when SaveMode.Overwrite is enabled, it causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.

    Output of Program:

    /usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    :: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
    The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
    org.postgresql#postgresql added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
    	confs: [default]
    	found org.postgresql#postgresql;42.2.8 in central
    :: resolution report :: resolve 104ms :: artifacts dl 3ms
    	:: modules in use:
    	org.postgresql#postgresql;42.2.8 from central in [default]
    	---------------------------------------------------------------------
    	|                  |            modules            ||   artifacts   |
    	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    	---------------------------------------------------------------------
    	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    	---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
    	confs: [default]
    	0 artifacts copied, 1 already retrieved (0kB/4ms)
    22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |        state|         district|confirmed|recovered|deceased|other|tested|      date|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    |Uttar Pradesh|         Varanasi|    23512|    23010|     456|    0|595510|2021-02-24|
    |  Uttarakhand|           Almora|     3259|     3081|      25|  127| 84443|2021-02-24|
    |  Uttarakhand|        Bageshwar|     1534|     1488|      17|   26| 55626|2021-02-24|
    |  Uttarakhand|          Chamoli|     3486|     3373|      15|   88| 90390|2021-02-24|
    |  Uttarakhand|        Champawat|     1819|     1790|       9|    7| 95068|2021-02-24|
    |  Uttarakhand|         Dehradun|    29619|    28152|     962|  439|401496|2021-02-24|
    |  Uttarakhand|         Haridwar|    14137|    13697|     158|  175|369542|2021-02-24|
    |  Uttarakhand|         Nainital|    12636|    12254|     237|   79|204422|2021-02-24|
    |  Uttarakhand|    Pauri Garhwal|     5145|     5033|      60|   24|138878|2021-02-24|
    |  Uttarakhand|      Pithoragarh|     3361|     3291|      47|   11| 72686|2021-02-24|
    |  Uttarakhand|      Rudraprayag|     2270|     2251|      10|    7| 52378|2021-02-24|
    |  Uttarakhand|    Tehri Garhwal|     4227|     4026|      16|  170|105111|2021-02-24|
    |  Uttarakhand|Udham Singh Nagar|    11538|    11267|     117|  123|337292|2021-02-24|
    |  Uttarakhand|       Uttarkashi|     3789|     3645|      17|  118|120026|2021-02-24|
    |  West Bengal|       Alipurduar|     7705|     7616|      86|    0|  null|2021-02-24|
    |  West Bengal|          Bankura|    11940|    11788|      92|    0|  null|2021-02-24|
    |  West Bengal|          Birbhum|    10035|     9876|      89|    0|  null|2021-02-24|
    |  West Bengal|      Cooch Behar|    11835|    11756|      72|    0|  null|2021-02-24|
    |  West Bengal| Dakshin Dinajpur|     8179|     8099|      74|    0|  null|2021-02-24|
    |  West Bengal|       Darjeeling|    18423|    18155|     203|    0|  null|2021-02-24|
    +-------------+-----------------+---------+---------+--------+-----+------+----------+
    only showing top 20 rows
    
    
    Process finished with exit code 0

    We have seen how to read and write data in Spark. But Spark is not the only way to connect to databases, right? There are multiple ways to access databases and achieve parallel reads and writes. We will discuss these in the following sections, mainly focusing on reading and writing from Python.

    Single-thread Python program: 

    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username,password=password)
        data = pd.DataFrame(db_client.read(query))
        print(data)

    To integrate Postgres with Python, we have different libraries or adapters that we can use, but Psycopg is the most widely used. First of all, you will need to install the psycopg2 library, a slightly updated version of the Psycopg adapter. You can install it using pip or any other way you are comfortable with.

    To connect with the Postgres database, we need a hostname, port, database name, username, and password. We store all these details as attributes of the class. The create_conn method forms a connection with the Postgres database using the connect() method of the psycopg2 module and returns a connection object.
    In the read method, we call create_conn to get a connection object and use it to create a cursor. The cursor keeps its connection to the database for its lifetime and executes all commands or queries against it. Using this cursor, we execute a read query on the database; the data it returns can then be fetched with the fetchall() method. Finally, we close the cursor and the connection.

    To run the program, we specify the database details and the query, create a PostgresDbClient object, and call its read method. The read method returns the data, which we convert into relational format using pandas.

    This implementation is very straightforward: the program creates one process in our system and fetches all the data using that process’s resources (CPU, memory, etc.). The drawback of this approach is that if the program uses, say, 30 percent of the available CPU and memory, the remaining 70 percent sits idle. We can maximize this usage by other means, like multithreading or multiprocessing.

    Output of Program:

    Connecting to the Postgres database...
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
                                  0              1    2   3  4  5     6           7
    0   Andaman and Nicobar Islands        Unknown   33  11  0  0  None  2020-04-26
    1                Andhra Pradesh      Anantapur   53  14  4  0  None  2020-04-26
    2                Andhra Pradesh       Chittoor   73  13  0  0  None  2020-04-26
    3                Andhra Pradesh  East Godavari   39  12  0  0  None  2020-04-26
    4                Andhra Pradesh         Guntur  214  29  8  0  None  2020-04-26
    ..                          ...            ...  ...  .. .. ..   ...         ...
    95                        Bihar         Araria    1   0  0  0  None  2020-04-30
    96                        Bihar          Arwal    4   0  0  0  None  2020-04-30
    97                        Bihar     Aurangabad    8   0  0  0  None  2020-04-30
    98                        Bihar          Banka    3   0  0  0  None  2020-04-30
    99                        Bihar      Begusarai   11   5  0  0  None  2020-04-30
    
    [100 rows x 8 columns]
    
    Process finished with exit code 0

    Multi-thread Python program:

    In the previous section, we discussed the drawback of a single process and single-thread implementation. Let’s get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.

    What is a process?

    When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.

    What is a thread?

    A thread is a sequential flow of execution. A process is also a thread; usually, it is called the main thread. Unlike separate processes, multiple threads can share the same computing and memory resources.

    What is multithreading?

    This is when a process has multiple threads, along with the main thread, and these threads run independently but concurrently using the same computing and memory resources associated with the process. Such a program is called a multithreaded program or process. Multithreading uses resources very efficiently, which results in maximizing performance.

    What is multiprocessing?

    When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is achieved with multiple processors running separate processes on each processor. 

    Let’s get back to our program. Here we have a connection method and a read method, exactly the same as in the previous section. We also have one new function, get_thread(). Be careful: a method belongs to a class, while a function does not. So the get_thread() function is global and acts as a wrapper for calling the read method of the PostgresDbClient class, since we don’t create the threads directly from class methods here. Don’t get confused if this seems odd; it is just how we write the code.

    To run the program, we have specified the Postgres database details and queries. In the previous approach, we fetched all the data from the table with one thread only. In this approach, the plan is to fetch one day of data using one thread so that we can maximize resource utilization. Here, each query reads one day’s worth of data from the table using one thread. Having 5 queries will fetch 5 days of data, and 5 threads will be running concurrently.

    To create a thread in Python, we use the Thread() constructor from the threading library, passing the function we want to run and its arguments. Thread() creates a new thread object; the thread has been created but not yet started. To start it, we use the start() method. In our program, we start 5 threads. If you execute this program multiple times, you will end up with different output orders: some data will be fetched earlier and some later, and on the next execution this order will differ again. This is because thread scheduling is handled by the operating system; depending on which thread the OS gives resources to and when, the output order changes. If you want to know how this is decided, you will need to go deep into operating systems concepts.
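    Here is a stripped-down, self-contained sketch of that thread-per-day pattern. The database client is replaced by a stand-in function (fetch_one_day) and the query strings are illustrative, so the snippet runs without a database:

```python
import threading

results = {}  # each thread writes to its own key, so no lock is needed here

def fetch_one_day(thread_id, query):
    # Stand-in for db_client.read(query); a real implementation would hit the DB.
    results[thread_id] = f"rows for: {query}"

# One illustrative query per day, as in the approach described above.
queries = [f"SELECT * FROM covid_data WHERE date = '2021-02-2{d}'" for d in range(5)]

# Create one thread per daily query, then start them all...
threads = [threading.Thread(target=fetch_one_day, args=(i, q))
           for i, q in enumerate(queries)]
for t in threads:
    t.start()
# ...and wait for all five to finish before using the results.
for t in threads:
    t.join()

print(len(results))  # 5
```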

    In our use case, we are just printing the data to the console. There are multiple ways to store it instead. One simple way is to define a global variable and store the results in it, but then we need synchronization, because multiple threads accessing the same global variable can lead to race conditions. Another way is to subclass Thread and save the data in a variable of your custom class. Again, here too, you will need to make sure you are achieving synchronization.
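    As a minimal sketch of the global-variable option (the worker and its data are illustrative, not part of the program above), a threading.Lock can guard the shared list:

```python
import threading

results = []                     # shared between all threads
results_lock = threading.Lock()

def worker(thread_id, rows):
    # pretend `rows` came from db_client.read(query)
    with results_lock:           # only one thread appends at a time
        results.append((thread_id, rows))

threads = [threading.Thread(target=worker, args=(i, [i * 10])) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # all five results, in whatever order the OS scheduled the threads
```

    Note that only the append itself is serialized; the reads can still run in parallel, so the lock here costs far less than fully sequential execution.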

    So, whichever of these methods you use to store the data in a variable, you will need synchronization, and synchronization forces parts of the threads to execute sequentially. That sequential processing is not what we are looking for.
    To avoid synchronization, we can write the data directly to the target: the same thread that reads the data writes it back to the target database. This way, we avoid shared state and still store the data in the database for future use. The function can look as below, where db_client.write(data) is a function that writes the data to a database.

    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        db_client.write(data)
        print(f"Stopping thread id {thread_id}")

    Python Program:

    import threading
    import traceback
    import psycopg2
    import pandas as pd
    
    class PostgresDbClient:
        def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
            self.db_host = postgres_hostname
            self.db_port = postgres_jdbcport
            self.db_name = postgres_dbname
            self.db_user = username
            self.db_pass = password
    
        def create_conn(self):
            conn = None
            try:
                print('Connecting to the Postgres database...')
                conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
                print('Successfully connected to the Postgres database...')
            except Exception as e:
                print("Cannot connect to Postgres.")
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
            return conn
    
        def read(self, query):
            try:
                conn = self.create_conn()
                cursor = conn.cursor()
                print(f"Reading data !!!")
                cursor.execute(query)
                data = cursor.fetchall()
                print(f"Read Data !!!")
                cursor.close()
                conn.close()
                return data
            except Exception as e:
                print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
    
    
    def get_thread(thread_id, db_client, query):
        print(f"Starting thread id {thread_id}")
        data = pd.DataFrame(db_client.read(query))
        print(f"Thread {thread_id} data ", data, sep="\n")
        print(f"Stopping thread id {thread_id}")
    
    
    if __name__ == "__main__":
        hostname = "localhost"
        jdbc_port = 5432
        dbname = "postgres"
        username = "postgres"
        password = "pass@123"
        table_name = "covid_data"
        query = f"select * from {table_name}"
    
        partition_column = 'date'
        lower_bound = '2020-04-26'
        upper_bound = '2020-04-30'
        num_partitions = 5
    
    
        query1 = f"select * from {table_name} where {partition_column} >= '2020-04-26' and {partition_column} < '2020-04-27'"
        query2 = f"select * from {table_name} where {partition_column} >= '2020-04-27' and {partition_column} < '2020-04-28'"
        query3 = f"select * from {table_name} where {partition_column} >= '2020-04-28' and {partition_column} < '2020-04-29'"
        query4 = f"select * from {table_name} where {partition_column} >= '2020-04-29' and {partition_column} < '2020-04-30'"
        query5 = f"select * from {table_name} where {partition_column} >= '2020-04-30' and {partition_column} < '2020-05-01'"
    
        db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port, postgres_dbname=dbname, username=username,password=password)
        x1 = threading.Thread(target=get_thread, args=(1, db_client, query1))
        x1.start()
        x2 = threading.Thread(target=get_thread, args=(2, db_client, query2))
        x2.start()
        x3 = threading.Thread(target=get_thread, args=(3, db_client, query3))
        x3.start()
        x4 = threading.Thread(target=get_thread, args=(4, db_client, query4))
        x4.start()
        x5 = threading.Thread(target=get_thread, args=(5, db_client, query5))
        x5.start()

    Output of Program:

    Starting thread id 1
    Connecting to the Postgres database...
    Starting thread id 2
    Connecting to the Postgres database...
    Starting thread id 3
    Connecting to the Postgres database...
    Starting thread id 4
    Connecting to the Postgres database...
    Starting thread id 5
    Connecting to the Postgres database...
    Successfully connected to the Postgres database...Successfully connected to the Postgres database...Successfully connected to the Postgres database...
    Reading data !!!
    
    Reading data !!!
    
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Successfully connected to the Postgres database...
    Reading data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Read Data !!!
    Thread 2 data 
    Thread 3 data 
    Thread 1 data 
    Thread 5 data 
    Thread 4 data 
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-27
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-27
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-27
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-27
    4                Andhra Pradesh          Guntur  237  ...  0  None  2020-04-27
    5                Andhra Pradesh         Krishna  210  ...  0  None  2020-04-27
    6                Andhra Pradesh         Kurnool  292  ...  0  None  2020-04-27
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-27
    8                Andhra Pradesh  S.P.S. Nellore   79  ...  0  None  2020-04-27
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-27
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-27
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-27
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-27
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-27
    14                        Assam         Unknown   36  ...  0  None  2020-04-27
    15                        Bihar           Arwal    4  ...  0  None  2020-04-27
    16                        Bihar      Aurangabad    7  ...  0  None  2020-04-27
    17                        Bihar           Banka    2  ...  0  None  2020-04-27
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-27
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-27
    
    [20 rows x 8 columns]
    Stopping thread id 2
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-26
    1                Andhra Pradesh       Anantapur   53  ...  0  None  2020-04-26
    2                Andhra Pradesh        Chittoor   73  ...  0  None  2020-04-26
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-26
    4                Andhra Pradesh          Guntur  214  ...  0  None  2020-04-26
    5                Andhra Pradesh         Krishna  177  ...  0  None  2020-04-26
    6                Andhra Pradesh         Kurnool  279  ...  0  None  2020-04-26
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-26
    8                Andhra Pradesh  S.P.S. Nellore   72  ...  0  None  2020-04-26
    9                Andhra Pradesh      Srikakulam    3  ...  0  None  2020-04-26
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-26
    11               Andhra Pradesh   West Godavari   51  ...  0  None  2020-04-26
    12               Andhra Pradesh   Y.S.R. Kadapa   58  ...  0  None  2020-04-26
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-26
    14                        Assam         Unknown   36  ...  0  None  2020-04-26
    15                        Bihar           Arwal    4  ...  0  None  2020-04-26
    16                        Bihar      Aurangabad    2  ...  0  None  2020-04-26
    17                        Bihar           Banka    2  ...  0  None  2020-04-26
    18                        Bihar       Begusarai    9  ...  0  None  2020-04-26
    19                        Bihar       Bhagalpur    5  ...  0  None  2020-04-26
    
    [20 rows x 8 columns]
    Stopping thread id 1
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-28
    1                Andhra Pradesh       Anantapur   54  ...  0  None  2020-04-28
    2                Andhra Pradesh        Chittoor   74  ...  0  None  2020-04-28
    3                Andhra Pradesh   East Godavari   39  ...  0  None  2020-04-28
    4                Andhra Pradesh          Guntur  254  ...  0  None  2020-04-28
    5                Andhra Pradesh         Krishna  223  ...  0  None  2020-04-28
    6                Andhra Pradesh         Kurnool  332  ...  0  None  2020-04-28
    7                Andhra Pradesh        Prakasam   56  ...  0  None  2020-04-28
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-28
    9                Andhra Pradesh      Srikakulam    4  ...  0  None  2020-04-28
    10               Andhra Pradesh   Visakhapatnam   22  ...  0  None  2020-04-28
    11               Andhra Pradesh   West Godavari   54  ...  0  None  2020-04-28
    12               Andhra Pradesh   Y.S.R. Kadapa   65  ...  0  None  2020-04-28
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-28
    14                        Assam         Unknown   38  ...  0  None  2020-04-28
    15                        Bihar          Araria    1  ...  0  None  2020-04-28
    16                        Bihar           Arwal    4  ...  0  None  2020-04-28
    17                        Bihar      Aurangabad    7  ...  0  None  2020-04-28
    18                        Bihar           Banka    3  ...  0  None  2020-04-28
    19                        Bihar       Begusarai    9  ...  0  None  2020-04-28
    
    [20 rows x 8 columns]
    Stopping thread id 3
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-30
    1                Andhra Pradesh       Anantapur   61  ...  0  None  2020-04-30
    2                Andhra Pradesh        Chittoor   80  ...  0  None  2020-04-30
    3                Andhra Pradesh   East Godavari   42  ...  0  None  2020-04-30
    4                Andhra Pradesh          Guntur  287  ...  0  None  2020-04-30
    5                Andhra Pradesh         Krishna  246  ...  0  None  2020-04-30
    6                Andhra Pradesh         Kurnool  386  ...  0  None  2020-04-30
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-30
    8                Andhra Pradesh  S.P.S. Nellore   84  ...  0  None  2020-04-30
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-30
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-30
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-30
    12               Andhra Pradesh   Y.S.R. Kadapa   73  ...  0  None  2020-04-30
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-30
    14                        Assam         Unknown   43  ...  0  None  2020-04-30
    15                        Bihar          Araria    1  ...  0  None  2020-04-30
    16                        Bihar           Arwal    4  ...  0  None  2020-04-30
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-30
    18                        Bihar           Banka    3  ...  0  None  2020-04-30
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-30
    
    [20 rows x 8 columns]
    Stopping thread id 5
                                  0               1    2  ...  5     6           7
    0   Andaman and Nicobar Islands         Unknown   33  ...  0  None  2020-04-29
    1                Andhra Pradesh       Anantapur   58  ...  0  None  2020-04-29
    2                Andhra Pradesh        Chittoor   77  ...  0  None  2020-04-29
    3                Andhra Pradesh   East Godavari   40  ...  0  None  2020-04-29
    4                Andhra Pradesh          Guntur  283  ...  0  None  2020-04-29
    5                Andhra Pradesh         Krishna  236  ...  0  None  2020-04-29
    6                Andhra Pradesh         Kurnool  343  ...  0  None  2020-04-29
    7                Andhra Pradesh        Prakasam   60  ...  0  None  2020-04-29
    8                Andhra Pradesh  S.P.S. Nellore   82  ...  0  None  2020-04-29
    9                Andhra Pradesh      Srikakulam    5  ...  0  None  2020-04-29
    10               Andhra Pradesh   Visakhapatnam   23  ...  0  None  2020-04-29
    11               Andhra Pradesh   West Godavari   56  ...  0  None  2020-04-29
    12               Andhra Pradesh   Y.S.R. Kadapa   69  ...  0  None  2020-04-29
    13            Arunachal Pradesh           Lohit    1  ...  0  None  2020-04-29
    14                        Assam         Unknown   38  ...  0  None  2020-04-29
    15                        Bihar          Araria    1  ...  0  None  2020-04-29
    16                        Bihar           Arwal    4  ...  0  None  2020-04-29
    17                        Bihar      Aurangabad    8  ...  0  None  2020-04-29
    18                        Bihar           Banka    3  ...  0  None  2020-04-29
    19                        Bihar       Begusarai   11  ...  0  None  2020-04-29
    
    [20 rows x 8 columns]
    Stopping thread id 4
    
    Process finished with exit code 0

    Note that in this blog we have hardcoded the password as a string, which is definitely not how passwords should be handled. Use secrets managers, .env files, or environment variables as the source of passwords; never hardcode them in a production environment.
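    For example, a small helper (the environment variable name here is our own choice) can pull the password from the environment instead:

```python
import os

def get_db_password(env_var="POSTGRES_PASSWORD"):
    # read the password from the environment rather than hardcoding it,
    # and fail fast with a clear error if it was never set
    password = os.environ.get(env_var)
    if password is None:
        raise RuntimeError(f"{env_var} environment variable is not set")
    return password
```

    The same pattern works for the hostname, username, and any other secret the program needs.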

    Conclusion 

    After going through the above blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading. You also now know the difference between multiprocessing and multithreading, and you can decide on the best way to carry out read and write operations on a database based on your requirements.

    In general, if you have a small amount of data, a simple single-threaded Python approach is enough for reading and writing. If you have a relatively large amount of data, use a multi-threaded approach or a single-partition Spark approach. If you have a huge amount of data, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, which approach to use depends on your requirements and the resources available.

  • A Quick Guide to Building a Serverless Chatbot With Amazon Lex

    Amazon announced “Amazon Lex” in December 2016, and since then we’ve been using it to build bots for our customers. Lex is effectively the technology behind Alexa, Amazon’s voice-activated virtual assistant that lets people control things with voice commands, such as playing music, setting alarms, or ordering groceries. It provides deep learning-powered natural-language understanding along with automatic speech recognition. Amazon now offers it as a service, allowing developers to take advantage of the same features that power Amazon Alexa, so there is no longer any need to spend time setting up and managing infrastructure for your bots.

    Now, developers just need to design conversations according to their requirements in Lex console. The phrases provided by the developer are used to build the natural language model. After publishing the bot, Lex will process the text or voice conversations and execute the code to send responses.

    I’ve put together this quick-start tutorial so you can start building Lex chatbots. To get the terminology right, let’s consider an e-commerce bot that supports conversations involving the purchase of books.

    Lex-Related Terminologies

    Bot: It consists of all the components related to a conversation, which includes:

    • Intent: An intent represents a goal the bot’s user wants to achieve. In our case, the goal is to purchase books.
    • Utterances: An utterance is a text phrase that invokes an intent. If we have more than one intent, we need to provide different utterances for each. Amazon Lex builds a language model from the utterance phrases we provide, which then invoke the required intent. For our demo example, we need a single intent, “OrderBook”. Some sample utterances would be:
    • I want to order some books
    • Can you please order a book for me
    • Slots: Each slot is a piece of data that the user must supply in order to fulfill the intent. For instance, purchasing a book requires bookType and bookName as slots for the intent “OrderBook” (we limit ourselves to these two factors to keep the example simple; in reality there are many other factors on which one selects a book). 
      Slots are the inputs (a string, date, city, location, boolean, number, etc.) needed to reach the goal of the intent. Each slot has a name, a slot type, a prompt, and a required flag. The slot type defines the valid values a user can respond with, and can be either custom-defined or one of Amazon’s pre-built types.
    • Prompt: A prompt is a question Lex asks the user to supply correct data for a slot needed to fulfill an intent, e.g. Lex will ask “What type of book do you want to buy?” to fill the slot bookType.
    • Fulfillment: Fulfillment provides the business logic that is executed once all the required slot values needed to achieve the goal have been gathered. Amazon Lex supports Lambda functions for fulfilling business logic and for validations.

    Let’s Implement this Bot!

    Now that we are aware of the basic terminology used in Amazon Lex, let’s start building our chat-bot.

    Creating Lex Bot:

    • Go to the Amazon Lex console, which is available only in the US East (N. Virginia) region, and click the Create button.
    • Create a custom bot by providing the following information:
    1. Bot Name: PurchaseBook
    2. Output voice: None, as this is a text-based application
    3. Session Timeout: 5 min
    4. Add the basic Amazon Lex role to the bot: Amazon will create it automatically. Find out more about Lex roles & permissions here.
    5. Click the Create button, which will redirect you to the editor page.

    Architecting Bot Conversations

    Create Slots: We are creating two slots named bookType and bookName. Slot type values can be chosen from 275 pre-built types provided by Amazon or we can create our own customized slot types.

    Create custom slot type for bookType as shown here and consider predefined type named Amazon.Book for bookName.

    Create Intent: Our bot requires single custom intent named OrderBook.

    Configuring the Intents

    • Utterances: Provide some utterances to invoke the intent. An utterance can consist only of Unicode characters, spaces, and valid punctuation marks (periods for abbreviations, underscores, apostrophes, and hyphens). If there is a slot placeholder in your utterance, ensure that it’s in the {slotName} format and has spaces on both ends.

    Slots: Map each slot to its type and provide the prompt question to ask for a valid value for that slot. Note the sequence: the bot asks the prompts in priority order.

    Confirmation prompt: This is optional. If required you can provide a confirmation message e.g. Are you sure you want to purchase book named {bookName}?, where bookName is a slot placeholder.

    Fulfillment: Now that the chatbot has gathered all the necessary data, it can either be passed to a Lambda function or returned to the client application, which then calls a REST endpoint.

    Creating AWS Lambda Functions

    Amazon Lex supports Lambda functions as code hooks for the bot. These functions can serve multiple purposes, such as improving user interaction with the bot by using prior knowledge, validating the input the bot received from the user, and fulfilling the intent.

    • Go to the AWS Lambda console and choose Create a Lambda function.
    • Select the Blank Function blueprint and click Next.
    • To configure your Lambda function, provide its name, runtime, and the code to execute when the function is invoked. The code can also be uploaded as a zip file instead of being provided inline. We are using Node.js 4.3 as the runtime.
    • Click Next and choose Create Function.

    We can configure our bot to invoke these Lambda functions at two places. We do this while configuring the intent, as shown below:

    where botCodeHook and fulfillment are the names of the Lambda functions we created.

    Lambda initialization and validation  

    The Lambda function provided here, botCodeHook, will be invoked on each user input whose intent Amazon Lex understands. It validates bookName against a predefined list of books.

    'use strict';
    exports.handler = (event, context, callback) => {
        const sessionAttributes = event.sessionAttributes;
        const slots = event.currentIntent.slots;
        const bookName = slots.bookName;
      
        // predefined list of available books
        const validBooks = ['harry potter', 'twilight', 'wings of fire'];
      
        // negative check: if valid slot value is not obtained, inform lex that user is expected 
        // respond with a slot value 
        if (bookName && bookName !== "" && validBooks.indexOf(bookName.toLowerCase()) === -1) {
            let response = {
              sessionAttributes: event.sessionAttributes,
              dialogAction: {
                type: "ElicitSlot",
                message: {
                  contentType: "PlainText",
                  content: `We do not have book: ${bookName}. Provide another book name, e.g. twilight.`
                },
                intentName: event.currentIntent.name,
                slots: slots,
                slotToElicit: "bookName"
              }
            };
            callback(null, response);
            // return so we don't fall through and also send the Delegate response below
            return;
        }
      
        // if valid book name is obtained, send command to choose next course of action
        let response = {sessionAttributes: sessionAttributes,
          dialogAction: {
            type: "Delegate",
            slots: event.currentIntent.slots
          }
        }
        callback(null, response);
    };

    Fulfillment code hook

    This lambda function is invoked after receiving all slot data required to fulfill the intent.

    'use strict';
    
    exports.handler = (event, context, callback) => {
        // when the intent gets fulfilled, inform Lex to complete the state
        let response = {sessionAttributes: event.sessionAttributes,
          dialogAction: {
            type: "Close",
            fulfillmentState: "Fulfilled",
            message: {
              contentType: "PlainText",
              content: "Thanks for purchasing book."
            }
          }
        }
        callback(null, response);
    };

    Error Handling: We can customize the error messages shown to our bot’s users. Click on Error Handling and replace the default values with the ones you need. Since the number of retries is two, we can also provide a different message for each retry.

    Your Bot is Now Ready To Chat

    Click on Build to build the chat-bot. Congratulations! Your Lex chat-bot is ready to test. We can test it in the overlay which appears in the Amazon Lex console.

    Sample conversations:

    I hope you have understood the basic terminology of Amazon Lex along with how to create a simple chatbot using a serverless backend (AWS Lambda). This is a really powerful platform for building mature, intelligent chatbots.

  • Getting Started With Golang Channels! Here’s Everything You Need to Know

    We live in a world where speed is important. With cutting-edge technology coming into the telecommunications and software industry, we expect to get things done quickly. We want to develop applications that are fast, can process high volumes of data and requests, and keep the end-user happy. 

    This is great, but of course, it’s easier said than done. That’s why concurrency and parallelism are important in application development. We must process data as fast as possible. Every programming language has its own way of dealing with this, and we will see how Golang does it.  

    Now, many of us choose Golang because of its concurrency, and goroutines and channels are at the heart of its concurrency model.

    This blog will cover channels and how they work internally, as well as their key components. To get the most from it, it helps to know a little about goroutines and channels, as we’ll be digging into channel internals. If you don’t know anything about them yet, don’t worry: we’ll start with an introduction to channels, and then we’ll see how they operate.

    What are channels?

    Normally, when we talk about channels, we think of the ones in systems like RabbitMQ, Redis, AWS SQS, and so on. Anyone with little or no Golang knowledge might think this way. But channels in Golang are different from a work-queue system. In work-queue systems like those above, you open TCP connections to the channels; in Go, a channel is a data structure, or even a design pattern, which we’ll explain later. So, what exactly are channels in Golang?

    Channels are the medium through which goroutines can communicate with each other. In simple terms, a channel is a pipe that allows a goroutine to either put or read the data. 

    What are goroutines?

    So, a channel is a communication medium for goroutines. Now, let’s give a quick overview of what goroutines are. If you know this already, feel free to skip this section.

    Technically, a goroutine is a function that executes independently in a concurrent fashion. In simple terms, it’s a lightweight thread managed by the Go runtime. 

    You can create a goroutine by putting the go keyword before a function call.

    Let’s say there’s a function called PrintHello, like this:

    func PrintHello() {
       fmt.Println("Hello")
    }

    You can make this into a goroutine simply by calling this function, as below:

    //create goroutine
     go PrintHello()

    Now, let’s head back to channels, as that’s the important topic of this blog. 

    How to define a channel?

    Let’s see a syntax that will declare a channel. We can do so by using the chan keyword provided by Go.

    You must specify the data type, as a channel can only carry values of that one type. 

    //create channel
     var c chan int

    Very simple! But this is not useful by itself, since it creates a nil channel. Let’s print it and see.

    fmt.Println(c)
    fmt.Printf("Type of channel: %T", c)
    <nil>
    Type of channel: chan int

    As you can see, we have just declared the channel, but we can’t transport data through it. So, to create a useful channel, we must use the make function.

    //create channel
    c := make(chan int)
    fmt.Printf("Type of `c`: %T\n", c)
    fmt.Printf("Value of `c` is %v\n", c)
     
    Type of `c`: chan int
    Value of `c` is 0xc000022120

    As you may notice here, the value of c is a memory address. Keep in mind that a channel value is effectively a pointer to a runtime structure. That’s why we can pass channels to goroutines and easily put data in or read data out. Now, let’s quickly see how to read and write data on a channel.

    Read and write operations on a channel:

    Go provides an easy way to send and receive data on a channel using the arrow operator <-.

    c <- 10

    This is a simple syntax to put the value in our created channel. The same syntax is used to define the “send” only type of channels.

    And to get/read the data from channel, we do this:

    <-c

    This is also the way to define the “receive” only type of channels.
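    These send-only and receive-only forms can be sketched as follows (a hedged example; the function names produce and consume are our own):

```go
package main

import "fmt"

// send-only parameter: produce can only write to out
func produce(out chan<- int) {
	for i := 1; i <= 3; i++ {
		out <- i
	}
	close(out) // signal that no more values are coming
}

// receive-only parameter: consume can only read from in
func consume(in <-chan int) int {
	sum := 0
	for v := range in { // range stops when the channel is closed
		sum += v
	}
	return sum
}

func main() {
	c := make(chan int)
	go produce(c)           // a bidirectional channel converts implicitly
	fmt.Println(consume(c)) // prints 6
}
```

    The compiler enforces the direction: trying to receive inside produce, or send inside consume, is a compile-time error.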

    Let’s see a simple program to use the channels.

    func printChannelData(c chan int) {
       fmt.Println("Data in channel is: ", <-c)
    }

    This simple function just prints whatever data is in the channel. Now, let’s see the main function that will push the data into the channel.

    func main() {
       fmt.Println("Main started...")
       //create channel of int
       c := make(chan int)
       // call to goroutine
       go printChannelData(c)
       // put the data in channel
       c <- 10
       fmt.Println("Main ended...")
    }

    This yields to the output:

    Main started...
    Data in channel is:  10
    Main ended...

    Let’s talk about the execution of the program. 

    1. We declared a printChannelData function, which accepts a channel c of data type integer. In this function, we are just reading data from channel c and printing it.

    2. The main function will first print “Main started…” to the console.

    3. Then, we create the channel c of data type integer using the make function.

    4. We now pass the channel to the function printChannelData, and as we saw earlier, it’s a goroutine. 

    5. At this point, there are two goroutines. One is the main goroutine, and the other is what we have declared. 

    6. Now, we are putting 10 as data in the channel, and at this point, our main goroutine is blocked and waiting for some other goroutine to read the data. The reader, in this case, is the printChannelData goroutine, which was previously blocked because there was no data in the channel. Now that we’ve pushed the data onto the channel, the Go scheduler (more on this later in the blog) now schedules printChannelData goroutine, and it will read and print the value from the channel.

    7. After that, the main goroutine resumes, prints “Main ended…”, and the program stops. 

    So, what’s happening here? Basically, the Go scheduler blocks and unblocks goroutines around channel operations. You can’t read from a channel until there is data in it, which is why our printChannelData goroutine was blocked in the first place. Conversely, data written to an unbuffered channel must be read before the sender can resume, which is what blocked our main goroutine.

    With this, let’s see how channels operate internally. 

    Internals of channels:

    Until now, we have seen how to define a goroutine, how to declare a channel, and how to read and write data through a channel with a very simple example. Now, let’s look at how Go handles this blocking and unblocking nature internally. But before that, let’s quickly see the types of channels.

    Types of channels:

    There are two basic types of channels: buffered channels and unbuffered channels. The above example illustrates the behaviour of unbuffered channels. Let’s quickly see the definition of these:

    • Unbuffered channel: This is what we have seen above. An unbuffered channel has no capacity to store data: a send blocks until another goroutine is ready to receive the value, and vice versa. That’s why our main goroutine got blocked when we added data into the channel. 
    • Buffered channel: In a buffered channel, we specify the data capacity of the channel. The syntax is very simple: c := make(chan int, 10). The second argument to the make function is the capacity of the channel, so here we can put up to ten elements in the channel. When the capacity is full, the sending goroutine blocks until a receiver goroutine starts consuming the data.

    Properties of a channel:

    A channel does a lot of things internally, and it has the following properties:

    • Channels are goroutine-safe.
    • Channels can store and pass values between goroutines.
    • Channels provide FIFO semantics.
    • Channels cause goroutines to block and unblock, which we just learned about. 

    As we walk through the internals of a channel, you’ll see where the first three properties come from.

    Channel Structure:

    As we learned in the definition, a channel is a data structure. Looking at the properties above, we want a mechanism that handles goroutines in a synchronized manner and with FIFO semantics. This can be solved using a queue with a lock, and that is exactly how a channel behaves internally: it has a circular queue, a lock, and some other fields. 

    When we do c := make(chan int, 10), Go creates a channel using the hchan struct, which has the following fields: 

    type hchan struct {
       qcount   uint           // total data in the queue
       dataqsiz uint           // size of the circular queue
       buf      unsafe.Pointer // points to an array of dataqsiz elements
       elemsize uint16
       closed   uint32
       elemtype *_type // element type
       sendx    uint   // send index
       recvx    uint   // receive index
       recvq    waitq  // list of recv waiters
       sendq    waitq  // list of send waiters
     
       // lock protects all fields in hchan, as well as several
       // fields in sudogs blocked on this channel.
       //
       // Do not change another G's status while holding this lock
       // (in particular, do not ready a G), as this can deadlock
       // with stack shrinking.
       lock mutex
    }

    (Above info taken from golang.org)

    This is what a channel is internally. Let’s see one-by-one what these fields are. 

    qcount holds the count of items/data in the queue. 

    dataqsiz is the size of the circular queue. This is used in the case of buffered channels and is the second parameter passed to the make function.

    elemsize is the size of a single element in the channel.

    buf is the actual circular queue where the data is stored when we use buffered channels.

    closed indicates whether the channel is closed. The syntax to close a channel is close(<channel_name>). The field defaults to 0 when the channel is created and is set to 1 when the channel is closed. 

    sendx and recvx indicate the current send and receive indices of the buffer, or circular queue. As we add data into a buffered channel, sendx increases, and as we receive data, recvx increases.

    recvq and sendq are the waiting queues for blocked goroutines that are trying to read data from or write data to the channel.

    lock is basically a mutex that locks the channel for each read or write operation, so that concurrent goroutines cannot corrupt the channel’s state.

    These are the important fields of the hchan struct, which come into the picture when we create a channel. The hchan struct resides on the heap, and the make function gives us a pointer to that location. There’s another struct, known as sudog, which also comes into the picture, but we’ll learn more about that later. Now, let’s see what happens when we write and read data.

    Read and write operations on a channel:

    We are considering buffered channels here. When one goroutine, let’s say G1, wants to write data onto a channel, it does the following:

    • Acquire the lock: As we saw before, if we want to modify the channel, or hchan struct, we must acquire a lock. So, G1 in this case, will acquire a lock before writing the data.
    • Perform the enqueue operation: We now know that buf is actually a circular queue that holds the data. But before enqueuing the data, the goroutine makes a copy of the data and puts that copy into the buffer slot. We will see an example of this.
    • Release the lock: After performing an enqueue operation, it just releases the lock and goes on performing further executions.

    When another goroutine, let’s say G2, reads that data, it performs the same sequence of operations, except that instead of enqueuing it dequeues, again with a memory copy. This means that with channels there is no shared memory: the goroutines share only the hchan struct, which is protected by the mutex. Everything else is just a copy of memory.

    This satisfies the famous Golang proverb: “Do not communicate by sharing memory; instead, share memory by communicating.” 

    Now, let’s look at a small example of this memory copy operation.

    func printData(c chan int) {
       data := <-c
       time.Sleep(time.Second) // give main a moment to modify the variable a
       fmt.Println("Data in channel is: ", data)
    }
     
    func main() {
       fmt.Println("Main started...")
       a := 10
       //create channel
       c := make(chan int)
       go printData(c)
       fmt.Println("Value of a before putting into channel:", a)
       c <- a
       a = 20
       fmt.Println("Updated value of a:", a)
       time.Sleep(time.Second * 2)
       fmt.Println("Main ended...")
    }

    And the output of this is:

    Main started...
    Value of a before putting into channel: 10
    Updated value of a: 20
    Data in channel is:  10
    Main ended...

    So, as you can see, we sent the value of the variable a into the channel and modified a right afterwards. However, the value read from the channel stays the same, i.e., 10, because the main goroutine performed a memory copy of the value when putting it onto the channel. So, even if you change the variable later, the value in the channel does not change. (Had we sent a pointer over a chan *int instead, only the pointer would have been copied, and the receiver would see the updated value.)

    Write in case of buffer overflow:

    We’ve seen that a goroutine can add data up to the buffer capacity, but what happens when the capacity is reached? When the buffer has no more space and a goroutine, let’s say G1, wants to write data, the Go scheduler blocks/pauses G1, which waits until a receive happens from another goroutine, say G2. Once G2 starts consuming data and frees up space, the Go scheduler makes G1 runnable again. Remember this scenario, as we’ll use G1 and G2 frequently from here on.

    We know that goroutines work in this pause-and-resume fashion, but who controls it? As you might have guessed, the Go scheduler does the magic here. There are a few things the Go scheduler does that are very important for goroutines and channels.

    Go Runtime Scheduler

    You may already know this, but goroutines are user-space threads. The OS could schedule and manage such units itself, but that would be expensive, considering the resources that OS threads carry. 

    That’s why the Go scheduler handles the goroutines, and it basically multiplexes the goroutines on the OS threads. Let’s see how.

    There are scheduling models, like 1:1, N:1, etc., but the Go scheduler uses the M:N scheduling model.

    Basically, this means that there are a number of goroutines and OS threads, and the scheduler basically schedules the M goroutines on N OS threads. For example:

    (Diagram: six goroutines multiplexed across two OS threads, OS Thread 1 and OS Thread 2)

    As you can see, there are two OS threads, and the scheduler is running six goroutines by swapping them as needed. The Go scheduler has three structures as below:

    • M: M represents the OS thread, which is entirely managed by the OS; it’s similar to a POSIX thread. M stands for machine.
    • G: G represents the goroutine. A goroutine has a resizable stack and also holds scheduling information, such as any channel it’s blocked on.
    • P: P is a context for scheduling: it is what multiplexes the M goroutines onto the N OS threads. This is the important part, and that’s why P stands for processor.

    Diagrammatically, we can represent the scheduler as:

    (This diagram is referenced from The Go scheduler)

    The P processor basically holds the queue of runnable goroutines—or simply run queues.

    So, anytime a goroutine (G) wants to run on an OS thread (M), that OS thread first gets hold of a P, i.e., the context. The interesting behaviour occurs when a goroutine needs to be paused so that some other goroutine can run. One such case is a buffered channel: when the buffer is full, we pause the sender goroutine and activate the receiver goroutine.

    Imagine the above scenario: G1 is a sender that tries to send on a full buffered channel, and G2 is a receiver goroutine. When G1 tries to send on the full channel, it calls into the runtime scheduler with a call known as gopark. The scheduler then changes the state of G1 from running to waiting, and it will schedule another goroutine from the run queue, say G2.

    This transition diagram might help you better understand:

    As you can see, after the gopark call, G1 is in a waiting state and G2 is running. We haven’t paused the OS thread (M); instead, we’ve blocked the goroutine and scheduled another one, so we get maximum throughput out of the OS thread. The context switching of goroutines is handled by the scheduler, and this is a large part of what makes the scheduler complex. 

    This is great. But how do we resume G1, since it still wants to add its data/task to the channel? Before G1 sends the gopark signal, it records its own state on the hchan struct, i.e., our channel, in the sendq field. Remember the sendq and recvq fields? They are the lists of waiting senders and receivers. 

    Now, G1 stores its state as a sudog struct. A sudog represents a goroutine that is waiting on a channel element. The sudog struct has these fields:

    type sudog struct {
       g *g
       isSelect bool
       next *sudog
       prev *sudog
       elem unsafe.Pointer //data element
       ...
    }

    g is the waiting goroutine, next and prev are pointers to the next and previous sudog in the waiting list (if any), and elem is the actual element the goroutine is waiting on.

    So, considering our example, G1 is waiting to write its data, so it creates a record of itself, the sudog, as below:

    Cool. Now we know what operations G1 performs before going into the waiting state. Currently, G2 is in a running state, and it will start consuming the channel data.

    As soon as G2 receives the first data/task, it checks the sendq attribute of the hchan struct for waiting goroutines, and it finds that G1 is waiting to push data. Now, here is the interesting part: G2 copies G1’s data from the sudog into the buffer, calls the scheduler to move G1 from the waiting state to runnable, adds G1 to the run queue, and then returns to its own work. This call, made by G2 on behalf of G1, is known as goready. Impressive, right? Go behaves like this so that when G1 runs again, it doesn’t have to acquire the lock and push the data itself; that extra overhead is handled by G2. That’s why the sudog holds both the data/task and the details of the waiting goroutine. So, the state of G1 is like this:

    As you can see, G1 is placed on a run queue. Now we know what the goroutine and the Go scheduler do in the case of buffered channels. In this example, the sender goroutine came first, but what if the receiver goroutine comes first? What if there’s no data in the channel and the receiver goroutine executes first? The receiver goroutine (G2) will create a sudog in the recvq of the hchan struct. Things are a little twisted when the sender goroutine (G1) arrives: it first checks whether any goroutines are waiting in the recvq, and if there are, it copies the task directly into the waiting goroutine’s (G2’s) memory location, i.e., the elem attribute of its sudog. 

    This is incredible! Instead of writing to the buffer, the sender writes the task/data into the waiting goroutine’s stack, simply to avoid extra work for G2 when it wakes up. We know that each goroutine has its own resizable stack, and goroutines never touch each other’s stacks, except in this channel case. So far, we have seen how sends and receives happen on a buffered channel.

    This may have been confusing, so let me give you the summary of the send operation. 

    Summary of a send operation for buffered channels:

    1. Acquire lock on the entire channel or the hchan struct.
    2. Check if there’s any sudog or a waiting goroutine in the recvq. If so, then put the element directly into its stack. We saw this just now with G1 writing to G2’s stack.
    3. If recvq is empty, then check whether the buffer has space. If yes, then do a memory copy of the data. 
    4. If the buffer is full, then create a sudog under sendq of the hchan struct, which will have details, like a currently executing goroutine and the data to put on the channel.
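    The four steps above can be condensed into pseudocode. This is a simplified sketch of the runtime’s send path, not the actual source:

```
chansend(ch, value):
    lock(ch.lock)                         // step 1: acquire the lock
    if sg = ch.recvq.dequeue(); sg != nil:
        copy value into sg.elem           // step 2: a receiver is waiting; write to its stack
        goready(sg.g)                     // make that receiver runnable
    else if ch.qcount < ch.dataqsiz:
        copy value into ch.buf[ch.sendx]  // step 3: buffer has room
        ch.sendx = (ch.sendx + 1) % ch.dataqsiz
        ch.qcount++
    else:
        sg = new sudog{g: current G, elem: &value}
        ch.sendq.enqueue(sg)              // step 4: buffer full; park this goroutine
        gopark()                          // sleep until a receiver wakes us
    unlock(ch.lock)
```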

    We have seen all the above steps in detail, but concentrate on the last point. 

    This is, in fact, similar to how an unbuffered channel works. We know that for unbuffered channels, every send must be matched by a receive and vice versa.

    So, keep in mind that an unbuffered channel always works like a direct send. A summary of the read and write operations on an unbuffered channel is:

    • Sender first: There’s no receiver yet, so the sender creates a sudog of itself (holding the data), and the receiver, when it arrives, takes the value directly from that sudog.
    • Receiver first: The receiver will create a sudog in recvq, and the sender will directly put the data in the receiver’s stack.

    With this, we have covered the basics of channels. We’ve learned how read and write operates in a buffered and unbuffered channel, and we talked about the Go runtime scheduler.

    Conclusion:

    Channels are a very interesting Golang topic. They can seem difficult to understand, but once you learn the mechanism, they’re very powerful and help you achieve concurrency in your applications. Hopefully, this blog has improved your understanding of the fundamental concepts and operations of channels.