Bots are the flavor of the season. Every day, we hear about a new bot being launched for domains like travel, social, legal, support, and sales. Facebook Messenger alone had more than 11,000 bots when I last checked, and thousands more have probably been added as I write this article.
The first generation of bots were dumb: they could understand only a limited set of queries based on keywords in the conversation. But the commoditization of NLP (Natural Language Processing) and machine learning by services like Wit.ai, API.ai, Luis.ai, Amazon Lex, and IBM Watson has resulted in the growth of intelligent bots like donotpay and chatShopper. I don't know if bots are just hype or the real deal, but I can say with certainty that building a bot is fun and challenging at the same time. In this article, I would like to introduce you to some of the tools to build an intelligent chatbot.
As the title suggests, we used Botkit and Rasa (NLU) to build our bot. Before getting into the technicalities, I would like to share the reasons for choosing these two platforms and how they fit our use case. Also read: How to build a serverless chatbot with Amazon Lex.
Bot development framework — Botkit (from the Howdy team) and the Microsoft (MS) Bot Framework were good contenders for this. Both frameworks:
– are open source
– have integrations with popular messaging platforms like Slack, Facebook Messenger, and Twilio
– have good documentation
– have an active developer community
Due to compliance requirements, we had chosen AWS to deploy all our services, and we wanted the same for the bot as well.
NLU (Natural Language Understanding) — API.ai (acquired by Google) and Wit.ai (acquired by Facebook) are two popular NLU tools in the bot industry, and they were the first we considered for this task. Both solutions:
– are hosted as a cloud service
– have Node.js and Python SDKs and a REST interface
– have good documentation
– support state or contextual intents, which makes it easy to build a conversational platform on top of them
As stated before, we couldn't use any of these hosted solutions due to compliance. That is when we came across an open source NLU called Rasa, which was a perfect replacement for API.ai and Wit.ai and which, at the same time, we could host and manage on AWS.
You might now be wondering why I used the term NLU for API.ai and Wit.ai, and not NLP (Natural Language Processing).
* NLP refers to all systems that handle interactions with humans in a way humans find natural. It means that we can converse with a system just the way we talk to other human beings.
* NLU is a subfield of NLP which handles the narrow but complex challenge of converting unstructured input into a structured form that a machine can understand and act upon. So when you say "Book a hotel for me in San Francisco on 20th April 2017", the bot uses NLU to extract date=20th April 2017, location=San Francisco and action=book hotel, which the system can understand.
RASA NLU
In this section, I would like to explain Rasa in detail, along with some NLP terms you should be familiar with.
* Intent: This tells us what the user would like to do. Ex: raise a complaint, request a refund, etc.
* Entities: These are the attributes that give details about the user's task. Ex: a complaint regarding service disruptions, the refund amount, etc.
* Confidence score: This is a metric that indicates how confidently the NLU could classify the input into one of the intents.
Here is an example to help you understand the terms above.
Input: "My internet isn't working since morning."
– intent: "service_interruption"
– entities: "service=internet", "duration=morning"
– confidence score: 0.84 (this could vary based on your training)
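Over Rasa's HTTP interface, the same result would come back as JSON along these lines (the exact response shape varies across Rasa NLU versions, so treat this as illustrative):

```json
{
  "text": "My internet isn't working since morning",
  "intent": {"name": "service_interruption", "confidence": 0.84},
  "entities": [
    {"entity": "service", "value": "internet"},
    {"entity": "duration", "value": "morning"}
  ]
}
```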
The NLU's job (Rasa's, in our case) is to accept a sentence and give us the intent, the entities, and a confidence score, which our bot can then act upon. Rasa essentially provides a high-level API over various NLP and ML libraries that do intent classification and entity extraction. These NLP and ML libraries are called backends in Rasa, and they are what give Rasa its intelligence. Some of the backends used with Rasa are:
MITIE — This is an all-inclusive library, meaning it has both an NLP library for entity extraction and an ML library for intent classification built into it.
spaCy + sklearn — spaCy is an NLP library that only does entity extraction; sklearn is used alongside it to add ML capabilities for intent classification.
MITIE + sklearn — This combines the best of both worlds: the good entity recognition available in MITIE, along with the fast and accurate intent classification in sklearn.
I have used the MITIE backend to train Rasa. For the demo, I've built a "Live Support ChatBot" which is trained on messages like these:
* My phone isn't working.
* My phone isn't turning on.
* My phone crashed and isn't working anymore.
My training data looks like this:
{
  "rasa_nlu_data": {
    "common_examples": [
      {"text": "hi", "intent": "greet", "entities": []},
      {"text": "my phone isn't turning on.", "intent": "device_failure", "entities": [
        {"start": 3, "end": 8, "value": "phone", "entity": "device"}
      ]},
      {"text": "my phone is not working.", "intent": "device_failure", "entities": [
        {"start": 3, "end": 8, "value": "phone", "entity": "device"}
      ]},
      {"text": "My phone crashed and isn't working anymore.", "intent": "device_failure", "entities": [
        {"start": 3, "end": 8, "value": "phone", "entity": "device"}
      ]}
    ]
  }
}
NOTE — We have observed that MITIE gives better accuracy than spaCy + sklearn for a small training set, but as you keep adding intents, training on MITIE gets slower and slower. For a training set of 200+ examples with about 10-15 intents, MITIE takes about 35-45 minutes to train on a c4.4xlarge instance (16 cores, 30 GB RAM) on AWS.
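With the rasa_nlu releases current at the time of writing, training and serving the model looked roughly like the following (module names and flags changed in later versions, so treat these commands as an assumption tied to that era):

```shell
# Train the model using the backend configured in config.json (MITIE here)
python -m rasa_nlu.train -c config.json

# Serve the trained model over HTTP on port 5000 (the port our middleware points at)
python -m rasa_nlu.server -c config.json
```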
Botkit-Rasa Integration
Botkit is an open source bot development framework designed by the creators of Howdy. It basically provides a set of tools for building bots on Facebook Messenger, Slack, Twilio, Kik and other popular platforms. They have also come up with an IDE for bot development called Botkit Studio. To summarize, Botkit is a tool which allows us to write the bot once and deploy it on multiple messaging platforms.
Botkit also has support for middleware, which can be used to extend its functionality. Integrations with databases, CRMs, NLUs, and statistical tools are provided via middleware, which makes the framework extensible. This design also allows us to easily add integrations with other tools and software by just writing middleware modules for them.
I've integrated botkit with Slack for this demo. You can use this boilerplate template to set up botkit for Slack. We have extended the Botkit-Rasa middleware, which you can find here.
The Botkit-Rasa middleware has two functions, receive and hears, which override the default botkit behaviour.
1. receive — This function is invoked when botkit receives a message. It sends the user's message to Rasa and stores the returned intent and entities on the botkit message object.
2. hears — This function overrides the default botkit hears method, i.e., controller.hears. The default hears method uses regexes to search for the given patterns in the user's message, while the hears method from the Botkit-Rasa middleware matches against the intent instead.
let Botkit = require('botkit');
let rasa = require('./Middleware/rasa')({ rasa_uri: 'http://localhost:5000' });

let controller = Botkit.slackbot({
  clientId: process.env.clientId,
  clientSecret: process.env.clientSecret,
  scopes: ['bot'],
  json_file_store: __dirname + '/.db/'
});

// Override the receive method in botkit
controller.middleware.receive.use(rasa.receive);

// Override the hears method in botkit
controller.changeEars(function (patterns, message) {
  return rasa.hears(patterns, message);
});

controller.setupWebserver(3000, function (err, webserver) {
  // Configure a route to receive webhooks from Slack
  controller.createWebhookEndpoints(webserver);
});
Let's try an example — "my phone is not turning on". Rasa will return the following:
1. Intent — device_failure
2. Entities — device=phone
If you look carefully, the input I gave, i.e. "my phone is not turning on", is not present in my training file. Rasa has enough intelligence built in to identify the intent and entities correctly for such variations.
We need to add a hears handler listening for the intent "device_failure" to process this input. Remember that the intent and entities returned by Rasa will have been stored on the message object by the Botkit-Rasa middleware.
// Listen for the "device_failure" intent instead of a regex pattern.
controller.hears(['device_failure'], 'direct_message,direct_mention,message_received', function (bot, message) {
  // The intent and entities extracted by Rasa are already attached to the message object.
  bot.reply(message, 'Sorry to hear that your device is not working. Let me connect you to a support agent.');
});
You should now be able to run this bot with Slack and see the output as shown below (support_bot is the name of my bot).
Conclusion
You are now familiar with the process of building chatbots with a bot development framework and a NLU. Hope this helps you get started on your bot very quickly. If you have any suggestions, questions, feedback then tweet me @harjun1601. Keep following our blogs for more articles on bot development, ML and AI.
Almost all the applications that you work on or deal with throughout the day use SMS (Short Message Service) as an efficient and effective way to communicate with end users.
Some very common use-cases include:
Receiving an OTP for authenticating your login
Getting deal and sale announcements from the likes of Flipkart and Amazon.
Getting reminder notifications for the doctor’s appointment that you have
Getting details for your debit and credit transactions.
The practical use cases for an SMS can be far-reaching.
Even though SMS integration forms an integral part of any application, it is often left out of automation because of the limitations and complexities involved in automating it via web automation tools like Selenium.
Teams often opt to verify these test cases manually, which, even though it helps catch bugs early, poses some real challenges.
Pitfalls with Manual Testing
With these limitations, you obviously do not want your application sending faulty text messages after that major release.
Automation Testing … #theSaviour
To overcome the limitations of manual testing, delegating the task to a machine comes in handy.
Now that we have talked about the WHY, let's look into HOW the feature can be automated. Selenium drives browsers, so you can't use it to read an SMS on a mobile device. So we went looking for a third-party library that:
Easy to integrate with the existing code base
Supports a range of languages
Does not involve highly complex code and focuses on the problem at hand
Supports both incoming and outgoing messages
After a lot of research, we settled on Twilio.
In this article, we will look at an example of working with Twilio APIs to Read SMS and eventually using it to automate SMS flows.
Twilio supports a bunch of different languages. For this article, we stuck with Node.js.
Account Setup
Registration
To start working with the service, you need to register.
Once that is done, Twilio will prompt you with a bunch of simple questions to understand why you want to use their service.
Twilio Dashboard
A trial balance of $15.50 is credited to your account upon signing up; this can be used for sending and receiving text messages. A unique Account SID and Auth Token are also generated for your account.
Buy a Number
Navigate to the buy a number link under Phone Numbers > Manage and purchase a number that you would eventually be using in your automation scripts for receiving text messages from the application.
Note – for the free trial, Twilio does not support Indian numbers (+91).
Code Setup
Install Twilio in your code base
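The Twilio helper library can be pulled into a Node.js project from npm:

```shell
npm install twilio
```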
Code snippet
For simplicity, just pass the accountSid and authToken that you receive from the Dashboard Console to the twilio library. This returns a client object through which you can list all the messages in your inbox.
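As a sketch (assuming the `twilio` npm package is installed and the credentials are exported as environment variables), listing the inbox looks like this:

```javascript
const twilio = require('twilio');

// Account SID and Auth Token from the Twilio Dashboard Console.
const client = twilio(process.env.TWILIO_ACCOUNT_SID, process.env.TWILIO_AUTH_TOKEN);

// Fetch the 20 most recent messages on the account.
client.messages.list({ limit: 20 }).then((messages) => {
  messages.forEach((m) => console.log(m.from, '->', m.to, ':', m.body));
});
```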
List messages matching filter criteria: If you'd like Twilio to narrow down this list of messages for you, you can do so by specifying a To number, a From number, and a DateSent.
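The same narrowing can also be done client-side once the list is in hand; here is a tiny illustrative sketch (the message objects are made-up sample data), with the equivalent server-side filter shown in a comment:

```javascript
// Hypothetical sample of message objects shaped like Twilio's message resources.
const messages = [
  { to: '+15551230001', from: '+15557650001', body: 'Your OTP is 1234' },
  { to: '+15559990002', from: '+15557650001', body: 'Big sale today!' },
];

// Keep only messages sent to our purchased Twilio number.
const toOurNumber = messages.filter((m) => m.to === '+15551230001');
console.log(toOurNumber.length); // 1

// Server-side equivalent with the real client:
// client.messages.list({ to: '+15551230001', limit: 20 })
```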
Get a message: If you know the message SID (i.e., the message's unique identifier), then you can retrieve that specific message directly, including its body and status.
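Once a message body has been fetched (e.g. via client.messages(sid).fetch()), a typical automation step is pulling the OTP out of it; here is a small helper sketch (the regex and the message text are illustrative, not from Twilio):

```javascript
// Extract the first 4-8 digit number from an SMS body; returns null if absent.
function extractOtp(smsBody) {
  const match = smsBody.match(/\b(\d{4,8})\b/);
  return match ? match[1] : null;
}

console.log(extractOtp('Your OTP for login is 482913. Valid for 10 minutes.')); // 482913
```

The extracted code can then be typed into the login form by your UI automation.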
The trial version does not support Indian numbers (+91).
The trial version only provides an initial balance of $15.50. This is sufficient for use cases that involve only receiving messages on your Twilio number, but if the use case requires sending messages back from the Twilio number, a paid account may be needed.
Messages sent via a short code (e.g., 557766) are not received on the Twilio number; only long codes are accepted in the trial version.
You can buy only a single number with the trial version. If purchasing multiple numbers is required, the user may have to switch to a paid version.
Conclusion
In a nutshell, we saw how important it is to thoroughly verify the SMS functionality of our application since it serves as one of the primary ways of communicating with the end users. We also saw what the limitations are with following the traditional manual testing approach and how automating SMS scenarios would help us deliver high-quality products. Finally, we demonstrated a feasible, efficient and easy-to-use way to Automate SMS test scenarios using Twilio APIs.
Hope this was a useful read and that you will now be able to easily automate SMS scenarios. Happy testing!
Performance is a significant factor for any mobile app, and multiple factors like architecture, logic, and memory management can cause low performance. When we develop an app in Flutter, the initial performance results are very good, but as development progresses, the negative effects of a bad codebase start showing up. This blog is about using an architecture that improves Flutter app performance. We will briefly cover the following points:
1. What is High-Performance Architecture?
1.1. Framework
1.2. Motivation
1.3. Implementation
2. Sample Project
3. Additional benefits
4. Conclusion
1. What is High-Performance Architecture?
This architecture uses streams instead of the variable-based state management approach. Streams are the preferred approach for scenarios in which an app needs data in real-time. Even with these benefits, why are streams not the first choice for developers? One reason is that streams are considered difficult and complicated, but that reputation is slightly overstated. Dart is a programming language designed to have a reactive-style system, i.e., an architecture with observable streams, as noted by Flutter's Director of Engineering, Eric Seidel, in this podcast. [Note: The podcast's audio has been removed, but the part relevant to this architecture can be heard in Zaiste's YouTube video.]
1.1. Framework:
Figure 01
As shown in figure 01, we have 3 main components:
Supervisor: The Supervisor wraps the complete application and is responsible for creating the singletons of all managers, as well as providing these singletons to the screens that need them.
Managers: Each Manager has its own initialized streams, which any screen can access through the respective singleton. These streams can hold data that we can use anywhere in the application. Plus, as we are using streams, any update to this data is reflected everywhere at the same time.
Screens: Screens are on the receiving end of this architecture. Each screen uses local streams for its own operations and, if a global action is required, accesses streams from the managers through their singletons.
1.2. Motivation:
Zaiste proposed this idea in 2019 and created a plugin for such an architecture. He named it "Sprinkel Architecture", and his plugin is called sprinkle, which made our development easier to a certain level. But as of today, his plugin does not support the null safety features introduced in Dart 2.12.0. You can read more about his implementation here and can try his sample with the following command:
flutter run --no-sound-null-safety
1.3. Implementation:
We will be using the get and rxdart plugins in combination to create our high-performance architecture.
The rxdart plugin will handle stream creation and manipulation, whereas the get plugin helps us with dependency injection, route management, and state management.
2. Sample Project:
We will create a sample project to understand how to implement this architecture.
2.1. Create a project using following command:
flutter create sprinkle_architecture
2.2. Add these under dependencies of pubspec.yaml (and run command flutter pub get):
get: ^4.6.5
rxdart: ^0.27.4
2.3. Create 3 directories, constants, managers, and views, inside the lib directory:
2.4. First, we will start with a manager that holds the stream and increments the counter. Create a dart file named counter_manager.dart under the managers directory:
import 'package:get/get.dart';

class CounterManager extends GetLifeCycle {
  final RxInt count = RxInt(0);

  int get getCounter => count.value;

  void increment() => count.value = count.value + 1;
}
2.5. With this, we have a working manager. Next, we’ll create a Supervisor who will create a singleton of all available managers. In our case, we’ll create a singleton of only one manager. Create a supervisor.dart file in the lib directory:
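The original supervisor.dart isn't reproduced here, so as an assumption, here is one way to sketch it with the get plugin's Bindings API, registering a singleton of every manager at app start-up:

```dart
import 'package:get/get.dart';
import 'package:sprinkle_architecture/managers/counter_manager.dart';

// Sketch: the Supervisor registers manager singletons for the whole app.
class Supervisor extends Bindings {
  @override
  void dependencies() {
    // lazyPut creates the CounterManager singleton on first use;
    // screens later retrieve it with Get.find<CounterManager>().
    Get.lazyPut<CounterManager>(() => CounterManager());
  }
}
```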
2.6. This application only has 1 screen, but it is a good practice to create constants related to routing, so let’s add route details. Create a dart file route_paths.dart:
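The route_paths.dart file isn't shown in the original either; a minimal sketch (the constant name is an assumption) would be:

```dart
// constants/route_paths.dart: route names kept in one place.
class RoutePaths {
  static const String counterPage = '/counter';
}
```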
2.9. Finally, add the file counter_page_controller.dart:
import 'package:get/get.dart';
import 'package:sprinkle_architecture/managers/counter_manager.dart';

class CounterPageController extends GetxController {
  final CounterManager manager = Get.find();
}
2.10. As well as our landing page counter_page.dart:
import 'package:flutter/material.dart';
import 'package:get/get.dart';
import 'package:sprinkle_architecture/views/counter_page_controller.dart';

class CounterPage extends GetWidget<CounterPageController> {
  const CounterPage({Key? key, required this.title}) : super(key: key);

  final String title;

  CounterPageController get c => Get.put(CounterPageController());

  @override
  Widget build(BuildContext context) {
    return Obx(() {
      return Scaffold(
        appBar: AppBar(title: Text(title)),
        body: Center(
          child: Column(
            mainAxisAlignment: MainAxisAlignment.center,
            children: <Widget>[
              const Text('You have pushed the button this many times:'),
              Text(
                '${c.manager.getCounter}',
                style: Theme.of(context).textTheme.headline4,
              ),
            ],
          ),
        ),
        floatingActionButton: FloatingActionButton(
          onPressed: c.manager.increment,
          tooltip: 'Increment',
          child: const Icon(Icons.add),
        ),
      );
    });
  }
}
2.11. The get plugin allows us to add one controller per screen by extending the GetxController class. In this controller, we can perform operations whose scope is limited to the screen. Here, CounterPageController provides CounterPage with the singleton of CounterManager.
If everything is done as per the above commands, we will end up with the following tree structure:
2.12. Now we can test our project by running the following command:
flutter run
3. Additional Benefits:
3.1. Self Aware UI:
As all managers in our application use streams to share data, whenever one screen changes a manager's data, any other screens that depend on that data also update themselves in real-time. This happens because the dependent widgets listen() to the streams.
3.2. Modularization:
We have separate managers for handling REST APIs, preferences, app state info, etc., so modularization happens automatically. Plus, UI logic is separated from business logic, as we are using the GetxController provided by the get plugin.
3.3. Small rebuild footprint:
By default, Flutter rebuilds the whole widget tree for updating the UI but with the get and rxdart plugins, only the dependent widget refreshes itself.
4. Conclusion
We can achieve good performance of a Flutter app with an appropriate architecture as discussed in this blog.
GraphQL is the new hype in the field of API technologies. We have been building and using REST APIs for quite some time, and we started hearing about GraphQL only recently. GraphQL is usually described as a frontend-directed API technology, as it allows front-end developers to request data in a simpler way than ever before. The objective of this query language is to let client applications describe their data requirements and interactions in an intuitive and flexible format.
The Phoenix Framework runs on Elixir, which is built on top of Erlang. Elixir's core strengths are scaling and concurrency. Phoenix is a powerful and productive web framework that does not compromise on speed or maintainability, and it comes with built-in support for WebSockets, enabling you to build real-time apps.
Prerequisites:
Elixir & Erlang: Phoenix is built on top of these
Phoenix Web Framework: Used for writing the server application. (It's a well-known and lightweight framework in Elixir.)
Absinthe: A GraphQL library for Elixir, used for writing queries and mutations.
GraphiQL: A browser-based GraphQL IDE for testing your queries. Consider it similar to what Postman is for testing REST APIs.
Overview:
The application we will develop is a simple blog application written using the Phoenix Framework, with two schemas, User and Post, defined in the Accounts and Blog contexts respectively. We will design the application to support APIs for blog creation and management. We assume you have Erlang, Elixir, and mix installed.
Where to Start:
At first, we have to create a Phoenix web application using the following command:
mix phx.new graphql --no-brunch --no-html
• --no-brunch – do not generate brunch files for static asset building. When choosing this option, you will need to manually handle JavaScript dependencies if building HTML apps.
• --no-html – do not generate HTML views.
Note: As we are mostly going to work with APIs, we don't need any web pages or HTML views, hence the command arguments above.
Dependencies:
After we create the project, we need to add dependencies in mix.exs to make GraphQL available for the Phoenix application.
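The Absinthe additions to the deps list in mix.exs look roughly like this (the version numbers here are assumptions; pick the latest compatible releases):

```elixir
defp deps do
  [
    {:phoenix, "~> 1.3"},
    {:phoenix_ecto, "~> 3.2"},
    {:absinthe, "~> 1.4"},       # GraphQL implementation for Elixir
    {:absinthe_plug, "~> 1.4"}   # Plug integration, also powers GraphiQL
  ]
end
```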
We used the following components to design/structure our GraphQL application:
GraphQL schema: This goes inside lib/graphql_web/schema/schema.ex. The schema defines your queries and mutations.
Custom types: Your schema may include some custom types, which should be defined inside lib/graphql_web/schema/types.ex.
Resolvers: We have to write a resolver function that handles the business logic for each query or mutation it is mapped to. Resolvers should be defined in their own files; we defined ours inside lib/graphql/accounts/user_resolver.ex and lib/graphql/blog/post_resolver.ex.
Also, to be able to make queries using a GraphQL client, we need to update the router in lib/graphql_web/router.ex and create a GraphQL pipeline there to route the API requests:
pipeline :graphql do
  plug Graphql.Context  # custom plug written in lib/graphql_web/plug/context.ex
end

scope "/api" do
  pipe_through(:graphql)  # pipeline through which the requests are routed
  forward("/", Absinthe.Plug, schema: GraphqlWeb.Schema)
  forward("/graphiql", Absinthe.Plug.GraphiQL, schema: GraphqlWeb.Schema)
end
Writing GraphQL Queries:
Let's write some GraphQL queries, which can be considered equivalent to GET requests in REST. But before getting into the queries, let's take a look at the GraphQL schema we defined and its resolver mappings:
defmodule GraphqlWeb.Schema do
  use Absinthe.Schema
  import_types(GraphqlWeb.Schema.Types)

  query do
    field :blog_posts, list_of(:blog_post) do
      resolve(&Graphql.Blog.PostResolver.all/2)
    end

    field :blog_post, type: :blog_post do
      arg(:id, non_null(:id))
      resolve(&Graphql.Blog.PostResolver.find/2)
    end

    field :accounts_users, list_of(:accounts_user) do
      resolve(&Graphql.Accounts.UserResolver.all/2)
    end

    field :accounts_user, :accounts_user do
      arg(:email, non_null(:string))
      resolve(&Graphql.Accounts.UserResolver.find/2)
    end
  end
end
You can see above that we have defined four queries in the schema. Let's pick one and see what goes into it:
field :accounts_user, :accounts_user do
  arg(:email, non_null(:string))
  resolve(&Graphql.Accounts.UserResolver.find/2)
end
Above, we have retrieved a particular user using his email address through Graphql query.
arg(:email, non_null(:string)): defines a non-null incoming string argument, i.e., the user's email.
Graphql.Accounts.UserResolver.find/2: the resolver function mapped via the schema, which contains the core business logic for retrieving a user.
:accounts_user: the custom type, which is defined inside lib/graphql_web/schema/types.ex as follows:
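The original types.ex snippet is not reproduced above, but in Absinthe it would be declared along these lines (the fields are inferred from the queries, so treat this as a sketch):

```elixir
object :accounts_user do
  field(:id, :id)
  field(:name, :string)
  field(:email, :string)
  field(:posts, list_of(:blog_post))
end
```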
We need to write a separate resolver function for every query we define. We will go over the resolver functions for accounts_user, which live in the lib/graphql/accounts/user_resolver.ex file:
defmodule Graphql.Accounts.UserResolver do
  alias Graphql.Accounts  # import lib/graphql/accounts/accounts.ex as Accounts

  def all(_args, _info) do
    {:ok, Accounts.list_users()}
  end

  def find(%{email: email}, _info) do
    case Accounts.get_user_by_email(email) do
      nil -> {:error, "User email #{email} not found!"}
      user -> {:ok, user}
    end
  end
end
These functions list all users or retrieve a particular user by email address. Let's run the query now using the GraphiQL browser. You need to have the server running on port 4000; to start the Phoenix server use:
mix deps.get      # pull all the dependencies
mix deps.compile  # compile your code
mix phx.server    # start the Phoenix server
Let's retrieve a user by email address via a query:
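A query of roughly this shape does the job (the email value is made up):

```graphql
{
  accountsUser(email: "john@example.com") {
    id
    name
    email
  }
}
```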
Above, we retrieved the id, email, and name fields by executing the accountsUser query with an email address. GraphQL also allows us to define variables, which we will show later when writing mutations.
Let’s execute another query to list all blog posts that we have defined:
Writing GraphQL Mutations:
Let's write some GraphQL mutations. If you have understood the way GraphQL queries are written, mutations are similar and easy to understand: they are defined in the same form as queries, with a resolver function. The mutations we are going to write are as follows:
create_post:- create a new blog post
update_post :- update an existing blog post
delete_post:- delete an existing blog post
The mutation looks as follows:
defmodule GraphqlWeb.Schema do
  use Absinthe.Schema
  import_types(GraphqlWeb.Schema.Types)

  mutation do
    field :create_post, type: :blog_post do
      arg(:title, non_null(:string))
      arg(:body, non_null(:string))
      arg(:accounts_user_id, non_null(:id))
      resolve(&Graphql.Blog.PostResolver.create/2)
    end

    field :update_post, type: :blog_post do
      arg(:id, non_null(:id))
      arg(:post, :update_post_params)
      resolve(&Graphql.Blog.PostResolver.update/2)
    end

    field :delete_post, type: :blog_post do
      arg(:id, non_null(:id))
      resolve(&Graphql.Blog.PostResolver.delete/2)
    end
  end
end
Let’s run some mutations to create a post in GraphQL:
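A createPost mutation issued from GraphiQL would look something like this (the values are illustrative):

```graphql
mutation {
  createPost(title: "Hello GraphQL", body: "My first post", accountsUserId: 1) {
    id
    title
    body
  }
}
```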
Notice the method is POST and not GET over here.
Let's dig into the update mutation:
field :update_post, type: :blog_post do
  arg(:id, non_null(:id))
  arg(:post, :update_post_params)
  resolve(&Graphql.Blog.PostResolver.update/2)
end
Here, update_post takes two arguments as input: a non-null id and a post parameter of type update_post_params that holds the new values to apply. The mutation is defined in lib/graphql_web/schema/schema.ex, while the input type is defined in lib/graphql_web/schema/types.ex.
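The update_post_params type is not reproduced above; in Absinthe it would be an input object along these lines (the fields are an assumption based on the Post schema):

```elixir
input_object :update_post_params do
  field(:title, :string)
  field(:body, :string)
end
```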
In the first part of this blog series, Getting started with Kubernetes operators (Helm based), we learned the basics of operators and built a Helm based operator. In this blog post, we will try out an Ansible based operator. Ansible is a very popular tool used by organizations across the globe for configuration management, deployment, and automation of other operational tasks. This makes Ansible an ideal tool for building operators, since with operators we likewise intend to eliminate or minimize the manual intervention required while running and managing applications on Kubernetes. Ansible based operators allow us to use Ansible playbooks and roles to manage our applications on Kubernetes.
Operator Maturity Model
Image source: Github
Before we start building the operator, let's spend some time understanding the operator maturity model. The operator maturity model gives an idea of the kinds of application management capabilities different types of operators can have. As we can see in the diagram above, the model describes five generic phases of maturity/capability for operators. The minimum expectation from an operator is that it should be able to deploy/install and upgrade an application, and that is provided by all operator types. Helm based operators are the simplest of them all, as Helm is a chart manager and we can only do installs and upgrades with it. Ansible based operators can be more mature, as Ansible has modules to perform a wide variety of operational tasks; we can use these modules in the Ansible roles/playbooks our operator runs and make them handle more complex applications or use cases. In the case of Golang based operators, we write the operational logic ourselves, so we have the liberty to customize it as per our requirements.
Building an Ansible Based Operator
1. Let’s first install the operator sdk
go get -d github.com/operator-framework/operator-sdk
cd $GOPATH/src/github.com/operator-framework/operator-sdk
git checkout master
make dep
make install
Now we will have the operator-sdk binary in the $GOPATH/bin folder.
2. Setup the project
operator-sdk new bookstore-operator --api-version=blog.velotio.com/v1alpha1 --kind=BookStore --type=ansible
In the above command, we have set the operator type to ansible as we want an Ansible based operator. It creates a folder structure as shown below:
bookstore-operator/
|- build/        # Contains the Dockerfile to build the operator image.
|- deploy/       # Contains the CRD, CR, and manifest files for deploying the operator.
|- roles/        # Contains the Ansible role executed by the operator.
|- molecule/     # Molecule is used for testing the Ansible roles.
|- watches.yaml  # Specifies the resource the operator watches (maintains the state of).
Inside the roles folder, it creates an Ansible role named `bookstore`. This role is bootstrapped with all the directories and files that are part of a standard Ansible role.
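The generated watches.yaml ties the custom resource to the role; for this project it should look roughly like the following (the role path inside the image may differ across SDK versions):

```yaml
- version: v1alpha1
  group: blog.velotio.com
  kind: BookStore
  role: /opt/ansible/roles/bookstore
```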
Here we can see that the operator is going to watch events related to objects of the BookStore kind and execute the Ansible role bookstore. Drawing parallels with our Helm based operator, the behavior in both cases is similar; the only difference is that the Helm based operator executed the specified Helm chart in response to events on the object it was watching, while here we execute an Ansible role.
With Ansible based operators, we can also have the operator execute an Ansible playbook rather than a role.
3. Building the bookstore Ansible role
Now we need to modify the bookstore Ansible roles created for us by the operator-framework.
First, we will update the custom resource (CR) file (blog_v1alpha1_bookstore_cr.yaml) available under the deploy/ folder. In this CR we can configure all the values which we want to pass to the bookstore Ansible role. By default the CR contains only the size field; we will update it to include the other fields we need in our role. To keep things simple, we will just include some basic variables like the image name, tag, etc. in our spec.
The Ansible operator passes the key-value pairs listed in the spec of the CR as variables to Ansible. The operator converts the variable names to snake_case before running Ansible, so when we use the variables in our role we refer to them in snake_case.
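A CR along these lines would match the fields discussed in this post; the image name and concrete values are illustrative assumptions, not the article's exact file:

```yaml
apiVersion: blog.velotio.com/v1alpha1
kind: BookStore
metadata:
  name: example-bookstore
spec:
  size: 1
  image: akash125/bookstore-app   # hypothetical image name
  tag: latest
  pullPolicy: Always              # available inside the role as pull_policy
```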
Next, we need to create the tasks the bookstore role will execute, updating them to define our deployment. By default, an Ansible role executes the tasks defined at `/tasks/main.yml`. To define our deployment we will leverage the k8s module of Ansible, creating a Kubernetes deployment and service for our app as well as for MongoDB.
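A sketch of what the app deployment task can look like with the k8s module; the names, labels, and use of the `meta` variable are assumptions, and the MongoDB objects and the services are omitted for brevity:

```yaml
# roles/bookstore/tasks/main.yml (illustrative sketch)
- name: Create the bookstore deployment
  k8s:
    definition:
      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: bookstore
        namespace: '{{ meta.namespace }}'
      spec:
        replicas: '{{ size }}'
        selector:
          matchLabels:
            app: bookstore
        template:
          metadata:
            labels:
              app: bookstore
          spec:
            containers:
              - name: bookstore
                image: '{{ image }}:{{ tag }}'
                imagePullPolicy: '{{ pull_policy }}'
```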
Note that the pullPolicy field defined in our CR spec is used as 'pull_policy' in our tasks. We have used inline definitions to create our Kubernetes objects since our app is quite simple; for large applications, creating objects using separate definition files would be a better approach.
4. Build the bookstore-operator image
The Dockerfile for building the operator image is already in our build folder; we just need to run the build command from the root folder of our operator project (with operator-sdk v0.x this is operator-sdk build <image>; the exact command depends on your SDK version).
You can use your own docker repository instead of ‘akash125/bookstore-operator’
5. Run the bookstore-operator
As we have our operator image ready, we can now go ahead and run it. The deployment file (operator.yaml under the deploy folder) for the operator was created as part of our project setup; we just need to set the image for this deployment to the one we built in the previous step.
After updating the image in the operator.yaml we are ready to deploy the operator.
Note: The role created might have more permissions than actually required for the operator, so it is always a good idea to review it and trim down the permissions in production setups.
Verify that the operator pod is in running state.
Here, two containers have been started as part of the operator deployment: one is the operator and the other is ansible. The ansible container exists only to make the logs available on stdout in Ansible format.
6. Deploy the bookstore app
Now that we have the bookstore-operator running in our cluster, we just need to create the custom resource to deploy our bookstore app.
Before we can create the bookstore CR, we need to register its CRD.
In this blog post, we learned how to create an Ansible-based operator using the operator framework. Ansible-based operators are a great way to combine the power of Ansible and Kubernetes: they let us deploy our applications using Ansible roles and playbooks while controlling them (passing parameters) through custom Kubernetes resources. If Ansible is heavily used across your organization and you are migrating to Kubernetes, Ansible-based operators are an ideal choice for managing deployments. In the next blog, we will learn about Golang-based operators.
This blog focuses on migrating Kubernetes clusters from one cloud provider to another. We will be migrating our entire data from Google Kubernetes Engine to Azure Kubernetes Service using Velero.
Prerequisite
A Kubernetes cluster running version 1.10 or above
Setup Velero with Restic Integration
Velero consists of a client installed on your local computer and a server that runs in your Kubernetes cluster, like Helm.
Installing Velero Client
You can find the latest release corresponding to your OS and system and download Velero from there:
Extract the tarball (change the version depending on yours) and move the Velero binary to /usr/local/bin
$ tar -xvzf velero-v0.11.0-darwin-amd64.tar.gz
$ sudo mv velero /usr/local/bin/
$ velero help
Create a Bucket for Velero on GCP
Velero needs an object storage bucket where it will store the backup. Create a GCS bucket using:
gsutil mb gs://<bucket-name>
Create a Service Account for Velero
# Create a Service Account
gcloud iam service-accounts create velero --display-name "Velero service account"
SERVICE_ACCOUNT_EMAIL=$(gcloud iam service-accounts list \
  --filter="displayName:Velero service account" \
  --format 'value(email)')

# Define Permissions for the Service Account
ROLE_PERMISSIONS=(
    compute.disks.get
    compute.disks.create
    compute.disks.createSnapshot
    compute.snapshots.get
    compute.snapshots.create
    compute.snapshots.useReadOnly
    compute.snapshots.delete
    compute.zones.get
)

# Create a Role for Velero
PROJECT_ID=$(gcloud config get-value project)
gcloud iam roles create velero.server --project $PROJECT_ID \
  --title "Velero Server" \
  --permissions "$(IFS=","; echo "${ROLE_PERMISSIONS[*]}")"

# Create a Role Binding for Velero
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member serviceAccount:$SERVICE_ACCOUNT_EMAIL \
  --role projects/$PROJECT_ID/roles/velero.server
gsutil iam ch serviceAccount:$SERVICE_ACCOUNT_EMAIL:objectAdmin gs://<bucket-name>

# Generate Service Key file for Velero and save it for later
gcloud iam service-accounts keys create credentials-velero --iam-account $SERVICE_ACCOUNT_EMAIL
Install Velero Server on GKE and AKS
Use the --use-restic flag on the velero install command to enable the restic integration.
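The install can be sketched as below, using the bucket and the credentials-velero key file created earlier; run it against both the GKE and the AKS cluster (switch your kubectl context), and treat the flag values as placeholders:

```shell
velero install \
    --provider gcp \
    --bucket <bucket-name> \
    --secret-file ./credentials-velero \
    --use-restic
```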
After that, you can see a DaemonSet of restic and deployment of Velero in your Kubernetes cluster.
$ kubectl get po -n velero
Restic Components
In addition, there are three more Custom Resource Definitions and their associated controllers to provide restic support.
Restic Repository
Maintains the complete lifecycle of Velero's restic repositories.
Restic lifecycle commands such as restic init, check, and prune are handled by this CRD's controller.
PodVolumeBackup
This CRD backs up the persistent volumes of annotated pods in the selected namespaces.
This controller executes backup commands on the pod to initialize backups.
PodVolumeRestore
This controller restores pod volumes from the restic backups and is responsible for executing the restore commands.
Backup an application on GKE
For this blog post, we assume the GKE cluster already runs an application that uses persistent volumes. Alternatively, you can install WordPress as an example as explained here.
We will perform GKE Persistent disk migration to Azure Persistent Disk using Velero.
Follow the below steps:
To back up a particular persistent volume, check the deployment or statefulset for the name of the volume that is mounted; the pods must then be annotated with that volume name. For example, here the pods need to be annotated with the volume name "data".
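With restic, volumes are opted in per pod via an annotation. A sketch of the command (namespace and pod name are placeholders):

```shell
kubectl -n <namespace> annotate pod/<pod-name> backup.velero.io/backup-volumes=data
```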
Take a backup of the entire namespace in which the application is running. You can also specify multiple namespaces, or skip the flag to back up all namespaces by default. We are going to back up only one namespace in this blog.
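The backup command can look like this; testbackup matches the name used later in the troubleshooting section, and the namespace is a placeholder:

```shell
velero backup create testbackup --include-namespaces <namespace>
```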
Once the backup is complete, you can list it using:
$ velero backup get
You can also check the backup on GCP Portal under Storage. Select the bucket you created and you should see a similar directory structure:
Restore the application to AKS
Follow the below steps to restore the backup:
Make sure the same StorageClass used by the GKE Persistent Volumes is available in Azure. For example, if the Storage Class of the PVs is "persistent-ssd", create the same on AKS using the below template:
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: persistent-ssd   # same name as the GKE StorageClass
provisioner: kubernetes.io/azure-disk
parameters:
  storageaccounttype: Premium_LRS
  kind: Managed
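With the StorageClass in place, switch kubectl to the AKS cluster and create the restore from the backup; testrestore matches the name used in the troubleshooting section:

```shell
velero restore create testrestore --from-backup testbackup
```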
You can also check on GCP Portal, a new folder “restores” is created under the bucket.
In some time, you should be able to see that the application namespace is back and WordPress and MySQL pods are running again.
Troubleshooting
For any errors/issues related to Velero, you may find below commands helpful for debugging purposes:
# Describe the backup to see the status
$ velero backup describe testbackup --details
# Check backup logs, and look for errors if any
$ velero backup logs testbackup
# Describe the restore to see the status
$ velero restore describe testrestore --details
# Check restore logs, and look for errors if any
$ velero restore logs testrestore
# Check velero and restic pod logs, and look for errors if any
$ kubectl -n velero logs VELERO_POD_NAME/RESTIC_POD_NAME
# NOTE: You can change the default log level to debug mode by adding --log-level=debug
# as an argument to the container command in the velero pod template spec.
# Describe the BackupStorageLocation resource and look for any errors in Events
$ kubectl describe BackupStorageLocation default -n velero
Conclusion
Migrating persistent workloads across Kubernetes clusters on different cloud providers is difficult; the restic integration with the Velero backup tool makes it possible. The tool is still described as beta quality on the official site. I performed a GKE-to-AKS migration and it went successfully; you can try other combinations of cloud providers for migrations.
The only drawback of using Velero to migrate data is that if your data is very large, the migration may take a while: it took me almost a day to migrate a 350 GB disk from GKE to AKS. But if your data is comparatively small, this is a very efficient and hassle-free way to migrate it.
The amount of data in our world is exploding day by day, and processing and analyzing this Big Data has become key to making informed, data-driven decisions. Spark is a unified distributed data processing engine for Big Data: it lets you process data faster by splitting the work into chunks and assigning those chunks to computation resources across nodes. It can handle up to petabytes of data, i.e., millions of gigabytes, and it processes data in memory, which makes it fast.
We talked about processing Big Data in Spark, but Spark does not store data the way file systems do. So, to process data in Spark, we must read it from different data sources, clean or process it, and store it back in one of the target data sources. Data sources can be files, APIs, databases, or streams.
Database management systems have been around for decades. Many applications generate huge amounts of data and store it in database management systems, and a lot of the time we need to connect Spark to the database and process that data.
In this blog, we are going to discuss how to use Spark to read from and write to databases in parallel. Our focus will be on reading/writing data from/to the database using different methods, which will help us read/write terabytes of data efficiently.
Reading / Writing data from/to Database using Spark:
To read data or write data from/to the database, we will need to perform a few basic steps regardless of any programming language or framework we are using. What follows is an overview of the steps to read data from databases.
Step 1: Register Driver or Use Connector
Get the respective driver of your database and register the driver, or use the connector to connect to the database.
Step 2: Make a connection
Next, the driver or connector makes a connection to the database.
Step 3: Run query statement
Using the connection created in the previous step, execute the query, which will return the result.
Step 4: Process result
Process the result obtained in the previous step as per your requirement.
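The four steps can be seen end to end in a tiny example. Here sqlite3 (from the Python standard library) stands in for a real database driver, and the table contents are illustrative:

```python
import sqlite3  # stdlib driver, used here so the sketch is self-contained

# Steps 1 & 2: register the driver and make a connection (in-memory DB for illustration)
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Step 3: run query statements
cur.execute("CREATE TABLE covid_data (state TEXT, confirmed INTEGER)")
cur.execute("INSERT INTO covid_data VALUES ('Uttarakhand', 3259)")
cur.execute("SELECT state, confirmed FROM covid_data")

# Step 4: process the result
rows = cur.fetchall()
print(rows)  # [('Uttarakhand', 3259)]
conn.close()
```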
This dataset contains details of COVID patients across all states. It has different information such as State, Confirmed, Recovered, Deceased, Other, Tested, and Date.
You can load this dataset in any of the databases you work with and can try out the entire discussion practically.
The following image shows ten records of the entire dataset.
Spark provides an API to read data from a database and is very simple to use. First of all, we will need to create a Spark session. Then add the driver to Spark. It can be added through the program itself, or we can add it using shell also.
The first line of code imports the SparkSession class. This is the entry point to programming Spark with the Dataset and DataFrame API.
From the fifth to the ninth line of the code, we create a Spark session on a local system with four cores, which will be used for interaction with our Spark application. We specify a name for our application using appName(), which in our case is 'Databases'; this app name will be shown on the Web UI for our cluster. Next, we can specify any configurations for the Spark application using config(). In our case, we have specified the driver for the Postgres database, which will be used to create a connection to it. You can specify the driver of any of the available databases.
To connect to the database, we need a hostname, port, database name, username, and password. Those details are in lines 10 through 16 of the code.
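Since the snippet itself is not reproduced above, here is a hedged sketch of what the session setup and connection details described can look like; the driver version and credentials are illustrative assumptions:

```python
# Sketch of the SparkSession setup described in the text (not the article's exact code).
def build_spark():
    # Imported lazily so the sketch parses even without PySpark installed.
    from pyspark.sql import SparkSession
    return (
        SparkSession.builder
        .master("local[4]")                       # four local cores
        .appName("Databases")                     # shown on the Spark Web UI
        .config("spark.jars.packages", "org.postgresql:postgresql:42.2.8")
        .getOrCreate()
    )

# Connection details used by the JDBC reader/writer (illustrative values)
db = {
    "host": "localhost",
    "port": 5432,
    "database": "postgres",
    "user": "postgres",
    "password": "pass@123",
    "table": "covid_data",
}
jdbc_url = f"jdbc:postgresql://{db['host']}:{db['port']}/{db['database']}"
print(jdbc_url)  # jdbc:postgresql://localhost:5432/postgres
```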
Refer to the code lines from 19 to 28 in the above snippet. Up until now, we have our Spark session and all the information we need to connect to the database. Using the Spark read API, we read the data from the database. This creates a connection to the Postgres database from one of the cores allocated to the Spark application and reads the data into the table_data_df dataframe. Even if we have multiple cores for our application, it still creates only one connection from one of the cores; the rest of the cores are not utilized. We will discuss how to utilize all the cores shortly, but for now note this single-connection behavior.
Refer to the code lines from 29 to 38 in the above snippet. We have the data now, so let's write it to the database. Using the Spark write API, we write the data; this also creates only one connection to the database from one of the cores allocated to the Spark application. Even if we have more cores available, the above code still uses only one.
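A hedged sketch of what the read and write described above can look like with the Spark JDBC API; the function and variable names are assumptions, and nothing here is executed against a real database:

```python
# Single-connection JDBC read: one core opens one connection and pulls the whole table.
def read_table(spark, jdbc_url, table, user, password):
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )

# Single-connection JDBC write: likewise one connection performs the whole write.
def write_table(df, jdbc_url, table, user, password):
    (
        df.write.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )
```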
Output of Program:
/usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.8 in central
:: resolution report :: resolve 113ms :: artifacts dl 3ms
	:: modules in use:
	org.postgresql#postgresql;42.2.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-83662a49-0573-46c3-be8e-0a280f96c7d8
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/5ms)
22/04/22 11:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------------+-----------------+---------+---------+--------+-----+------+----------+
| state| district|confirmed|recovered|deceased|other|tested| date|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|Uttar Pradesh| Varanasi|23512|23010|456|0|595510|2021-02-24|
| Uttarakhand| Almora|3259|3081|25|127|84443|2021-02-24|
| Uttarakhand| Bageshwar|1534|1488|17|26|55626|2021-02-24|
| Uttarakhand| Chamoli|3486|3373|15|88|90390|2021-02-24|
| Uttarakhand| Champawat|1819|1790|9|7|95068|2021-02-24|
| Uttarakhand| Dehradun|29619|28152|962|439|401496|2021-02-24|
| Uttarakhand| Haridwar|14137|13697|158|175|369542|2021-02-24|
| Uttarakhand| Nainital|12636|12254|237|79|204422|2021-02-24|
| Uttarakhand| Pauri Garhwal|5145|5033|60|24|138878|2021-02-24|
| Uttarakhand| Pithoragarh|3361|3291|47|11|72686|2021-02-24|
| Uttarakhand| Rudraprayag|2270|2251|10|7|52378|2021-02-24|
| Uttarakhand| Tehri Garhwal|4227|4026|16|170|105111|2021-02-24|
| Uttarakhand|Udham Singh Nagar|11538|11267|117|123|337292|2021-02-24|
| Uttarakhand| Uttarkashi|3789|3645|17|118|120026|2021-02-24|
| West Bengal| Alipurduar|7705|7616|86|0|null|2021-02-24|
| West Bengal| Bankura|11940|11788|92|0|null|2021-02-24|
| West Bengal| Birbhum|10035|9876|89|0|null|2021-02-24|
| West Bengal| Cooch Behar|11835|11756|72|0|null|2021-02-24|
| West Bengal| Dakshin Dinajpur|8179|8099|74|0|null|2021-02-24|
| West Bengal| Darjeeling|18423|18155|203|0|null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows

Process finished with exit code 0
As promised in the last section, we will discuss how to optimize resource utilization. So far we had only one connection, leaving most resources idle. To get past this, the Spark read and write APIs accept a few extra options: partitionColumn, lowerBound, and upperBound. These options must all be specified if any of them is; in addition, numPartitions must be specified. Together they describe how to partition the table when reading in parallel from multiple workers. Each partition gets an individual core with its own connection performing the reads or writes, making the database operations parallel. This is a much more efficient way of reading and writing data from databases in Spark than doing it with a single partition.
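A hedged sketch of a partitioned JDBC read; the column name and bounds reuse the example values from the text and are assumptions about the actual schema:

```python
# Partitioned JDBC read: Spark opens one connection per partition (up to ten here).
def parallel_read(spark, jdbc_url, table, user, password):
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .option("partitionColumn", "id")  # must be a numeric, date, or timestamp column
        .option("lowerBound", 0)
        .option("upperBound", 1000)
        .option("numPartitions", 10)      # ten partitions -> up to ten parallel connections
        .load()
    )
```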
Partitions are decided by the Spark API in the following way.
Let’s consider an example where:
lowerBound: 0
upperBound: 1000
numPartitions: 10
With a stride of 100, the partitions correspond to queries of the following form:
SELECT * FROM table WHERE partitionColumn < 100 (this first partition also picks up NULL values)
SELECT * FROM table WHERE partitionColumn >= 100 AND partitionColumn < 200
…
SELECT * FROM table WHERE partitionColumn >= 900
Each range is inclusive on its lower bound and exclusive on its upper bound, so no row is read twice.
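The stride logic can be sketched in plain Python. This is a simplified reconstruction that ignores the rounding details of Spark's actual implementation, and the column name is an assumption:

```python
# Reconstruct the per-partition WHERE clauses from lowerBound, upperBound, numPartitions.
def partition_predicates(column, lower, upper, num_partitions):
    stride = (upper - lower) // num_partitions
    # First partition also picks up NULL values in the partition column.
    preds = [f"{column} < {lower + stride} or {column} is null"]
    for i in range(1, num_partitions - 1):
        lo = lower + i * stride
        preds.append(f"{column} >= {lo} AND {column} < {lo + stride}")
    # Last partition is open-ended on the upper side.
    preds.append(f"{column} >= {lower + (num_partitions - 1) * stride}")
    return preds

preds = partition_predicates("id", 0, 1000, 10)
print(preds[0])   # id < 100 or id is null
print(preds[1])   # id >= 100 AND id < 200
print(preds[-1])  # id >= 900
```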
Now we have data in multiple partitions. Each executor can have one or more partitions based on the cluster configuration. Suppose we have 10 cores and 10 partitions: one partition of data can be fetched by one executor using one core, so 10 partitions of data can be fetched by 10 executors. Each of these executors creates its own connection to the database and reads its share of the data.
Note: lowerBound and upperBound do not filter the data; they just help Spark decide the stride of each partition. partitionColumn must be a numeric, date, or timestamp column from the table.
There are also some attributes that can be used to optimize the write operation. One is "batchsize": the JDBC batch size, which determines how many rows to insert per round trip. It can improve the performance of JDBC drivers and applies only to writing. Another, "truncate", is a JDBC writer-related option: when SaveMode.Overwrite is enabled, it causes Spark to truncate the existing table instead of dropping and recreating it. This can be more efficient and prevents the table metadata (e.g., indices) from being removed.
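A sketch of a write tuned with these two options; the option values are illustrative, and the function is not run against a real database:

```python
# JDBC write tuned with batchsize and truncate (illustrative values).
def tuned_write(df, jdbc_url, table, user, password):
    (
        df.write.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .option("batchsize", 10000)   # rows inserted per JDBC round trip
        .option("truncate", "true")   # with overwrite mode, truncate instead of drop/recreate
        .mode("overwrite")
        .save()
    )
```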
Output of Program:
/usr/bin/python3.8 /home/aniketrajput/aniket_work/Spark/main.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
:: loading settings :: url = jar:file:/home/aniketrajput/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/aniketrajput/.ivy2/cache
The jars for the packages stored in: /home/aniketrajput/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404;1.0
	confs: [default]
	found org.postgresql#postgresql;42.2.8 in central
:: resolution report :: resolve 104ms :: artifacts dl 3ms
	:: modules in use:
	org.postgresql#postgresql;42.2.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8047b5cc-11e8-4efb-8a38-70edab0d5404
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/4ms)
22/04/22 12:20:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------------+-----------------+---------+---------+--------+-----+------+----------+
| state| district|confirmed|recovered|deceased|other|tested| date|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
|Uttar Pradesh| Varanasi|23512|23010|456|0|595510|2021-02-24|
| Uttarakhand| Almora|3259|3081|25|127|84443|2021-02-24|
| Uttarakhand| Bageshwar|1534|1488|17|26|55626|2021-02-24|
| Uttarakhand| Chamoli|3486|3373|15|88|90390|2021-02-24|
| Uttarakhand| Champawat|1819|1790|9|7|95068|2021-02-24|
| Uttarakhand| Dehradun|29619|28152|962|439|401496|2021-02-24|
| Uttarakhand| Haridwar|14137|13697|158|175|369542|2021-02-24|
| Uttarakhand| Nainital|12636|12254|237|79|204422|2021-02-24|
| Uttarakhand| Pauri Garhwal|5145|5033|60|24|138878|2021-02-24|
| Uttarakhand| Pithoragarh|3361|3291|47|11|72686|2021-02-24|
| Uttarakhand| Rudraprayag|2270|2251|10|7|52378|2021-02-24|
| Uttarakhand| Tehri Garhwal|4227|4026|16|170|105111|2021-02-24|
| Uttarakhand|Udham Singh Nagar|11538|11267|117|123|337292|2021-02-24|
| Uttarakhand| Uttarkashi|3789|3645|17|118|120026|2021-02-24|
| West Bengal| Alipurduar|7705|7616|86|0|null|2021-02-24|
| West Bengal| Bankura|11940|11788|92|0|null|2021-02-24|
| West Bengal| Birbhum|10035|9876|89|0|null|2021-02-24|
| West Bengal| Cooch Behar|11835|11756|72|0|null|2021-02-24|
| West Bengal| Dakshin Dinajpur|8179|8099|74|0|null|2021-02-24|
| West Bengal| Darjeeling|18423|18155|203|0|null|2021-02-24|
+-------------+-----------------+---------+---------+--------+-----+------+----------+
only showing top 20 rows

Process finished with exit code 0
We have seen how to read and write data in Spark. But Spark is not the only way to connect with databases: there are multiple ways to access databases and attempt parallel reads and writes, which we will discuss in the next sections. We will mainly focus on reading and writing from Python.
Single Thread Python Program:
import traceback

import psycopg2
import pandas as pd


class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(
                self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')


if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    query = f"select * from {table_name}"
    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port,
                                 postgres_dbname=dbname, username=username, password=password)
    data = pd.DataFrame(db_client.read(query))
    print(data)
To integrate Postgres with Python, there are different libraries or adapters we can use, but Psycopg is the most widely used. First of all, you will need to install the psycopg2 library, a slightly updated version of the Psycopg adapter. You can install it using pip or any way you are comfortable with.
To connect with the Postgres database, we need a hostname, port, database name, username, and password; we store all these details as attributes of the class. The create_conn method forms a connection with the Postgres database using the connect() method of the psycopg2 module and returns a connection object. In the read method, we call create_conn to get a connection object and use it to create a cursor. This cursor is bound to the connection for its lifetime and executes all commands or queries on the database. Using the cursor, we execute a read query on the database; the data returned by the query can then be fetched using the fetchall() method. Finally, we close the connection.
To run the program, we specify the database details and the query. Next, we create a PostgresDbClient object and call its read method. The read method returns the data, which we convert into a relational (tabular) format using pandas.
This implementation is very straightforward: the program creates one process in our system and fetches all the data using system resources (CPU, memory, etc.). The drawback is that if this program uses, say, 30 percent of the CPU and memory, the remaining 70 percent of resources sit idle. We can maximize this usage by other means like multithreading or multiprocessing.
Output of Program:
Connecting to the Postgres database...
Successfully connected to the Postgres database...
Reading data !!!
Read Data !!!
0   Andaman and Nicobar Islands    Unknown        331100   None  2020-04-26
1   Andhra Pradesh                 Anantapur      531440   None  2020-04-26
2   Andhra Pradesh                 Chittoor       731300   None  2020-04-26
3   Andhra Pradesh                 East Godavari  391200   None  2020-04-26
4   Andhra Pradesh                 Guntur         2142980  None  2020-04-26
..  ...                            ...            ...      ...   ...
95  Bihar                          Araria         1000     None  2020-04-30
96  Bihar                          Arwal          4000     None  2020-04-30
97  Bihar                          Aurangabad     8000     None  2020-04-30
98  Bihar                          Banka          3000     None  2020-04-30
99  Bihar                          Begusarai      11500    None  2020-04-30
[100 rows x 8 columns]
Process finished with exit code 0
Multithreaded Python Program:
In the previous section, we discussed the drawback of a single process and single-thread implementation. Let’s get started with how to maximize resource usage. Before getting into multithreading, let’s understand a few basic but important concepts.
What is a process?
When you execute any program, the operating system loads it in memory and then starts executing the program. This instance of the program being executed is called a process. Computing and memory resources are associated with each process separately.
What is a thread?
A thread is a sequential flow of execution within a process; the initial one is usually called the main thread. Unlike separate processes, multiple threads share the same computing and memory resources.
What is multithreading?
This is when a process has multiple threads, along with the main thread, and these threads run independently but concurrently using the same computing and memory resources associated with the process. Such a program is called a multithreaded program or process. Multithreading uses resources very efficiently, which results in maximizing performance.
What is multiprocessing?
When multiple processes run independently, with separate resources associated with each process, it is called multiprocessing. Multiprocessing is achieved with multiple processors running separate processes on each processor.
Let’s get back to our program. Here you can see we have a connection method and a read method; these two are exactly the same as in the previous section. We also have one new function, get_thread(). Note the distinction: a method belongs to the class, while a function does not. So the get_thread() function is global and acts as a wrapper for calling the read method of PostgresDbClient; we keep the thread target as a plain function rather than a class method. Don’t worry if the distinction seems subtle; it is just how this code is organized.
To run the program, we specify the Postgres database details and the queries. In the previous approach, we fetched all the data from the table with one thread. In this approach, the plan is to fetch one day of data per thread so that we can maximize resource utilization: each query reads one day’s worth of data from the table on its own thread, so 5 queries fetch 5 days of data with 5 threads running concurrently.
To create a thread in Python, we use the Thread() constructor from the threading library, passing the function we want to run and its arguments. Thread() returns a new thread object; the thread has been created but not yet started, and we start it with the start() method. In our program, we start 5 threads. If you execute the program multiple times, you will get differently ordered results: some data is fetched earlier, some later, and on the next run the order changes again. This is because thread scheduling is handled by the operating system, and the output depends on which thread the OS gives resources to at what time. If you want to know how this works, you will need to go deep into operating systems concepts.
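The create/start mechanics can be sketched with a trivial stand-in for the database read; the day values and the fetch function are illustrative:

```python
import threading

results = {}

def fetch(day):
    # Stands in for a per-day database read; real code would run a query here.
    results[day] = f"data for {day}"

# Create and start one thread per day (days are illustrative)
threads = [threading.Thread(target=fetch, args=(d,)) for d in ["2020-04-26", "2020-04-27"]]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for all threads to finish before using the results
print(results)
```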
In our use case, we are just printing the data to the console. To store the data instead, there are multiple ways. One simple way is to define a global variable and store the result in it, but then we need synchronization, since multiple threads accessing the global variable can lead to race conditions. Another way is to subclass Thread and save the data in an attribute of the custom class; again, you need to ensure synchronization.
Whenever you store the data in a shared variable, you need synchronization, and synchronization leads to sequential execution of the threads, which is not what we are looking for. To avoid it, we can have each thread write the data it reads directly back to the target database. This way we avoid synchronization and still store the data for future use. The function can look as below, where db_client.write(data) is a function that writes the data to a database.
def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    db_client.write(data)
    print(f"Stopping thread id {thread_id}")
Python Program:
import threading
import traceback

import psycopg2
import pandas as pd


class PostgresDbClient:
    def __init__(self, postgres_hostname, postgres_jdbcport, postgres_dbname, username, password):
        self.db_host = postgres_hostname
        self.db_port = postgres_jdbcport
        self.db_name = postgres_dbname
        self.db_user = username
        self.db_pass = password

    def create_conn(self):
        conn = None
        try:
            print('Connecting to the Postgres database...')
            conn = psycopg2.connect("dbname={} user={} host={} password={} port={}".format(
                self.db_name, self.db_user, self.db_host, self.db_pass, self.db_port))
            print('Successfully connected to the Postgres database...')
        except Exception as e:
            print("Cannot connect to Postgres.")
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')
        return conn

    def read(self, query):
        try:
            conn = self.create_conn()
            cursor = conn.cursor()
            print("Reading data !!!")
            cursor.execute(query)
            data = cursor.fetchall()
            print("Read Data !!!")
            cursor.close()
            conn.close()
            return data
        except Exception as e:
            print(f'Error: {str(e)}\nTrace: {traceback.format_exc()}')


def get_thread(thread_id, db_client, query):
    print(f"Starting thread id {thread_id}")
    data = pd.DataFrame(db_client.read(query))
    print(f"Thread {thread_id} data ", data, sep="\n")
    print(f"Stopping thread id {thread_id}")


if __name__ == "__main__":
    hostname = "localhost"
    jdbc_port = 5432
    dbname = "postgres"
    username = "postgres"
    password = "pass@123"
    table_name = "covid_data"
    query = f"select * from {table_name}"
    partition_column = 'date'
    lower_bound = '2020-04-26'
    upper_bound = '2020-04-30'
    num_partitions = 5

    query1 = f"select * from {table_name} where {partition_column} >= '{lower_bound}' and {partition_column} < '{upper_bound}'"
    query2 = f"select * from {table_name} where {partition_column} >= '{lower_bound}' and {partition_column} < '{upper_bound}'"
    query3 = f"select * from {table_name} where {partition_column} >= '{lower_bound}' and {partition_column} < '{upper_bound}'"
    query4 = f"select * from {table_name} where {partition_column} >= '{lower_bound}' and {partition_column} < '{upper_bound}'"
    query5 = f"select * from {table_name} where {partition_column} >= '{lower_bound}' and {partition_column} < '{upper_bound}'"

    db_client = PostgresDbClient(postgres_hostname=hostname, postgres_jdbcport=jdbc_port,
                                 postgres_dbname=dbname, username=username, password=password)

    x1 = threading.Thread(target=get_thread, args=(1, db_client, query1))
    x1.start()
    x2 = threading.Thread(target=get_thread, args=(2, db_client, query2))
    x2.start()
    x3 = threading.Thread(target=get_thread, args=(3, db_client, query3))
    x3.start()
    x4 = threading.Thread(target=get_thread, args=(4, db_client, query4))
    x4.start()
    x5 = threading.Thread(target=get_thread, args=(5, db_client, query5))
    x5.start()
Note that in this blog, we have hardcoded the password as a string, which is definitely not how passwords should be handled. Use secrets managers, .env files, etc., as input for passwords; passwords are never hardcoded in a production environment.
Conclusion
After going through the above blog, you should be more familiar with how to perform read and write operations on databases using Spark, Python, and multithreading. You also now know what multiprocessing and multithreading are, and you can decide on the best way to carry out read and write operations on a database based on your requirements.
In general, if you have a small amount of data, a simple Python approach to reading and writing is enough. If you have a relatively large amount of data, you can use a multi-threaded approach or a single-partition Spark approach. If you have a huge amount of data, where reading millions of records per second is a requirement, use the Spark multi-partition approach. In the end, which approach to use depends on your requirements and the resources available to you.
Amazon announced Amazon Lex in December 2016, and since then we've been using it to build bots for our customers. Lex is effectively the technology behind Alexa, Amazon's voice-activated virtual assistant, which lets people control things with voice commands such as playing music, setting alarms, ordering groceries, etc. It provides deep learning-powered natural-language understanding along with automatic speech recognition. Amazon now provides it as a service that allows developers to take advantage of the same features used by Amazon Alexa, so there is no need to spend time setting up and managing the infrastructure for your bots.
Now, developers just need to design conversations according to their requirements in Lex console. The phrases provided by the developer are used to build the natural language model. After publishing the bot, Lex will process the text or voice conversations and execute the code to send responses.
I've put together this quick-start tutorial to help you start building Lex chatbots. To ground the terminology, let's consider an e-commerce bot that supports conversations involving the purchase of books.
Lex-Related Terminologies
Bot: It consists of all the components related to a conversation, which includes:
Intent: An intent represents a goal that the bot's user wants to achieve. In our case, the goal is to purchase books.
Utterances: An utterance is a text phrase that invokes intent. If we have more than one intent, we need to provide different utterances for them. Amazon Lex builds a language model based on utterance phrases provided by us, which then invoke the required intent. For our demo example, we need a single intent “OrderBook”. Some sample utterances would be:
I want to order some books
Can you please order a book for me
Slots: Each slot is a piece of data that the user must supply in order to fulfill the intent. For instance, purchasing a book requires bookType and bookName as slots for the intent "OrderBook" (I am considering just these two factors to keep the example simple; in reality, there are many other factors involved in selecting a book). A slot is an input (a string, date, city, location, boolean, number, etc.) needed to reach the goal of the intent. Each slot has a name, a slot type, a prompt, and a flag indicating whether it is required. The slot type defines the valid values a user can respond with, and can be either custom defined or one of Amazon's pre-built types.
Prompt: A prompt is a question Lex asks the user to supply correct data (for a slot) that is needed to fulfill an intent. For example, Lex will ask "What type of book do you want to buy?" to fill the slot bookType.
Fulfillment: Fulfillment provides the business logic that is executed once all the required slot values needed to achieve the goal have been gathered. Amazon Lex supports Lambda functions for fulfilling the business logic and for validation.
Let’s Implement this Bot!
Now that we are aware of the basic terminology used in Amazon Lex, let’s start building our chat-bot.
Creating Lex Bot:
Go to the Amazon Lex console, which is available only in the US East (N. Virginia) region, and click the Create button.
Create a custom bot by providing following information:
Bot Name: PurchaseBook
Output voice: None, since this is only a text-based application
Set Session Timeout: 5 min
Add Amazon Lex basic role to Bot app: Amazon will create it automatically. Find out more about Lex roles & permissions here.
Click on Create button, which will redirect you to the editor page.
Architecting Bot Conversations
Create Slots: We are creating two slots named bookType and bookName. Slot type values can be chosen from the 275 pre-built types provided by Amazon, or we can create our own custom slot types.
Create custom slot type for bookType as shown here and consider predefined type named Amazon.Book for bookName.
Create Intent: Our bot requires a single custom intent named OrderBook.
Configuring the Intents
Utterances: Provide some utterances to invoke the intent. An utterance can consist only of Unicode characters, spaces, and valid punctuation marks. Valid punctuation marks are periods for abbreviations, underscores, apostrophes, and hyphens. If there is a slot placeholder in your utterance, ensure that it's in the {slotName} format and has spaces at both ends.
Slots: Map slots to their types and provide the prompt questions that need to be asked to get a valid value for each slot. Note the sequence: the Lex bot will ask the questions according to priority.
Confirmation prompt: This is optional. If required you can provide a confirmation message e.g. Are you sure you want to purchase book named {bookName}?, where bookName is a slot placeholder.
Fulfillment: Now that all the necessary data has been gathered by the chatbot, it can either be passed to a Lambda function, or the parameters can be returned to the client application, which then calls a REST endpoint.
Creating Amazon Lambda Functions
Amazon Lex supports Lambda functions to provide code hooks for the bot. These functions can serve multiple purposes, such as improving user interaction with the bot by using prior knowledge, validating the input data the bot received from the user, and fulfilling the intent.
Go to AWS Lambda console and choose to Create a Lambda function.
Select blueprint as blank function and click next.
To configure your Lambda function, provide its name, runtime, and the code to be executed when the function is invoked. The code can also be uploaded as a zip file instead of being provided inline. We are using Node.js 4.3 as the runtime.
Click next and choose Create Function.
We can configure our bot to invoke these Lambda functions in two places. We need to do this while configuring the intent, as shown below:
where botCodeHook and fulfillment are the names of the Lambda functions we created.
Lambda initialization and validation
The Lambda function provided here, i.e. botCodeHook, is invoked on each user input whose intent is understood by Amazon Lex. It validates the bookName against a predefined list of books.
'use strict';

exports.handler = (event, context, callback) => {
    const sessionAttributes = event.sessionAttributes;
    const slots = event.currentIntent.slots;
    const bookName = slots.bookName;

    // predefined list of available books
    const validBooks = ['harry potter', 'twilight', 'wings of fire'];

    // negative check: if a valid slot value is not obtained, inform Lex that the
    // user is expected to respond with a slot value
    if (bookName && bookName !== "" && validBooks.indexOf(bookName.toLowerCase()) === -1) {
        let response = {
            sessionAttributes: event.sessionAttributes,
            dialogAction: {
                type: "ElicitSlot",
                message: {
                    contentType: "PlainText",
                    content: `We do not have the book: ${bookName}. Please provide another book name, e.g. twilight.`
                },
                intentName: event.currentIntent.name,
                slots: slots,
                slotToElicit: "bookName"
            }
        };
        callback(null, response);
        return; // without this, the Delegate response below would also fire
    }

    // if a valid book name is obtained, delegate the next course of action to Lex
    let response = {
        sessionAttributes: sessionAttributes,
        dialogAction: {
            type: "Delegate",
            slots: event.currentIntent.slots
        }
    };
    callback(null, response);
};
Fulfillment code hook
This lambda function is invoked after receiving all slot data required to fulfill the intent.
'use strict';

exports.handler = (event, context, callback) => {
    // when the intent gets fulfilled, inform Lex to complete the state
    let response = {
        sessionAttributes: event.sessionAttributes,
        dialogAction: {
            type: "Close",
            fulfillmentState: "Fulfilled",
            message: {
                contentType: "PlainText",
                content: "Thanks for purchasing the book."
            }
        }
    };
    callback(null, response);
};
Error Handling: We can customize the error messages shown to our bot's users. Click on Error Handling and replace the default values with the required ones. Since the number of retries is two, we can also provide a different message for each retry.
Your Bot is Now Ready To Chat
Click on Build to build the chat-bot. Congratulations! Your Lex chat-bot is ready to test. We can test it in the overlay which appears in the Amazon Lex console.
Sample conversations:
I hope you now understand the basic terminology of Amazon Lex and how to create a simple chatbot using serverless technology (AWS Lambda). It is a really powerful platform for building mature, intelligent chatbots.
We live in a world where speed is important. With cutting-edge technology coming into the telecommunications and software industry, we expect to get things done quickly. We want to develop applications that are fast, can process high volumes of data and requests, and keep the end-user happy.
This is great, but of course, it’s easier said than done. That’s why concurrency and parallelism are important in application development. We must process data as fast as possible. Every programming language has its own way of dealing with this, and we will see how Golang does it.
Now, many of us choose Golang for its concurrency support, and the inclusion of goroutines and channels has had a massive impact on how we write concurrent code.
This blog will cover channels and how they work internally, as well as their key components. To get the most out of it, it helps to know a little about goroutines and channels, since this blog gets into channel internals. If you don't know anything about them yet, don't worry: we'll start with an introduction to channels, and then we'll see how they operate.
What are channels?
Normally, when we talk about channels, we think of the ones in systems like RabbitMQ, Redis, AWS SQS, and so on, and anyone with little Golang experience might assume the same here. But channels in Golang are different from a work-queue system. In work-queue systems like those above, you open TCP connections to the channels, whereas in Go a channel is a data structure, or even a design pattern, as we'll explain later. So, what exactly are channels in Golang?
Channels are the medium through which goroutines can communicate with each other. In simple terms, a channel is a pipe that allows a goroutine to either put or read the data.
What are goroutines?
So, a channel is a communication medium for goroutines. Now, let’s give a quick overview of what goroutines are. If you know this already, feel free to skip this section.
Technically, a goroutine is a function that executes independently in a concurrent fashion. In simple terms, it’s a lightweight thread that’s managed by go runtime.
You can create a goroutine by using the go keyword before a function call.
Let’s say there’s a function called PrintHello, like this:
func PrintHello() {
    fmt.Println("Hello")
}
You can make this into a goroutine simply by calling this function, as below:
// create goroutine
go PrintHello()
Now, let’s head back to channels, as that’s the important topic of this blog.
How to define a channel?
Let’s see a syntax that will declare a channel. We can do so by using the chan keyword provided by Go.
You must specify the data type, as a channel can only carry values of a single type.
// create channel
var c chan int
Very simple! But this is not useful yet, since it creates a nil channel. Let's print it and see.
fmt.Println(c)
fmt.Printf("Type of channel: %T", c)

Output:
<nil>
Type of channel: chan int
As you can see, we have just declared the channel, but we can’t transport data through it. So, to create a useful channel, we must use the make function.
// create channel
c := make(chan int)
fmt.Printf("Type of `c`: %T\n", c)
fmt.Printf("Value of `c` is %v\n", c)

Output:
Type of `c`: chan int
Value of `c` is 0xc000022120
As you may notice here, the value of c is a memory address. Keep in mind that channels are nothing but pointers. That’s why we can pass them to goroutines, and we can easily put the data or read the data. Now, let’s quickly see how to read and write the data to a channel.
Read and write operations on a channel:
Go provides an easy way to read and write data to a channel by using the left arrow.
c <- 10
This is the simple syntax to put a value into our created channel. A similar arrow placement is used to declare send-only channel types (chan<- int).
And to get/read the data from channel, we do this:
<-c
The arrow form is also how receive-only channel types (<-chan int) are declared.
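To make the direction syntax concrete, here is a small sketch (the function names send and receive are mine, not from any library). The arrow's position in the parameter type is what restricts each function to one direction; the compiler rejects a read inside send or a write inside receive:

```go
package main

import "fmt"

// send takes a send-only channel: it can only write to it.
func send(c chan<- int, v int) {
	c <- v
}

// receive takes a receive-only channel: it can only read from it.
func receive(c <-chan int) int {
	return <-c
}

func main() {
	c := make(chan int, 1) // buffered so send doesn't block
	send(c, 42)
	fmt.Println(receive(c)) // 42
}
```

A bidirectional channel converts implicitly to either directional type at the call site, which is why main can pass the same c to both functions.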
Let’s see a simple program to use the channels.
func printChannelData(c chan int) {
    fmt.Println("Data in channel is: ", <-c)
}
This simple function just prints whatever data is in the channel. Now, let’s see the main function that will push the data into the channel.
func main() {
    fmt.Println("Main started...")

    // create channel of int
    c := make(chan int)

    // call to goroutine
    go printChannelData(c)

    // put the data in channel
    c <- 10
    fmt.Println("Main ended...")
}
This yields the output:
Main started...
Data in channel is: 10
Main ended...
Let’s talk about the execution of the program.
1. We declared a printChannelData function, which accepts a channel c of data type integer. In this function, we are just reading data from channel c and printing it.
2. The main function will first print "Main started..." to the console.
3. Then, we create the channel c of data type integer using the make function.
4. We now pass the channel to the function printChannelData, and as we saw earlier, it’s a goroutine.
5. At this point, there are two goroutines. One is the main goroutine, and the other is what we have declared.
6. Now, we are putting 10 as data in the channel, and at this point, our main goroutine is blocked and waiting for some other goroutine to read the data. The reader, in this case, is the printChannelData goroutine, which was previously blocked because there was no data in the channel. Now that we’ve pushed the data onto the channel, the Go scheduler (more on this later in the blog) now schedules printChannelData goroutine, and it will read and print the value from the channel.
7. After that, the main goroutine again activates and prints “main ended…” and the program stops.
So, what's happening here? Basically, the blocking and unblocking of goroutines is done by the Go scheduler. Unless there's data in a channel, you can't read from it, which is why our printChannelData goroutine was blocked at first. Conversely, written data has to be read before the writer can resume, which is what happened to our main goroutine.
With this, let’s see how channels operate internally.
Internals of channels:
Until now, we have seen how to define a goroutine, how to declare a channel, and how to read and write data through a channel with a very simple example. Now, let’s look at how Go handles this blocking and unblocking nature internally. But before that, let’s quickly see the types of channels.
Types of channels:
There are two basic types of channels: buffered channels and unbuffered channels. The above example illustrates the behaviour of unbuffered channels. Let’s quickly see the definition of these:
Unbuffered channel: This is what we have seen above. A channel that can hold a single piece of data, which has to be consumed before more data can be pushed. That's why our main goroutine got blocked when we added data to the channel.
Buffered channel: In a buffered channel, we specify the data capacity of the channel. The syntax is very simple: c := make(chan int, 10). The second argument to the make function is the capacity of the channel, so here we can put up to ten elements into it. When the capacity is full, the sending goroutine blocks until a receiver goroutine starts consuming data.
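A quick sketch of that capacity behaviour; the built-in len and cap report the number of queued elements and the buffer size, respectively:

```go
package main

import "fmt"

func main() {
	// a buffered channel with capacity 3: sends don't block until the buffer is full
	c := make(chan int, 3)
	c <- 1
	c <- 2
	c <- 3 // a fourth send here would block until a receiver consumes a value

	fmt.Println(len(c), cap(c)) // 3 3

	// values come back out in FIFO order
	fmt.Println(<-c, <-c, <-c) // 1 2 3
}
```

Unlike the unbuffered example earlier, no second goroutine is needed here: all three sends complete immediately because the buffer has room.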
Properties of a channel:
A channel does a lot of things internally, and it has the following properties:
Channels are goroutine-safe.
Channels can store and pass values between goroutines.
Channels provide FIFO semantics.
Channels cause goroutines to block and unblock, which we just learned about.
As we see the internals of a channel, you’ll learn about the first three properties.
Channel Structure:
As we learned in the definition, a channel is a data structure. Looking at the properties above, we want a mechanism that handles goroutines in a synchronized manner with FIFO semantics. This can be solved using a queue with a lock, and that's exactly how a channel behaves internally: it has a circular queue, a lock, and some other fields.
When we do c := make(chan int, 10), Go creates a channel using the hchan struct, which has the following fields:
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
This is what a channel is internally. Let’s see one-by-one what these fields are.
qcount holds the count of items/data in the queue.
dataqsiz is the size of the circular queue. This is used for buffered channels and is the second parameter passed to the make function.
elemsize is the size of a single element in the channel.
buf is the actual circular queue where the data is stored when we use buffered channels.
closed indicates whether the channel is closed. The syntax to close the channel is close(<channel_name>). The default value of this field is 0, which is set when the channel gets created, and it’s set to 1 when the channel is closed.
sendx and recvx indicates the current index of a buffer or circular queue. As we add the data into the buffered channel, sendx increases, and as we start receiving, recvx increases.
recvq and sendq are the waiting queue for the blocked goroutines that are trying to either read data from or write data to the channel.
lock is a mutex used to lock the channel on each read or write operation, so that concurrent goroutines cannot corrupt the channel's state.
These are the important fields of the hchan struct, which comes into the picture when we create a channel. This hchan struct resides on the heap, and the make function gives us a pointer to that location. There's another struct known as sudog, which also comes into the picture, but we'll learn more about that later. Now, let's see what happens when we write and read the data.
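As a small aside before moving on, the closed field described above is what the two-value receive form reports back. A minimal sketch:

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1)
	c <- 7
	close(c) // flips the closed field in hchan from 0 to 1

	// a closed channel can still be drained of buffered values
	v, ok := <-c
	fmt.Println(v, ok) // 7 true

	// once drained, receives return the zero value and ok == false
	v, ok = <-c
	fmt.Println(v, ok) // 0 false
}
```

Sending on a closed channel, by contrast, panics, which is why only the sender side should ever call close.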
Read and write operations on a channel:
We are considering buffered channels here. When a goroutine, let's say G1, wants to write data to a channel, it does the following:
Acquire the lock: As we saw before, if we want to modify the channel, or hchan struct, we must acquire a lock. So, G1 in this case, will acquire a lock before writing the data.
Perform enqueue operation: We now know that buf is actually a circular queue that holds the data. But before enqueuing the data, goroutine does a memory copy operation on the data and puts the copy into the buffer slot. We will see an example of this.
Release the lock: After performing an enqueue operation, it just releases the lock and goes on performing further executions.
When a goroutine, let's say G2, reads the above data, it performs the same steps, except that instead of an enqueue it performs a dequeue, again with a memory copy. This means there is no shared memory in channels: the goroutines share only the hchan struct, which is protected by the mutex; everything else is a copy.
This satisfies the famous Golang quote: "Do not communicate by sharing memory; instead, share memory by communicating."
Now, let’s look at a small example of this memory copy operation.
func printData(c chan int) {
	time.Sleep(time.Second * 3)
	data := <-c
	fmt.Println("Data in channel is: ", data)
}

func main() {
	fmt.Println("Main started...")
	var a = 10
	b := &a

	// create channel
	c := make(chan int)
	go printData(c)

	fmt.Println("Value of b before putting into channel", *b)
	c <- *b // the value 10 is copied into the channel
	a = 20
	fmt.Println("Updated value of a:", a)
	fmt.Println("Updated value of b:", *b)
	time.Sleep(time.Second * 2)
	fmt.Println("Main ended...")
}
And the output of this is:
Main started...
Value of b before putting into channel 10
Updated value of a: 20
Updated value of b: 20
Data in channel is: 10
Main ended...
So, as you can see, we put the value of variable a into the channel and then modified the variable. However, the value received from the channel stays the same, i.e., 10, because the main goroutine performed a memory copy operation before putting the value onto the channel. So, even if you change the variable later, the value in the channel does not change.
Write in case of buffer overflow:
We've seen that a goroutine can add data up to the buffer capacity, but what happens when that capacity is reached? When the buffer has no more space and a goroutine, let's say G1, wants to write data, the Go scheduler blocks/pauses G1, which waits until a receive happens from another goroutine, say G2. Once G2 consumes data and frees up space, the Go scheduler makes G1 runnable again. Remember this scenario, as we'll use G1 and G2 frequently from here on.
We know that goroutines work in a pause-and-resume fashion, but who controls this? As you might have guessed, the Go scheduler does the magic here. There are a few things the Go scheduler does that are very important for goroutines and channels.
Go Runtime Scheduler
You may already know this, but goroutines are user-space threads. Now, the OS can schedule and manage threads, but it’s overhead to the OS, considering the properties that threads carry.
That’s why the Go scheduler handles the goroutines, and it basically multiplexes the goroutines on the OS threads. Let’s see how.
There are scheduling models, like 1:1, N:1, etc., but the Go scheduler uses the M:N scheduling model.
Basically, this means that there are a number of goroutines and OS threads, and the scheduler basically schedules the M goroutines on N OS threads. For example:
(Diagram: six goroutines being multiplexed across two OS threads, OS Thread 1 and OS Thread 2.)
As you can see, there are two OS threads, and the scheduler is running six goroutines by swapping them as needed. The Go scheduler has three structures as below:
M: M represents the OS thread, which is entirely managed by the OS, and it’s similar to POSIX thread. M stands for machine.
G: G represents the goroutine. Now, a goroutine is a resizable stack that also includes information about scheduling, any channel it’s blocked on, etc.
P: P is a context for scheduling. This is like a single thread that runs the Go code to multiplex M goroutines onto N OS threads. This is an important part, and that's why P stands for processor.
Diagrammatically, we can represent the scheduler as:
The P processor basically holds a queue of runnable goroutines, or simply a run queue.
So, any time a goroutine (G) wants to run on an OS thread (M), that OS thread first gets hold of a P, i.e., the context. This behaviour matters when a goroutine needs to be paused so that other goroutines can run. One such case is a buffered channel: when the buffer is full, we pause the sender goroutine and activate the receiver goroutine.
Imagine the above scenario: G1 is a sender that tries to send on a full buffered channel, and G2 is a receiver goroutine. When G1 wants to send on the full channel, it calls into the Go runtime scheduler with a call known as gopark. The scheduler then changes the state of G1 from running to waiting and schedules another goroutine from the run queue, say G2.
This transition diagram might help you better understand:
As you can see, after the gopark call, G1 is in a waiting state and G2 is running. We haven't paused the OS thread (M); instead, we've blocked the goroutine and scheduled another one, so we get maximum throughput out of an OS thread. The context switching of goroutines is handled by the scheduler (P), which is part of what makes the scheduler complex.
This is great. But how do we resume G1, since it still wants to add its data onto the channel? Before G1 makes the gopark call, it records its state on the hchan struct, i.e., in our channel's sendq field. Remember the sendq and recvq fields? They hold the waiting senders and receivers.
Now, G1 stores the state of itself as a sudog struct. A sudog is simply a goroutine that is waiting on an element. The sudog struct has these elements:
type sudog struct {
	g        *g
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element
	...
}
g is the waiting goroutine, next and prev point to the next and previous sudog if one exists, and elem is the actual element the goroutine is waiting on.
So, considering our example, G1 is basically waiting to write the data so it will create a state of itself, which we’ll call sudog as below:
Cool. Now we know, before going into the waiting state, what operations G1 performs. Currently, G2 is in a running state, and it will start consuming the channel data.
As soon as it receives the first data/task, it checks the waiting goroutines in the sendq attribute of the hchan struct and finds that G1 is waiting to push data. Now, here is the interesting part: G2 itself copies G1's data/task into the buffer, then calls into the scheduler, which moves G1 from waiting to runnable, adds it to the run queue, and returns to G2. This call, which G2 makes on G1's behalf, is known as goready. Impressive, right? Golang behaves like this so that when G1 resumes, it doesn't need to acquire the lock and push the data itself; that extra work is handled by G2. That's why the sudog carries both the data/task and the details of the waiting goroutine. So, the state of G1 looks like this:
As you can see, G1 is placed on a run queue. Now we know what the goroutines and the Go scheduler do in the case of buffered channels. In this example, the sender goroutine came first, but what if the receiver goroutine comes first? What if there's no data in the channel and the receiver goroutine executes first? The receiver goroutine (G2) will create a sudog in recvq on the hchan struct. Things get a little twisted when goroutine G1 activates: it first checks whether any goroutines are waiting in recvq, and if so, it copies the task directly to the waiting goroutine's (G2's) memory location, i.e., the elem attribute of its sudog.
This is incredible! Instead of writing to the buffer, it will write the task/data to the waiting goroutine’s space simply to avoid G2’s overhead when it activates. We know that each goroutine has its own resizable stack, and they never use each other’s space except in case of channels. Until now, we have seen how the send and receive happens in a buffered channel.
This may have been confusing, so let me give you the summary of the send operation.
Summary of a send operation for buffered channels:
Acquire lock on the entire channel or the hchan struct.
Check if there’s any sudog or a waiting goroutine in the recvq. If so, then put the element directly into its stack. We saw this just now with G1 writing to G2’s stack.
If recvq is empty, then check whether the buffer has space. If yes, then do a memory copy of the data.
If the buffer is full, then create a sudog under sendq of the hchan struct, which will have details, like a currently executing goroutine and the data to put on the channel.
We have seen all the above steps in detail, but concentrate on the last point.
It’s kind of similar to an unbuffered channel. We know that for unbuffered channels, every read must have a write operation first and vice versa.
So, keep in mind that an unbuffered channel always works like a direct send. So, a summary of a read and write operation in unbuffered channel could be:
Sender first: there’s no receiver yet, so the sender creates a sudog of itself on sendq, and the receiver later takes the value directly from that sudog.
Receiver first: The receiver will create a sudog in recvq, and the sender will directly put the data in the receiver’s stack.
With this, we have covered the basics of channels. We’ve learned how read and write operates in a buffered and unbuffered channel, and we talked about the Go runtime scheduler.
Conclusion:
Channels are a very interesting Go topic. They can seem difficult to understand, but once you learn the mechanism, they’re very powerful and help you achieve concurrency in your applications. Hopefully, this blog has helped your understanding of the fundamental concepts and operations of channels.
Bots are the new black! The entire tech industry seems to be buzzing with “bot” fever. My co-founders and I often see a “bot” company and discuss its business model. Chirag Jog has always been enthusiastic about the bot wave, while I have been mostly pessimistic, especially about B2C bots. We should keep in mind that there are many types of “bots”: chat bots, voice bots, AI assistants, robotic process automation (RPA) bots, conversational agents within apps or websites, etc.
Over the last year, we have been building some interesting chat- and voice-based bots, which has given me some interesting insights. I hope to lay down my thoughts on bots in some detail and with some structure.
What are bots?
Bots are software programs which automate tasks that humans would otherwise do themselves. Bots are developed using machine learning software and are expected to aggregate data to make the interface more intelligent and intuitive. There have always been simple rule-based bots which provide a very specific service with low utility. In the last couple of years, we have seen the emergence of intelligent bots that can serve more complex use-cases.
Why now?
Machine learning, NLP, and AI technologies have matured, enabling practical applications where bots can actually do intelligent work more than 75% of the time. Has general AI been solved? No. But is it good enough to do the simple things well and give hope for more complex things? Yes.
Secondly, there are billions of DAUs on WhatsApp and Facebook Messenger. There are tens of millions of users on enterprise messaging platforms like Slack, Skype, and Microsoft Teams. Startups and enterprises want to use this distribution channel and will continue to experiment aggressively to find relevant use-cases. Millennials are very comfortable using chat and voice interfaces for a broad variety of use-cases, since they started using chat services as soon as they came online. As millennials become a growing part of the workforce, the adoption of bots may increase.
Thirdly, software is becoming more prevalent and more complex. Data is exploding and making sense of this data is getting harder and requiring more skill. Companies are experimenting with bots to provide an “easy to consume” interface to casual users. So non-experts can use the bot interface while experts can use the mobile or web application for the complex workflows. This is mostly true for B2B & enterprise. A good example is how Slack has become the system of engagement for many companies (including at @velotiotech). We require all the software we use (Gitlab, Asana, Jira, Google Docs, Zoho, Marketo, Zendesk, etc.) to provide notifications into Slack. Over time, we expect to start querying the respective Slack bots for information. Only domain experts will log into the actual SaaS applications.
Types of Bots
B2C Chat-Bots
Consumer-focused bots use popular messaging and social platforms like Facebook, Telegram, Kik, WeChat, etc. Some examples of consumer bots include weather, e-commerce, travel bookings, personal finance, fitness, and news. These are mostly inspired by WeChat, which dominates the China market and is the default gateway to various internet services there. These bots show up as “contacts” in the messenger platforms.
Strategically, the B2C bots are basically trying to get around the distribution monopoly of Apple & Google Android. As many studies have indicated, getting mobile users to install apps is getting extremely hard. Facebook, Skype, Telegram hope to become the system of engagement and distribution for various apps thereby becoming an alternate “App Store” or “Bot Store”.
I believe that SMS is a great channel for basic chatbot functionality. Chatbots with an SMS interface can be used by all age groups and in remote parts of the world where data infrastructure is lacking. I do expect to see some interesting companies use SMS chatbots to build new business models. Mobile bots that sniff or integrate with as many of your mobile apps as possible to provide cross-platform and cross-app “intelligence” will also succeed; Google Now is a good example.
An often-cited example is the DoNotPay chatbot, which helps people contest parking tickets in the UK. In my opinion, the novelty is in the service and its efficiency, not in the chatbot interface as such. Also, I have not met anyone who uses a B2C chatbot even on a weekly or monthly basis.
B2B Bots
Enterprise bots are available through platforms and interfaces like Slack, Skype, Microsoft Teams, website chat windows, email assistants, etc. They are focused on collaboration, replacing/augmenting emails, information assistants, support, and speeding up decision-making/communications.
Most of the enterprise bots solve niche and specific problems. This is a great advantage considering the current state of AI/ML technologies. Many of these enterprise bot companies are also able to augment their intelligence with human agents thereby providing better experiences to users.
Some of the interesting bots and services in the enterprise space include:
x.ai and Clara Labs provide a virtual assistant to help you set up and manage your meetings.
Gong.io and Chorus provide a bot that listens in on sales calls and uses voice-to-text and other machine learning algorithms to help your sales teams get better and close more deals.
Astro is building an AI assisted email app which will have multiple interfaces including voice (Echo).
Twyla helps make chatbots on websites more intelligent using ML. It integrates with your existing ZenDesk, LivePerson, or Salesforce support.
Clarke.ai is a bot which uses AI to take notes for your meeting so you can focus better.
Smacc provides AI assisted automated book-keeping for SMBs.
Slack is one of the fastest growing SaaS companies and has the most popular enterprise bot store. Slack bots are great for pushing and pulling information and data. All SaaS services and apps should have bots that can emit useful updates, charts, data, links, etc. to a specific set of users. This is much better than sending emails to an email group. Simple decisions can be made within a chat interface using something like Slack Buttons. Instead of receiving an email and opening a web page, most people would prefer approving a leave or an expense right within Slack. Slack, Skype, and others will add the ability to embed “cards”, “webviews”, or “interactive sections” within chats. This will enable some more complex use-cases to be served via bots. Most enterprise services have Slack bots and are allowing Slack to be a basic system of engagement.
Chatbots or even voice-based bots on websites will be a big deal. Imagine that each website has a virtual support rep or sales rep available to you 24×7 in most popular languages. All businesses would want such “agents” or “bots” for greater sales conversions and better support.
Automation of back-office tasks can be a HUGE business. KPOs and BPOs are a huge market, so if you can build software or software-enabled processes to reduce costs, you can build a significant-sized company. Some interesting examples here are Automation Anywhere and WorkFusion.
Voice based Bots
Amazon had a surprise hit in the consumer electronics space with its Amazon Echo device, a voice-based assistant. Google recently released its own voice-enabled products to compete with Echo/Alexa. Voice assistants provide weather, music, search, and e-commerce ordering via an NLP voice interface. Apple’s Siri should have been leading this market, but as usual Apple is following rather than leading.
Voice bots have one great advantage: with the miniaturization of devices (Apple Watch, Earpods, smaller wearables), the only practical interface is voice. The other option is pairing the device with your mobile phone, which is not a smooth and intuitive process. Echo is already a great device for listening to music thanks to its Spotify integration; that feature alone is reason enough for most families to buy it.
Conclusion
Bots are useful and here to stay. I am not sure about the form or the distribution channel through which bots will become prevalent. In my opinion, bots are an additional interface to intelligence and application workflows. They are not disrupting any process or industry. Consumers will not shop more due to chat or voice interface bots, employees will not collaborate as desired due to bots, information discovery within your company will not improve due to bots. Actually, existing software and SaaS services are getting more intelligent, predictive and prescriptive. So this move towards “intelligent interfaces” is the real disruption.
So my concluding predictions:
B2C chatbots will turn out to be mostly hype and very few practical scalable use-cases will emerge.
Voice bots will see increasing adoption due to smaller device sizes. IoT, wearables and music are excellent use-cases for voice based interfaces. Amazon’s Alexa will become the dominant platform for voice controlled apps and devices. Google and Microsoft will invest aggressively to take on Alexa.
B2B bots can be intelligent interfaces on software platforms and SaaS products. Or they can be agents that solve very specific vertical use-cases. I am most bullish about these enterprise focused bots which are helping enterprises become more productive or to increase efficiency with intelligent assistants for specific job functions.
If you’d like to chat about anything related to this article, what tools we use to build bots, or anything else, get in touch.